diff --git a/anonstream/__init__.py b/anonstream/__init__.py index 69373ff..cc6af77 100644 --- a/anonstream/__init__.py +++ b/anonstream/__init__.py @@ -31,8 +31,9 @@ def create_app(config_file): 'MAX_CAPTCHAS': config['memory']['captchas'], 'MAX_CHAT_MESSAGES': config['memory']['chat_messages'], 'MAX_CHAT_SCROLLBACK': config['memory']['chat_scrollback'], - 'CHECKUP_PERIOD_USER': config['intervals']['sunset_users'], - 'CHECKUP_PERIOD_CAPTCHA': config['intervals']['expire_captchas'], + 'TASK_PERIOD_ROTATE_USERS': config['tasks']['rotate_users'], + 'TASK_PERIOD_ROTATE_CAPTCHAS': config['tasks']['rotate_captchas'], + 'TASK_PERIOD_BROADCAST_USERS_UPDATE': config['tasks']['broadcast_users_update'], 'THRESHOLD_USER_NOTWATCHING': config['thresholds']['user_notwatching'], 'THRESHOLD_USER_TENTATIVE': config['thresholds']['user_tentative'], 'THRESHOLD_USER_ABSENT': config['thresholds']['user_absent'], @@ -61,13 +62,18 @@ def create_app(config_file): ) app.messages_by_id = OrderedDict() - app.users_by_token = {} app.messages = app.messages_by_id.values() + + app.users_by_token = {} app.users = app.users_by_token.values() + app.segments_directory_cache = DirectoryCache(config['stream']['segments_dir']) + + app.captchas = OrderedDict() app.captcha_factory = create_captcha_factory(app.config['CAPTCHA_FONTS']) app.captcha_signer = create_captcha_signer(app.config['SECRET_KEY']) - app.captchas = OrderedDict() + + app.users_update_buffer = set() app.background_sleep = set() diff --git a/anonstream/broadcast.py b/anonstream/broadcast.py new file mode 100644 index 0000000..07c43e7 --- /dev/null +++ b/anonstream/broadcast.py @@ -0,0 +1,29 @@ +from quart import current_app + +from anonstream.utils.user import get_user_for_websocket + +USERS = current_app.users +USERS_BY_TOKEN = current_app.users_by_token +USERS_UPDATE_BUFFER = current_app.users_update_buffer + +def broadcast(users, payload): + for user in users: + for queue in user['websockets']: + queue.put_nowait(payload) + +def broadcast_users_update(): + users_for_websocket = {} + for token in USERS_UPDATE_BUFFER: + user = USERS_BY_TOKEN[token] + token_hash = user['token_hash'] + users_for_websocket[token_hash] = get_user_for_websocket(user) + + if users_for_websocket: + broadcast( + users=USERS, + payload={ + 'type': 'set-users', + 'users': users_for_websocket, + }, + ) + USERS_UPDATE_BUFFER.clear() diff --git a/anonstream/chat.py b/anonstream/chat.py index f39cdf2..b9bedb4 100644 --- a/anonstream/chat.py +++ b/anonstream/chat.py @@ -3,8 +3,9 @@ from datetime import datetime from quart import current_app, escape +from anonstream.broadcast import broadcast, broadcast_users_update from anonstream.helpers.chat import generate_nonce_hash, get_scrollback -from anonstream.utils.chat import message_for_websocket +from anonstream.utils.chat import get_message_for_websocket CONFIG = current_app.config MESSAGES_BY_ID = current_app.messages_by_id @@ -15,14 +16,9 @@ USERS = current_app.users class Rejected(ValueError): pass -def broadcast(users, payload): - for user in users: - for queue in user['websockets']: - queue.put_nowait(payload) - -def messages_for_websocket(): +def get_all_messages_for_websocket(): return list(map( - lambda message: message_for_websocket( + lambda message: get_message_for_websocket( user=USERS_BY_TOKEN[message['token']], message=message, ), @@ -30,11 +26,11 @@ def messages_for_websocket(): )) def add_chat_message(user, nonce, comment, ignore_empty=False): - # special case: if the comment is empty, do nothing and return + # Special case: if the comment is empty, do nothing and return if ignore_empty and len(comment) == 0: return - # check message + # Check message message_id = generate_nonce_hash(nonce) if message_id in MESSAGES_BY_ID: raise Rejected('Discarded suspected duplicate message') @@ -43,7 +39,7 @@ def add_chat_message(user, nonce, comment, ignore_empty=False): if len(comment) > 512: raise Rejected('Message exceeded 512 chars') - # add message + # Create and add message timestamp_ms = time.time_ns() // 1_000_000 timestamp = timestamp_ms // 1000 try: @@ -72,7 +68,11 @@ def add_chat_message(user, nonce, comment, ignore_empty=False): while len(MESSAGES_BY_ID) > CONFIG['MAX_CHAT_MESSAGES']: MESSAGES_BY_ID.pop(last=False) - # broadcast message to websockets + # Broadcast a users update to all websockets, + # in case this message is from a new user + broadcast_users_update() + + # Broadcast message to websockets broadcast( USERS, payload={ diff --git a/anonstream/routes/wrappers.py b/anonstream/routes/wrappers.py index c82ca78..340cd7e 100644 --- a/anonstream/routes/wrappers.py +++ b/anonstream/routes/wrappers.py @@ -5,8 +5,8 @@ from functools import wraps from quart import current_app, request, abort, make_response, render_template, request from werkzeug.security import check_password_hash -from anonstream.user import see, user_for_websocket -from anonstream.chat import broadcast +from anonstream.broadcast import broadcast +from anonstream.user import see from anonstream.helpers.user import generate_user from anonstream.utils.user import generate_token @@ -14,6 +14,7 @@ CONFIG = current_app.config MESSAGES = current_app.messages USERS_BY_TOKEN = current_app.users_by_token USERS = current_app.users +USERS_UPDATE_BUFFER = current_app.users_update_buffer def check_auth(context): auth = context.authorization @@ -65,14 +66,9 @@ def with_user_from(context): broadcaster=broadcaster, ) USERS_BY_TOKEN[token] = user - broadcast( - USERS, - payload={ - 'type': 'add-user', - 'token_hash': user['token_hash'], - 'user': user_for_websocket(user), - }, - ) + + # Add to the users update buffer + USERS_UPDATE_BUFFER.add(token) # Set cookie response = await f(user, *args, **kwargs) diff --git a/anonstream/static/anonstream.js b/anonstream/static/anonstream.js index 02bdab0..5299315 100644 --- a/anonstream/static/anonstream.js +++ b/anonstream/static/anonstream.js @@ -329,31 +329,24 @@ const on_websocket_message = (event) => { }); break; - case "add-user": - console.log("ws add-user", receipt); - users[receipt.token_hash] = receipt.user; - update_user_colors(receipt.token_hash); - update_user_tripcodes(receipt.token_hash); - break; - - case "mut-user": - console.log("ws mut-user", receipt); - const user = users[receipt.token_hash]; - user.name = receipt.name; - user.color = receipt.color; - user.tripcode = receipt.tripcode; - update_user_names(receipt.token_hash); - update_user_colors(receipt.token_hash); - update_user_tripcodes(receipt.token_hash); - break; + case "set-users": + console.log("ws set-users", receipt); + for (const token_hash of Object.keys(receipt.users)) { + users[token_hash] = receipt.users[token_hash]; + } + update_user_names(); + update_user_colors(); + update_user_tripcodes(); + break; case "rem-users": - console.log("ws rem-users", receipt); - for (const token_hash of receipt.token_hashes) { - delete users[token_hash]; - } - update_user_styles(); - break; + console.log("ws rem-users", receipt); + for (const token_hash of receipt.token_hashes) { + delete users[token_hash]; + } + update_user_colors(); + update_user_tripcodes(); + break; default: console.log("incomprehensible websocket message", receipt); diff --git a/anonstream/tasks.py b/anonstream/tasks.py index bccb657..49f2d53 100644 --- a/anonstream/tasks.py +++ b/anonstream/tasks.py @@ -3,7 +3,7 @@ from functools import wraps from quart import current_app -from anonstream.chat import broadcast +from anonstream.broadcast import broadcast, broadcast_users_update from anonstream.wrappers import with_timestamp from anonstream.helpers.user import is_visible @@ -36,7 +36,7 @@ def with_period(period): return periodically -@with_period(CONFIG['CHECKUP_PERIOD_USER']) +@with_period(CONFIG['TASK_PERIOD_ROTATE_USERS']) @with_timestamp def t_sunset_users(timestamp): tokens = [] @@ -51,6 +51,10 @@ def t_sunset_users(timestamp): token_hash = USERS_BY_TOKEN.pop(token)['token_hash'] token_hashes.append(token_hash) + # Broadcast a users update, in case any users being + # removed have been mutated or are new. + broadcast_users_update() + if token_hashes: broadcast( users=USERS, @@ -60,4 +64,9 @@ def t_sunset_users(timestamp): }, ) +@with_period(CONFIG['TASK_PERIOD_BROADCAST_USERS_UPDATE']) +def t_broadcast_users_update(): + broadcast_users_update() + current_app.add_background_task(t_sunset_users) +current_app.add_background_task(t_broadcast_users_update) diff --git a/anonstream/user.py b/anonstream/user.py index 395bcb1..9c3197f 100644 --- a/anonstream/user.py +++ b/anonstream/user.py @@ -3,18 +3,18 @@ from math import inf from quart import current_app -from anonstream.chat import broadcast from anonstream.wrappers import try_except_log, with_timestamp from anonstream.helpers.user import is_visible from anonstream.helpers.captcha import check_captcha_digest, Answer from anonstream.helpers.tripcode import generate_tripcode from anonstream.utils.colour import color_to_colour, get_contrast, NotAColor -from anonstream.utils.user import user_for_websocket +from anonstream.utils.user import get_user_for_websocket CONFIG = current_app.config MESSAGES = current_app.messages USERS = current_app.users CAPTCHA_SIGNER = current_app.captcha_signer +USERS_UPDATE_BUFFER = current_app.users_update_buffer class BadAppearance(ValueError): pass @@ -57,16 +57,8 @@ def try_change_appearance(user, name, color, password, elif want_change_tripcode: change_tripcode(user, password) - broadcast( - USERS, - payload={ - 'type': 'mut-user', - 'token_hash': user['token_hash'], - 'name': user['name'], - 'color': user['color'], - 'tripcode': user['tripcode'], - }, - ) + # Add to the users update buffer + USERS_UPDATE_BUFFER.add(user['token']) return errors @@ -113,13 +105,13 @@ def see(user): user['last']['seen'] = int(time.time()) @with_timestamp -def users_for_websocket(timestamp): +def get_all_users_for_websocket(timestamp): visible_users = filter( lambda user: is_visible(timestamp, MESSAGES, user), USERS, ) return { - user['token_hash']: user_for_websocket(user) + user['token_hash']: get_user_for_websocket(user) for user in visible_users } diff --git a/anonstream/utils/chat.py b/anonstream/utils/chat.py index 907b7f9..7e42f13 100644 --- a/anonstream/utils/chat.py +++ b/anonstream/utils/chat.py @@ -8,7 +8,7 @@ class NonceReuse(Exception): def generate_nonce(): return secrets.token_urlsafe(16) -def message_for_websocket(user, message): +def get_message_for_websocket(user, message): message_keys = ('seq', 'date', 'time_minutes', 'time_seconds', 'markup') user_keys = ('token_hash',) return { diff --git a/anonstream/utils/user.py b/anonstream/utils/user.py index a4a9453..48ad26c 100644 --- a/anonstream/utils/user.py +++ b/anonstream/utils/user.py @@ -9,7 +9,7 @@ from quart import escape, Markup def generate_token(): return secrets.token_hex(16) -def user_for_websocket(user): +def get_user_for_websocket(user): keys = ['broadcaster', 'name', 'color', 'tripcode'] return {key: user[key] for key in keys} diff --git a/anonstream/websocket.py b/anonstream/websocket.py index 5f5ea84..495e3f2 100644 --- a/anonstream/websocket.py +++ b/anonstream/websocket.py @@ -4,9 +4,8 @@ from quart import current_app, websocket from anonstream.stream import get_stream_title, get_stream_uptime from anonstream.captcha import get_random_captcha_digest -from anonstream.chat import messages_for_websocket, add_chat_message, Rejected -from anonstream.user import users_for_websocket, see, verify, BadCaptcha -from anonstream.wrappers import with_first_argument +from anonstream.chat import get_all_messages_for_websocket, add_chat_message, Rejected +from anonstream.user import get_all_users_for_websocket, see, verify, BadCaptcha from anonstream.utils.chat import generate_nonce from anonstream.utils.websocket import parse_websocket_data, Malformed @@ -18,8 +17,8 @@ async def websocket_outbound(queue, user): 'nonce': generate_nonce(), 'title': get_stream_title(), 'uptime': get_stream_uptime(), - 'messages': messages_for_websocket(), - 'users': users_for_websocket(), + 'messages': get_all_messages_for_websocket(), + 'users': get_all_users_for_websocket(), 'default': { True: CONFIG['DEFAULT_HOST_NAME'], False: CONFIG['DEFAULT_ANON_NAME'], diff --git a/config.toml b/config.toml index df2e829..02b8b7d 100644 --- a/config.toml +++ b/config.toml @@ -20,9 +20,10 @@ captchas = 256 chat_messages = 8192 chat_scrollback = 256 -[intervals] -sunset_users = 60 -expire_captchas = 60 +[tasks] +rotate_users = 60 +rotate_captchas = 60 +broadcast_users_update = 4 [names] broadcaster = "Broadcaster"