From 1e6563c4a255b1d50d2acd0a5a7c0feb51620c61 Mon Sep 17 00:00:00 2001 From: n9k Date: Fri, 18 Feb 2022 01:32:34 +0000 Subject: [PATCH] Some more project structure --- anonstream/__init__.py | 7 ++-- anonstream/chat.py | 52 ++++++++++++++++++----------- anonstream/helpers/user.py | 8 +++-- anonstream/routes/nojs.py | 13 ++------ anonstream/routes/websocket.py | 18 +++------- anonstream/routes/wrappers.py | 35 ++++++++++--------- anonstream/static/anonstream.js | 17 +++++----- anonstream/templates/nojs_chat.html | 2 +- anonstream/user.py | 18 +++++----- anonstream/utils/chat.py | 6 ++-- anonstream/utils/user.py | 4 +-- anonstream/websocket.py | 24 ++++--------- 12 files changed, 100 insertions(+), 104 deletions(-) diff --git a/anonstream/__init__.py b/anonstream/__init__.py index aed7287..4ba49ac 100644 --- a/anonstream/__init__.py +++ b/anonstream/__init__.py @@ -42,9 +42,10 @@ async def create_app(): assert app.config['MAX_CHAT_MESSAGES'] >= app.config['MAX_CHAT_SCROLLBACK'] assert app.config['THRESHOLD_ABSENT'] >= app.config['THRESHOLD_IDLE'] - app.chat = {'messages': OrderedDict(), 'nonce_hashes': set()} - app.users = {} - app.websockets = set() + app.messages_by_id = OrderedDict() + app.users_by_token = {} + app.messages = app.messages_by_id.values() + app.users = app.users_by_token.values() app.segments_directory_cache = DirectoryCache(config['stream']['segments_dir']) async with app.app_context(): diff --git a/anonstream/chat.py b/anonstream/chat.py index e219a14..3fc230b 100644 --- a/anonstream/chat.py +++ b/anonstream/chat.py @@ -1,22 +1,37 @@ import time from datetime import datetime -from quart import escape +from quart import current_app, escape -from anonstream.user import users_for_websocket from anonstream.helpers.chat import generate_nonce_hash +from anonstream.utils.chat import message_for_websocket + +MESSAGES_BY_ID = current_app.messages_by_id +MESSAGES = current_app.messages +USERS_BY_TOKEN = current_app.users_by_token +USERS = current_app.users class Rejected(Exception): pass -async def broadcast(websockets, payload): - for queue in websockets: - await queue.put(payload) +async def broadcast(users, payload): + for user in users: + for queue in user['websockets']: + await queue.put(payload) -async def add_chat_message(chat, users, websockets, user, nonce, comment): +def messages_for_websocket(): + return list(map( + lambda message: message_for_websocket( + user=USERS_BY_TOKEN[message['token']], + message=message, + ), + MESSAGES, + )) + +async def add_chat_message(user, nonce, comment): # check message - nonce_hash = generate_nonce_hash(nonce) - if nonce_hash in chat['nonce_hashes']: + message_id = generate_nonce_hash(nonce) + if message_id in MESSAGES_BY_ID: raise Rejected('Discarded suspected duplicate message') if len(comment) == 0: raise Rejected('Message was empty') @@ -25,17 +40,19 @@ async def add_chat_message(chat, users, websockets, user, nonce, comment): timestamp_ms = time.time_ns() // 1_000_000 timestamp = timestamp_ms // 1000 try: - last_message = next(reversed(chat['messages'].values())) + last_message = next(reversed(MESSAGES)) except StopIteration: - message_id = timestamp_ms + seq = timestamp_ms else: - if timestamp <= last_message['id']: - message_id = last_message['id'] + 1 + if timestamp_ms > last_message['seq']: + seq = timestamp_ms + else: + seq = last_message['seq'] + 1 dt = datetime.utcfromtimestamp(timestamp) markup = escape(comment) - chat['messages'][message_id] = { + MESSAGES_BY_ID[message_id] = { 'id': message_id, - 'nonce_hash': nonce_hash, + 'seq': seq, 'token': user['token'], 'timestamp': timestamp, 'date': dt.strftime('%Y-%m-%d'), @@ -45,15 +62,12 @@ async def add_chat_message(chat, users, websockets, user, nonce, comment): 'markup': markup, } - # collect nonce hash - chat['nonce_hashes'].add(nonce_hash) - # broadcast message to websockets await broadcast( - websockets, + USERS, payload={ 'type': 'chat', - 'id': message_id, + 'seq': seq, 'token_hash': user['token_hash'], 'markup': markup, } diff --git a/anonstream/helpers/user.py b/anonstream/helpers/user.py index ce67a10..74e488b 100644 --- a/anonstream/helpers/user.py +++ b/anonstream/helpers/user.py @@ -14,7 +14,7 @@ def generate_token_hash(token): digest = hashlib.sha256(parts).digest() return base64.b32encode(digest)[:26].lower().decode() -def generate_user(token, broadcaster, timestamp): +def generate_user(timestamp, token, broadcaster): colour = generate_colour( seed='name\0' + token, bg=CONFIG['CHAT_BACKGROUND_COLOUR'], @@ -23,6 +23,7 @@ def generate_user(token, broadcaster, timestamp): return { 'token': token, 'token_hash': generate_token_hash(token), + 'websockets': set(), 'broadcaster': broadcaster, 'name': None, 'color': colour_to_color(colour), @@ -49,7 +50,10 @@ def is_idle(timestamp, user): return is_present(timestamp, user) and not is_watching(timestamp, user) def is_present(timestamp, user): - return user['seen']['last'] >= timestamp - CONFIG['THRESHOLD_ABSENT'] + return ( + user['seen']['last'] >= timestamp - CONFIG['THRESHOLD_ABSENT'] + or len(user['websockets']) > 0 + ) def is_absent(timestamp, user): return not is_present(timestamp, user) diff --git a/anonstream/routes/nojs.py b/anonstream/routes/nojs.py index 022fde9..008f1b4 100644 --- a/anonstream/routes/nojs.py +++ b/anonstream/routes/nojs.py @@ -24,8 +24,8 @@ async def nojs_chat(user): return await render_template( 'nojs_chat.html', user=user, - users=current_app.users, - messages=current_app.chat['messages'].values(), + users_by_token=current_app.users_by_token, + messages=current_app.messages, get_default_name=get_default_name, ) @@ -53,14 +53,7 @@ async def nojs_submit_message(user): nonce = form.get('nonce', '') try: - await add_chat_message( - chat=current_app.chat, - users=current_app.users, - websockets=current_app.websockets, - user=user, - nonce=nonce, - comment=comment, - ) + await add_chat_message(user, nonce, comment) except Rejected as e: notice, *_ = e.args notice_id = add_notice(user, notice) diff --git a/anonstream/routes/websocket.py b/anonstream/routes/websocket.py index 012c013..6ef4bf2 100644 --- a/anonstream/routes/websocket.py +++ b/anonstream/routes/websocket.py @@ -9,21 +9,11 @@ from anonstream.routes.wrappers import with_user_from @with_user_from(websocket) async def live(user): queue = asyncio.Queue() - current_app.websockets.add(queue) + user['websockets'].add(queue) - producer = websocket_outbound( - queue=queue, - messages=current_app.chat['messages'].values(), - users=current_app.users, - ) - consumer = websocket_inbound( - queue=queue, - chat=current_app.chat, - users=current_app.users, - connected_websockets=current_app.websockets, - user=user, - ) + producer = websocket_outbound(queue) + consumer = websocket_inbound(queue, user) try: await asyncio.gather(producer, consumer) finally: - current_app.websockets.remove(queue) + user['websockets'].remove(queue) diff --git a/anonstream/routes/wrappers.py b/anonstream/routes/wrappers.py index ec1891c..c8bd0a8 100644 --- a/anonstream/routes/wrappers.py +++ b/anonstream/routes/wrappers.py @@ -5,17 +5,22 @@ from quart import current_app, request, abort, make_response from werkzeug.security import check_password_hash from anonstream.user import sunset, user_for_websocket -from anonstream.websocket import broadcast +from anonstream.chat import broadcast from anonstream.helpers.user import generate_user from anonstream.utils.user import generate_token +CONFIG = current_app.config +MESSAGES = current_app.messages +USERS_BY_TOKEN = current_app.users_by_token +USERS = current_app.users + def check_auth(context): auth = context.authorization return ( auth is not None and auth.type == "basic" - and auth.username == current_app.config["AUTH_USERNAME"] - and check_password_hash(current_app.config["AUTH_PWHASH"], auth.password) + and auth.username == CONFIG["AUTH_USERNAME"] + and check_password_hash(CONFIG["AUTH_PWHASH"], auth.password) ) def auth_required(f): @@ -44,40 +49,40 @@ def with_user_from(context): # Check if broadcaster broadcaster = check_auth(context) if broadcaster: - token = current_app.config['AUTH_TOKEN'] + token = CONFIG['AUTH_TOKEN'] else: token = context.args.get('token') or context.cookies.get('token') or generate_token() # Remove non-visible absent users - token_hashes = sunset( - messages=current_app.chat['messages'].values(), - users=current_app.users, + sunsetted_token_hashes = sunset( + messages=MESSAGES, + users_by_token=USERS_BY_TOKEN, ) - if len(token_hashes) > 0: + if sunsetted_token_hashes: await broadcast( - current_app.websockets, + users=USERS, payload={ 'type': 'rem-users', - 'token_hashes': token_hashes, + 'token_hashes': sunsetted_token_hashes, } ) # Update / create user - user = current_app.users.get(token) + user = USERS_BY_TOKEN.get(token) if user is not None: user['seen']['last'] = timestamp else: user = generate_user( - secret=current_app.config['SECRET_KEY'], + timestamp=timestamp, token=token, broadcaster=broadcaster, - timestamp=timestamp, ) - current_app.users[token] = user + USERS_BY_TOKEN[token] = user await broadcast( - current_app.websockets, + USERS, payload={ 'type': 'add-user', + 'token_hash': user['token_hash'], 'user': user_for_websocket(user), } ) diff --git a/anonstream/static/anonstream.js b/anonstream/static/anonstream.js index 249cab8..31d8ad3 100644 --- a/anonstream/static/anonstream.js +++ b/anonstream/static/anonstream.js @@ -52,7 +52,7 @@ const create_chat_message = (object) => { const chat_message = document.createElement("li"); chat_message.classList.add("chat-message"); - chat_message.dataset.id = object.id; + chat_message.dataset.seq = object.seq; chat_message.dataset.tokenHash = object.token_hash; const chat_message_name = document.createElement("span"); @@ -126,11 +126,11 @@ const on_websocket_message = (event) => { users = receipt.users; update_user_styles(); - const ids = new Set(receipt.chat.map((message) => {return message.id;})); + const seqs = new Set(receipt.messages.map((message) => {return message.seq;})); const to_delete = []; for (const chat_message of chat_messages.children) { - const chat_message_id = parseInt(chat_message.dataset.id); - if (!ids.has(chat_message_id)) { + const chat_message_seq = parseInt(chat_message.dataset.seq); + if (!seqs.has(chat_message_seq)) { to_delete.push(chat_message); } } @@ -138,9 +138,10 @@ const on_websocket_message = (event) => { chat_message.remove(); } - const last_id = Math.max(...[...chat_messages.children].map((element) => parseInt(element.dataset.id))); - for (const message of receipt.chat) { - if (message.id > last_id) { + const last = chat_messages.children.length == 0 ? null : chat_messages.children[chat_messages.children.length - 1]; + const last_seq = last === null ? null : parseInt(last.dataset.seq); + for (const message of receipt.messages) { + if (message.seq > last_seq) { const chat_message = create_chat_message(message); chat_messages.insertAdjacentElement("beforeend", chat_message); } @@ -184,7 +185,7 @@ const on_websocket_message = (event) => { case "add-user": console.log("ws add-user", receipt); - users[receipt.user.token_hash] = receipt.user; + users[receipt.token_hash] = receipt.user; update_user_styles(); break; diff --git a/anonstream/templates/nojs_chat.html b/anonstream/templates/nojs_chat.html index 044bbca..d80a5dd 100644 --- a/anonstream/templates/nojs_chat.html +++ b/anonstream/templates/nojs_chat.html @@ -48,7 +48,7 @@