diff --git a/anonstream/helpers/user.py b/anonstream/helpers/user.py index 82442c5..5dbc64e 100644 --- a/anonstream/helpers/user.py +++ b/anonstream/helpers/user.py @@ -46,6 +46,10 @@ def generate_user(timestamp, token, broadcaster, presence): }, 'presence': presence, 'linespan': deque(), + 'eyes': { + 'total': 0, + 'current': {}, + }, } def get_default_name(user): diff --git a/anonstream/routes/core.py b/anonstream/routes/core.py index 36366be..42149cc 100644 --- a/anonstream/routes/core.py +++ b/anonstream/routes/core.py @@ -4,9 +4,9 @@ from quart import current_app, request, render_template, abort, make_response, redirect, url_for, abort from anonstream.captcha import get_captcha_image -from anonstream.segments import segments +from anonstream.segments import segments, StopSendingSegments from anonstream.stream import is_online, get_stream_uptime -from anonstream.user import watched +from anonstream.user import watched, create_eyes, renew_eyes, ExpiredEyes from anonstream.routes.wrappers import with_user_from, auth_required from anonstream.utils.security import generate_csp @@ -25,8 +25,13 @@ async def stream(user): if not is_online(): return abort(404) + eyes_id = create_eyes(user, dict(request.headers)) def segment_read_hook(uri): - print(f'{uri}: {user["token"]}') + try: + renew_eyes(user, eyes_id) + except ExpiredEyes as e: + raise StopSendingSegments(f'eyes {eyes_id} expired: {e}') from e + print(f'{uri}: {eyes_id}~{user["token"]}') watched(user) generator = segments(segment_read_hook, token=user['token']) diff --git a/anonstream/segments.py b/anonstream/segments.py index cf7c429..2267818 100644 --- a/anonstream/segments.py +++ b/anonstream/segments.py @@ -22,6 +22,9 @@ class Stale(Exception): class UnsafePath(Exception): pass +class StopSendingSegments(Exception): + pass + def get_mtime(): try: mtime = os.path.getmtime(CONFIG['SEGMENT_PLAYLIST']) @@ -148,7 +151,15 @@ async def segments(segment_read_hook=lambda uri: None, token=None): ) break - segment_read_hook(uri) + try: + segment_read_hook(uri) + except StopSendingSegments as e: + reason, *_ = e.args + print( + f'[debug @ {time.time():.3f}: {token=}] ' + f'told to stop sending segments: {reason}' + ) + break try: async with aiofiles.open(path, 'rb') as fp: while chunk := await fp.read(8192): diff --git a/anonstream/user.py b/anonstream/user.py index 84a7a2d..6a6cc6a 100644 --- a/anonstream/user.py +++ b/anonstream/user.py @@ -25,6 +25,9 @@ class BadAppearance(ValueError): class BadCaptcha(ValueError): pass +class ExpiredEyes(Exception): + pass + def add_state(user, **state): state_id = time.time_ns() // 1_000_000 user['state'][state_id] = state @@ -219,3 +222,28 @@ def get_users_by_presence(timestamp): for user in get_users_and_update_presence(timestamp): users_by_presence[user['presence']].append(user) return users_by_presence + +@with_timestamp +def create_eyes(timestamp, user, headers): + eyes_id = user['eyes']['total'] + user['eyes']['total'] += 1 + user['eyes']['current'][eyes_id] = { + 'id': eyes_id, + 'token': user['token'], + 'n_segments': 0, + 'headers': headers, + 'created': timestamp, + 'renewed': timestamp, + } + return eyes_id + +@with_timestamp +def renew_eyes(timestamp, user, eyes_id): + try: + eyes = user['eyes']['current'][eyes_id] + except KeyError: + raise ExpiredEyes(None) + if timestamp - eyes['renewed'] >= 20.0: # TODO remove magic number + user['eyes']['current'].pop(eyes_id) + raise ExpiredEyes(eyes) + eyes['renewed'] = timestamp