From 886c360e260e957fac45f6eac52af32138c22419 Mon Sep 17 00:00:00 2001 From: n9k Date: Sun, 27 Feb 2022 01:01:32 +0000 Subject: [PATCH] Tell websockets which users are watching This adds a field 'watching' in `user_for_websocket` that's True iff WATCHING, False iff NOTWATCHING, and None otherwise (since clients don't need to know if a user is tentative or absent). When the value of this field changes for any user, they get added to the update buffer (like with any other change). Removed race condition in `t_sunset_users`: `broadcast_users_update` was being called *after* a user was removed from memory (and for each user being removed, which was redundant). In that scenario if there's a user in the update buffer and `t_sunset_users` wins the race between it and `t_broadcast_users_update`, then when `t_sunset_users` calls `broadcast_users_update` a KeyError would be raised since the user's already been removed. Fixed unintended behaviour of `t_sunset_users`: it was removing users based on the result of `is_visible`, so users who were actually tenative (as opposed to absent) were being removed. --- anonstream/helpers/user.py | 32 ++------------------ anonstream/routes/nojs.py | 4 +-- anonstream/routes/wrappers.py | 3 +- anonstream/stream.py | 10 +++++-- anonstream/tasks.py | 23 ++++++--------- anonstream/user.py | 55 ++++++++++++++++++++++++++++------- anonstream/utils/user.py | 31 +++++++++++++++++--- 7 files changed, 96 insertions(+), 62 deletions(-) diff --git a/anonstream/helpers/user.py b/anonstream/helpers/user.py index a1df64e..d8d9f5d 100644 --- a/anonstream/helpers/user.py +++ b/anonstream/helpers/user.py @@ -1,25 +1,15 @@ import hashlib import base64 from collections import OrderedDict -from enum import Enum from math import inf from quart import current_app from anonstream.utils.colour import generate_colour, colour_to_color +from anonstream.utils.user import Presence CONFIG = current_app.config -Presence = Enum( - 'Presence', - names=( - 'WATCHING', - 'NOTWATCHING', - 'TENTATIVE', - 'ABSENT', - ) -) - def generate_token_hash_and_tag(token): parts = CONFIG['SECRET_KEY'] + b'token-hash\0' + token.encode() digest = hashlib.sha256(parts).digest() @@ -29,7 +19,7 @@ def generate_token_hash_and_tag(token): return token_hash, tag -def generate_user(timestamp, token, broadcaster): +def generate_user(timestamp, token, broadcaster, presence): colour = generate_colour( seed='name\0' + token, bg=CONFIG['CHAT_BACKGROUND_COLOUR'], @@ -51,6 +41,7 @@ def generate_user(timestamp, token, broadcaster): 'seen': timestamp, 'watching': -inf, }, + 'presence': presence, } def get_default_name(user): @@ -75,20 +66,3 @@ def get_presence(timestamp, user): return Presence.TENTATIVE return Presence.ABSENT - -def is_watching(timestamp, user): - return get_presence(timestamp, user) == Presence.WATCHING - -def is_listed(timestamp, user): - return ( - get_presence(timestamp, user) - in {Presence.WATCHING, Presence.NOTWATCHING} - ) - -def is_visible(timestamp, messages, user): - def user_left_messages(): - return any( - message['token'] == user['token'] - for message in messages - ) - return is_listed(timestamp, user) or user_left_messages() diff --git a/anonstream/routes/nojs.py b/anonstream/routes/nojs.py index 85e1b00..1ba0dc3 100644 --- a/anonstream/routes/nojs.py +++ b/anonstream/routes/nojs.py @@ -3,10 +3,10 @@ from quart import current_app, request, render_template, redirect, url_for, esca from anonstream.captcha import get_random_captcha_digest_for from anonstream.chat import add_chat_message, Rejected from anonstream.stream import get_stream_title, get_stream_uptime, get_stream_viewership -from anonstream.user import add_state, pop_state, try_change_appearance, get_users_by_presence, verify, deverify, BadCaptcha +from anonstream.user import add_state, pop_state, try_change_appearance, get_users_by_presence, Presence, verify, deverify, BadCaptcha from anonstream.routes.wrappers import with_user_from, render_template_with_etag from anonstream.helpers.chat import get_scrollback -from anonstream.helpers.user import get_default_name, Presence +from anonstream.helpers.user import get_default_name from anonstream.utils.chat import generate_nonce from anonstream.utils.user import concatenate_for_notice diff --git a/anonstream/routes/wrappers.py b/anonstream/routes/wrappers.py index f387f0e..ca1ae14 100644 --- a/anonstream/routes/wrappers.py +++ b/anonstream/routes/wrappers.py @@ -8,7 +8,7 @@ from werkzeug.security import check_password_hash from anonstream.broadcast import broadcast from anonstream.user import see from anonstream.helpers.user import generate_user -from anonstream.utils.user import generate_token +from anonstream.utils.user import generate_token, Presence CONFIG = current_app.config MESSAGES = current_app.messages @@ -68,6 +68,7 @@ def with_user_from(context): timestamp=timestamp, token=token, broadcaster=broadcaster, + presence=Presence.NOTWATCHING, ) USERS_BY_TOKEN[token] = user diff --git a/anonstream/stream.py b/anonstream/stream.py index e3eeebe..cf642f6 100644 --- a/anonstream/stream.py +++ b/anonstream/stream.py @@ -1,3 +1,5 @@ +import itertools +import operator import time import aiofiles @@ -5,7 +7,7 @@ from quart import current_app from anonstream.segments import get_playlist, Offline from anonstream.wrappers import ttl_cache_async, with_timestamp -from anonstream.helpers.user import is_watching +from anonstream.user import get_watching_users CONFIG = current_app.config USERS = current_app.users @@ -36,7 +38,11 @@ def get_stream_uptime(rounded=True): @with_timestamp def get_stream_viewership(timestamp): - return sum(map(lambda user: is_watching(timestamp, user), USERS)) + users = get_watching_users(timestamp) + return max( + map(operator.itemgetter(0), zip(itertools.count(1), users)), + default=0, + ) def get_stream_viewership_or_none(uptime): viewership = get_stream_viewership() diff --git a/anonstream/tasks.py b/anonstream/tasks.py index 9c835ea..1c9885f 100644 --- a/anonstream/tasks.py +++ b/anonstream/tasks.py @@ -6,8 +6,8 @@ from quart import current_app from anonstream.broadcast import broadcast, broadcast_users_update from anonstream.stream import is_online, get_stream_title, get_stream_uptime, get_stream_viewership_or_none +from anonstream.user import get_sunsettable_users from anonstream.wrappers import with_timestamp -from anonstream.helpers.user import is_visible CONFIG = current_app.config MESSAGES = current_app.messages @@ -46,21 +46,16 @@ async def t_sunset_users(timestamp, iteration): if iteration == 0: return - tokens = [] - for token in USERS_BY_TOKEN: - user = USERS_BY_TOKEN[token] - if not is_visible(timestamp, MESSAGES, user): - tokens.append(token) + # Broadcast a users update, in case any users being + # removed have been mutated or are new. + broadcast_users_update() token_hashes = [] - while tokens: - token = tokens.pop() - token_hash = USERS_BY_TOKEN.pop(token)['token_hash'] - token_hashes.append(token_hash) - - # Broadcast a users update, in case any users being - # removed have been mutated or are new. - broadcast_users_update() + users = list(get_sunsettable_users(timestamp)) + while users: + user = users.pop() + USERS_BY_TOKEN.pop(user['token']) + token_hashes.append(user['token_hash']) if token_hashes: broadcast( diff --git a/anonstream/user.py b/anonstream/user.py index 7633aaa..209229c 100644 --- a/anonstream/user.py +++ b/anonstream/user.py @@ -4,11 +4,11 @@ from math import inf from quart import current_app from anonstream.wrappers import try_except_log, with_timestamp -from anonstream.helpers.user import is_visible, get_presence, Presence +from anonstream.helpers.user import get_presence, Presence from anonstream.helpers.captcha import check_captcha_digest, Answer from anonstream.helpers.tripcode import generate_tripcode from anonstream.utils.colour import color_to_colour, get_contrast, NotAColor -from anonstream.utils.user import get_user_for_websocket +from anonstream.utils.user import get_user_for_websocket, trilean CONFIG = current_app.config MESSAGES = current_app.messages @@ -117,13 +117,9 @@ def watched(timestamp, user): @with_timestamp def get_all_users_for_websocket(timestamp): - visible_users = filter( - lambda user: is_visible(timestamp, MESSAGES, user), - USERS, - ) return { user['token_hash']: get_user_for_websocket(user) - for user in visible_users + for user in get_unsunsettable_users(timestamp) } def verify(user, digest, answer): @@ -159,6 +155,46 @@ def deverify(timestamp, user): if n_user_messages >= CONFIG['FLOOD_THRESHOLD']: user['verified'] = False +def get_users_and_update_presence(timestamp): + for user in USERS: + old, user['presence'] = user['presence'], get_presence(timestamp, user) + if trilean(user['presence']) != trilean(old): + USERS_UPDATE_BUFFER.add(user['token']) + yield user + +def get_watching_users(timestamp): + return filter( + lambda user: user['presence'] == Presence.WATCHING, + get_users_and_update_presence(timestamp), + ) + +def get_absent_users(timestamp): + return filter( + lambda user: user['presence'] == Presence.ABSENT, + get_users_and_update_presence(timestamp), + ) + +def is_sunsettable(user): + return user['presence'] == Presence.ABSENT and not has_left_messages(user) + +def has_left_messages(user): + return any( + message['token'] == user['token'] + for message in MESSAGES + ) + +def get_sunsettable_users(timestamp): + return filter( + is_sunsettable, + get_users_and_update_presence(timestamp), + ) + +def get_unsunsettable_users(timestamp): + return filter( + lambda user: not is_sunsettable(user), + get_users_and_update_presence(timestamp), + ) + @with_timestamp def get_users_by_presence(timestamp): users_by_presence = { @@ -167,7 +203,6 @@ def get_users_by_presence(timestamp): Presence.TENTATIVE: [], Presence.ABSENT: [], } - for user in USERS: - presence = get_presence(timestamp, user) - users_by_presence[presence].append(user) + for user in get_users_and_update_presence(timestamp): + users_by_presence[user['presence']].append(user) return users_by_presence diff --git a/anonstream/utils/user.py b/anonstream/utils/user.py index 912e139..5132b16 100644 --- a/anonstream/utils/user.py +++ b/anonstream/utils/user.py @@ -2,17 +2,24 @@ import base64 import hashlib import secrets from collections import OrderedDict +from enum import Enum from math import inf from quart import escape, Markup +Presence = Enum( + 'Presence', + names=( + 'WATCHING', + 'NOTWATCHING', + 'TENTATIVE', + 'ABSENT', + ) +) + def generate_token(): return secrets.token_hex(16) -def get_user_for_websocket(user): - keys = ['broadcaster', 'name', 'color', 'tripcode', 'tag'] - return {key: user[key] for key in keys} - def concatenate_for_notice(string, *tuples): if not tuples: return string @@ -23,3 +30,19 @@ def concatenate_for_notice(string, *tuples): ) ) return string + markup + +def trilean(presence): + match presence: + case Presence.WATCHING: + return True + case Presence.NOTWATCHING: + return False + case _: + return None + +def get_user_for_websocket(user): + keys = ('broadcaster', 'name', 'color', 'tripcode', 'tag') + return { + **{key: user[key] for key in keys}, + 'watching': trilean(user['presence']), + }