Tell websockets which users are watching
This adds a field 'watching' in `user_for_websocket` that's True iff WATCHING, False iff NOTWATCHING, and None otherwise (since clients don't need to know if a user is tentative or absent). When the value of this field changes for any user, they get added to the update buffer (like with any other change). Removed race condition in `t_sunset_users`: `broadcast_users_update` was being called *after* a user was removed from memory (and for each user being removed, which was redundant). In that scenario if there's a user in the update buffer and `t_sunset_users` wins the race between it and `t_broadcast_users_update`, then when `t_sunset_users` calls `broadcast_users_update` a KeyError would be raised since the user's already been removed. Fixed unintended behaviour of `t_sunset_users`: it was removing users based on the result of `is_visible`, so users who were actually tenative (as opposed to absent) were being removed.
このコミットが含まれているのは:
コミット
886c360e26
|
@ -1,25 +1,15 @@
|
|||
import hashlib
|
||||
import base64
|
||||
from collections import OrderedDict
|
||||
from enum import Enum
|
||||
from math import inf
|
||||
|
||||
from quart import current_app
|
||||
|
||||
from anonstream.utils.colour import generate_colour, colour_to_color
|
||||
from anonstream.utils.user import Presence
|
||||
|
||||
CONFIG = current_app.config
|
||||
|
||||
Presence = Enum(
|
||||
'Presence',
|
||||
names=(
|
||||
'WATCHING',
|
||||
'NOTWATCHING',
|
||||
'TENTATIVE',
|
||||
'ABSENT',
|
||||
)
|
||||
)
|
||||
|
||||
def generate_token_hash_and_tag(token):
|
||||
parts = CONFIG['SECRET_KEY'] + b'token-hash\0' + token.encode()
|
||||
digest = hashlib.sha256(parts).digest()
|
||||
|
@ -29,7 +19,7 @@ def generate_token_hash_and_tag(token):
|
|||
|
||||
return token_hash, tag
|
||||
|
||||
def generate_user(timestamp, token, broadcaster):
|
||||
def generate_user(timestamp, token, broadcaster, presence):
|
||||
colour = generate_colour(
|
||||
seed='name\0' + token,
|
||||
bg=CONFIG['CHAT_BACKGROUND_COLOUR'],
|
||||
|
@ -51,6 +41,7 @@ def generate_user(timestamp, token, broadcaster):
|
|||
'seen': timestamp,
|
||||
'watching': -inf,
|
||||
},
|
||||
'presence': presence,
|
||||
}
|
||||
|
||||
def get_default_name(user):
|
||||
|
@ -75,20 +66,3 @@ def get_presence(timestamp, user):
|
|||
return Presence.TENTATIVE
|
||||
|
||||
return Presence.ABSENT
|
||||
|
||||
def is_watching(timestamp, user):
|
||||
return get_presence(timestamp, user) == Presence.WATCHING
|
||||
|
||||
def is_listed(timestamp, user):
|
||||
return (
|
||||
get_presence(timestamp, user)
|
||||
in {Presence.WATCHING, Presence.NOTWATCHING}
|
||||
)
|
||||
|
||||
def is_visible(timestamp, messages, user):
|
||||
def user_left_messages():
|
||||
return any(
|
||||
message['token'] == user['token']
|
||||
for message in messages
|
||||
)
|
||||
return is_listed(timestamp, user) or user_left_messages()
|
||||
|
|
|
@ -3,10 +3,10 @@ from quart import current_app, request, render_template, redirect, url_for, esca
|
|||
from anonstream.captcha import get_random_captcha_digest_for
|
||||
from anonstream.chat import add_chat_message, Rejected
|
||||
from anonstream.stream import get_stream_title, get_stream_uptime, get_stream_viewership
|
||||
from anonstream.user import add_state, pop_state, try_change_appearance, get_users_by_presence, verify, deverify, BadCaptcha
|
||||
from anonstream.user import add_state, pop_state, try_change_appearance, get_users_by_presence, Presence, verify, deverify, BadCaptcha
|
||||
from anonstream.routes.wrappers import with_user_from, render_template_with_etag
|
||||
from anonstream.helpers.chat import get_scrollback
|
||||
from anonstream.helpers.user import get_default_name, Presence
|
||||
from anonstream.helpers.user import get_default_name
|
||||
from anonstream.utils.chat import generate_nonce
|
||||
from anonstream.utils.user import concatenate_for_notice
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ from werkzeug.security import check_password_hash
|
|||
from anonstream.broadcast import broadcast
|
||||
from anonstream.user import see
|
||||
from anonstream.helpers.user import generate_user
|
||||
from anonstream.utils.user import generate_token
|
||||
from anonstream.utils.user import generate_token, Presence
|
||||
|
||||
CONFIG = current_app.config
|
||||
MESSAGES = current_app.messages
|
||||
|
@ -68,6 +68,7 @@ def with_user_from(context):
|
|||
timestamp=timestamp,
|
||||
token=token,
|
||||
broadcaster=broadcaster,
|
||||
presence=Presence.NOTWATCHING,
|
||||
)
|
||||
USERS_BY_TOKEN[token] = user
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import itertools
|
||||
import operator
|
||||
import time
|
||||
|
||||
import aiofiles
|
||||
|
@ -5,7 +7,7 @@ from quart import current_app
|
|||
|
||||
from anonstream.segments import get_playlist, Offline
|
||||
from anonstream.wrappers import ttl_cache_async, with_timestamp
|
||||
from anonstream.helpers.user import is_watching
|
||||
from anonstream.user import get_watching_users
|
||||
|
||||
CONFIG = current_app.config
|
||||
USERS = current_app.users
|
||||
|
@ -36,7 +38,11 @@ def get_stream_uptime(rounded=True):
|
|||
|
||||
@with_timestamp
|
||||
def get_stream_viewership(timestamp):
|
||||
return sum(map(lambda user: is_watching(timestamp, user), USERS))
|
||||
users = get_watching_users(timestamp)
|
||||
return max(
|
||||
map(operator.itemgetter(0), zip(itertools.count(1), users)),
|
||||
default=0,
|
||||
)
|
||||
|
||||
def get_stream_viewership_or_none(uptime):
|
||||
viewership = get_stream_viewership()
|
||||
|
|
|
@ -6,8 +6,8 @@ from quart import current_app
|
|||
|
||||
from anonstream.broadcast import broadcast, broadcast_users_update
|
||||
from anonstream.stream import is_online, get_stream_title, get_stream_uptime, get_stream_viewership_or_none
|
||||
from anonstream.user import get_sunsettable_users
|
||||
from anonstream.wrappers import with_timestamp
|
||||
from anonstream.helpers.user import is_visible
|
||||
|
||||
CONFIG = current_app.config
|
||||
MESSAGES = current_app.messages
|
||||
|
@ -46,21 +46,16 @@ async def t_sunset_users(timestamp, iteration):
|
|||
if iteration == 0:
|
||||
return
|
||||
|
||||
tokens = []
|
||||
for token in USERS_BY_TOKEN:
|
||||
user = USERS_BY_TOKEN[token]
|
||||
if not is_visible(timestamp, MESSAGES, user):
|
||||
tokens.append(token)
|
||||
# Broadcast a users update, in case any users being
|
||||
# removed have been mutated or are new.
|
||||
broadcast_users_update()
|
||||
|
||||
token_hashes = []
|
||||
while tokens:
|
||||
token = tokens.pop()
|
||||
token_hash = USERS_BY_TOKEN.pop(token)['token_hash']
|
||||
token_hashes.append(token_hash)
|
||||
|
||||
# Broadcast a users update, in case any users being
|
||||
# removed have been mutated or are new.
|
||||
broadcast_users_update()
|
||||
users = list(get_sunsettable_users(timestamp))
|
||||
while users:
|
||||
user = users.pop()
|
||||
USERS_BY_TOKEN.pop(user['token'])
|
||||
token_hashes.append(user['token_hash'])
|
||||
|
||||
if token_hashes:
|
||||
broadcast(
|
||||
|
|
|
@ -4,11 +4,11 @@ from math import inf
|
|||
from quart import current_app
|
||||
|
||||
from anonstream.wrappers import try_except_log, with_timestamp
|
||||
from anonstream.helpers.user import is_visible, get_presence, Presence
|
||||
from anonstream.helpers.user import get_presence, Presence
|
||||
from anonstream.helpers.captcha import check_captcha_digest, Answer
|
||||
from anonstream.helpers.tripcode import generate_tripcode
|
||||
from anonstream.utils.colour import color_to_colour, get_contrast, NotAColor
|
||||
from anonstream.utils.user import get_user_for_websocket
|
||||
from anonstream.utils.user import get_user_for_websocket, trilean
|
||||
|
||||
CONFIG = current_app.config
|
||||
MESSAGES = current_app.messages
|
||||
|
@ -117,13 +117,9 @@ def watched(timestamp, user):
|
|||
|
||||
@with_timestamp
|
||||
def get_all_users_for_websocket(timestamp):
|
||||
visible_users = filter(
|
||||
lambda user: is_visible(timestamp, MESSAGES, user),
|
||||
USERS,
|
||||
)
|
||||
return {
|
||||
user['token_hash']: get_user_for_websocket(user)
|
||||
for user in visible_users
|
||||
for user in get_unsunsettable_users(timestamp)
|
||||
}
|
||||
|
||||
def verify(user, digest, answer):
|
||||
|
@ -159,6 +155,46 @@ def deverify(timestamp, user):
|
|||
if n_user_messages >= CONFIG['FLOOD_THRESHOLD']:
|
||||
user['verified'] = False
|
||||
|
||||
def get_users_and_update_presence(timestamp):
|
||||
for user in USERS:
|
||||
old, user['presence'] = user['presence'], get_presence(timestamp, user)
|
||||
if trilean(user['presence']) != trilean(old):
|
||||
USERS_UPDATE_BUFFER.add(user['token'])
|
||||
yield user
|
||||
|
||||
def get_watching_users(timestamp):
|
||||
return filter(
|
||||
lambda user: user['presence'] == Presence.WATCHING,
|
||||
get_users_and_update_presence(timestamp),
|
||||
)
|
||||
|
||||
def get_absent_users(timestamp):
|
||||
return filter(
|
||||
lambda user: user['presence'] == Presence.ABSENT,
|
||||
get_users_and_update_presence(timestamp),
|
||||
)
|
||||
|
||||
def is_sunsettable(user):
|
||||
return user['presence'] == Presence.ABSENT and not has_left_messages(user)
|
||||
|
||||
def has_left_messages(user):
|
||||
return any(
|
||||
message['token'] == user['token']
|
||||
for message in MESSAGES
|
||||
)
|
||||
|
||||
def get_sunsettable_users(timestamp):
|
||||
return filter(
|
||||
is_sunsettable,
|
||||
get_users_and_update_presence(timestamp),
|
||||
)
|
||||
|
||||
def get_unsunsettable_users(timestamp):
|
||||
return filter(
|
||||
lambda user: not is_sunsettable(user),
|
||||
get_users_and_update_presence(timestamp),
|
||||
)
|
||||
|
||||
@with_timestamp
|
||||
def get_users_by_presence(timestamp):
|
||||
users_by_presence = {
|
||||
|
@ -167,7 +203,6 @@ def get_users_by_presence(timestamp):
|
|||
Presence.TENTATIVE: [],
|
||||
Presence.ABSENT: [],
|
||||
}
|
||||
for user in USERS:
|
||||
presence = get_presence(timestamp, user)
|
||||
users_by_presence[presence].append(user)
|
||||
for user in get_users_and_update_presence(timestamp):
|
||||
users_by_presence[user['presence']].append(user)
|
||||
return users_by_presence
|
||||
|
|
|
@ -2,17 +2,24 @@ import base64
|
|||
import hashlib
|
||||
import secrets
|
||||
from collections import OrderedDict
|
||||
from enum import Enum
|
||||
from math import inf
|
||||
|
||||
from quart import escape, Markup
|
||||
|
||||
Presence = Enum(
|
||||
'Presence',
|
||||
names=(
|
||||
'WATCHING',
|
||||
'NOTWATCHING',
|
||||
'TENTATIVE',
|
||||
'ABSENT',
|
||||
)
|
||||
)
|
||||
|
||||
def generate_token():
|
||||
return secrets.token_hex(16)
|
||||
|
||||
def get_user_for_websocket(user):
|
||||
keys = ['broadcaster', 'name', 'color', 'tripcode', 'tag']
|
||||
return {key: user[key] for key in keys}
|
||||
|
||||
def concatenate_for_notice(string, *tuples):
|
||||
if not tuples:
|
||||
return string
|
||||
|
@ -23,3 +30,19 @@ def concatenate_for_notice(string, *tuples):
|
|||
)
|
||||
)
|
||||
return string + markup
|
||||
|
||||
def trilean(presence):
|
||||
match presence:
|
||||
case Presence.WATCHING:
|
||||
return True
|
||||
case Presence.NOTWATCHING:
|
||||
return False
|
||||
case _:
|
||||
return None
|
||||
|
||||
def get_user_for_websocket(user):
|
||||
keys = ('broadcaster', 'name', 'color', 'tripcode', 'tag')
|
||||
return {
|
||||
**{key: user[key] for key in keys},
|
||||
'watching': trilean(user['presence']),
|
||||
}
|
||||
|
|
読み込み中…
新しいイシューから参照