From cc6ed637646f8ffaeb383048500f392d4521efa4 Mon Sep 17 00:00:00 2001 From: n9k Date: Tue, 22 Feb 2022 03:57:48 +0000 Subject: [PATCH] Segment streaming redux, accurate stream uptime --- anonstream/__init__.py | 11 +- anonstream/routes/core.py | 23 ++-- anonstream/routes/wrappers.py | 6 +- anonstream/segments.py | 186 ++++++++++++++++++++++----------- anonstream/stream.py | 28 ++++- anonstream/templates/home.html | 6 +- anonstream/user.py | 10 +- anonstream/wrappers.py | 22 ++++ config.toml | 26 +++-- 9 files changed, 225 insertions(+), 93 deletions(-) diff --git a/anonstream/__init__.py b/anonstream/__init__.py index a66403e..313910c 100644 --- a/anonstream/__init__.py +++ b/anonstream/__init__.py @@ -1,3 +1,4 @@ +import os import secrets import toml from collections import OrderedDict @@ -5,7 +6,6 @@ from collections import OrderedDict from quart import Quart from werkzeug.security import generate_password_hash -from anonstream.segments import DirectoryCache from anonstream.utils.captcha import create_captcha_factory, create_captcha_signer from anonstream.utils.colour import color_to_colour from anonstream.utils.user import generate_token @@ -25,6 +25,13 @@ def create_app(config_file): 'AUTH_USERNAME': config['auth']['username'], 'AUTH_PWHASH': auth_pwhash, 'AUTH_TOKEN': generate_token(), + 'SEGMENT_DIRECTORY': os.path.realpath(config['segments']['directory']), + 'SEGMENT_PLAYLIST': os.path.join(os.path.realpath(config['segments']['directory']), config['segments']['playlist']), + 'SEGMENT_PLAYLIST_CACHE_LIFETIME': config['segments']['playlist_cache_lifetime'], + 'SEGMENT_PLAYLIST_STALE_THRESHOLD': config['segments']['playlist_stale_threshold'], + 'SEGMENT_SEARCH_COOLDOWN': config['segments']['search_cooldown'], + 'SEGMENT_SEARCH_TIMEOUT': config['segments']['search_timeout'], + 'SEGMENT_STREAM_INITIAL_BUFFER': config['segments']['stream_initial_buffer'], 'DEFAULT_HOST_NAME': config['names']['broadcaster'], 'DEFAULT_ANON_NAME': config['names']['anonymous'], 'MAX_STATES': config['memory']['states'], @@ -69,8 +76,6 @@ def create_app(config_file): app.users_by_token = {} app.users = app.users_by_token.values() - app.segments_directory_cache = DirectoryCache(config['stream']['segments_dir']) - app.captchas = OrderedDict() app.captcha_factory = create_captcha_factory(app.config['CAPTCHA_FONTS']) app.captcha_signer = create_captcha_signer(app.config['SECRET_KEY']) diff --git a/anonstream/routes/core.py b/anonstream/routes/core.py index ab9bdb5..08bf202 100644 --- a/anonstream/routes/core.py +++ b/anonstream/routes/core.py @@ -1,7 +1,9 @@ -from quart import current_app, request, render_template, redirect, url_for, abort +from quart import current_app, request, render_template, abort, make_response, redirect, url_for, abort from anonstream.captcha import get_captcha_image -from anonstream.segments import CatSegments, Offline +from anonstream.segments import segments +from anonstream.stream import is_online, get_stream_uptime +from anonstream.user import watched from anonstream.routes.wrappers import with_user_from, auth_required @current_app.route('/') @@ -12,14 +14,15 @@ async def home(user): @current_app.route('/stream.mp4') @with_user_from(request) async def stream(user): - try: - cat_segments = CatSegments( - directory_cache=current_app.segments_directory_cache, - token=user['token'] - ) - except Offline: - return 'offline', 404 - response = await make_response(cat_segments.stream()) + if not is_online(): + return abort(404) + + def segment_read_hook(uri): + print(f'{uri}: {user["token"]}') + watched(user) + + generator = segments(segment_read_hook, token=user['token']) + response = await make_response(generator) response.headers['Content-Type'] = 'video/mp4' response.timeout = None return response diff --git a/anonstream/routes/wrappers.py b/anonstream/routes/wrappers.py index 340cd7e..f387f0e 100644 --- a/anonstream/routes/wrappers.py +++ b/anonstream/routes/wrappers.py @@ -53,7 +53,11 @@ def with_user_from(context): if broadcaster: token = CONFIG['AUTH_TOKEN'] else: - token = context.args.get('token') or context.cookies.get('token') or generate_token() + token = ( + context.args.get('token') + or context.cookies.get('token') + or generate_token() + ) # Update / create user user = USERS_BY_TOKEN.get(token) diff --git a/anonstream/segments.py b/anonstream/segments.py index 09cc85b..54a7101 100644 --- a/anonstream/segments.py +++ b/anonstream/segments.py @@ -1,80 +1,142 @@ import asyncio import os -import re import time -from collections import OrderedDict import aiofiles +import m3u8 +from quart import current_app -RE_SEGMENT = re.compile(r'^(?P\d+)\.ts$') +from anonstream.wrappers import ttl_cache, with_timestamp + +CONFIG = current_app.config class Offline(Exception): pass -class DirectoryCache: - def __init__(self, directory, ttl=1.0): - self.directory = directory - self.ttl = ttl - self.expires = None - self.files = None +class Stale(Exception): + pass - def timer(self): - return time.monotonic() +class UnsafePath(Exception): + pass - def listdir(self): - if self.expires is None or self.timer() >= self.expires: - print(f'[debug @ {time.time():.4f}] listdir()') - self.files = os.listdir(self.directory) - self.expires = self.timer() + self.ttl - return self.files +def get_mtime(): + try: + mtime = os.path.getmtime(CONFIG['SEGMENT_PLAYLIST']) + except FileNotFoundError as e: + raise Stale from e + else: + if time.time() - mtime >= CONFIG['SEGMENT_PLAYLIST_STALE_THRESHOLD']: + raise Stale + return mtime - def segments(self): - segments = [] - for fn in self.listdir(): - match = RE_SEGMENT.match(fn) - if match: - segments.append((int(match.group('index')), fn)) - segments.sort() - return OrderedDict(segments) - - def path(self, fn): - return os.path.join(self.directory, fn) - -class CatSegments: - def __init__(self, directory_cache, token): - self.directory_cache = directory_cache - self.token = token - try: - self.index = max(self.directory_cache.segments()) - except ValueError: # max of empty sequence, i.e. there are no segments +@ttl_cache(CONFIG['SEGMENT_PLAYLIST_CACHE_LIFETIME']) +def get_playlist(): + #print(f'[debug @ {time.time():.3f}] get_playlist()') + try: + mtime = get_mtime() + except Stale as e: + raise Offline from e + else: + playlist = m3u8._load_from_file(CONFIG['SEGMENT_PLAYLIST']) + if playlist.is_endlist: + raise Offline + if len(playlist.segments) == 0: raise Offline - async def stream(self): - while True: - print( - f'[debug @ {time.time():.4f}: {self.token}] ' - f'index={self.index} ' - f'segments={tuple(self.directory_cache.segments())}' - ) - # search for current segment - for i in range(21): - segment = self.directory_cache.segments().get(self.index) - if segment is not None: - break - if i != 20: - await asyncio.sleep(0.2) - else: - print( - f'[debug @ {time.time():.4f}: {self.token}] could not ' - f'find segment #{self.index} after at least 4 seconds' - ) - return + return playlist, mtime - # read current segment - fn = self.directory_cache.path(segment) - async with aiofiles.open(fn, 'rb') as fp: +def get_starting_segment(): + ''' + Instead of choosing the most recent segment, try choosing a segment that + preceeds the most recent one by a little bit. Doing this increases the + buffer of initially available video, which makes playback more stable. + ''' + print(f'[debug @ {time.time():.3f}] get_starting_segment()') + playlist, _ = get_playlist() + index = max(0, len(playlist.segments) - CONFIG['SEGMENT_STREAM_INITIAL_BUFFER']) + return playlist.segments[index] + +def get_next_segment(uri): + ''' + Look for the segment with uri `uri` and return the segment that + follows it, or None if no such segment exists. + ''' + #print(f'[debug @ {time.time():.3f}] get_next_segment({uri!r})') + playlist, _ = get_playlist() + found = False + for segment in playlist.segments: + if found: + break + elif segment.uri == uri: + found = True + else: + segment = None + return segment + +async def get_segment_uris(): + try: + segment = get_starting_segment() + except Offline: + return + else: + yield segment.init_section.uri + + while True: + yield segment.uri + + t0 = time.monotonic() + while True: + try: + next_segment = get_next_segment(segment.uri) + except Offline: + return + else: + if next_segment is not None: + segment = next_segment + break + elif time.monotonic() - t0 >= CONFIG['SEGMENT_SEARCH_TIMEOUT']: + return + else: + await asyncio.sleep(CONFIG['SEGMENT_SEARCH_COOLDOWN']) + +def path_for(uri): + path = os.path.normpath( + os.path.join(CONFIG['SEGMENT_DIRECTORY'], uri) + ) + if os.path.dirname(path) != CONFIG['SEGMENT_DIRECTORY']: + raise UnsafePath(path) + return path + +async def segments(segment_read_hook=lambda uri: None, token=None): + print(f'[debug @ {time.time():.3f}: {token=}] entering segment generator') + uri = None + async for uri in get_segment_uris(): + #print(f'[debug @ {time.time():.3f}: {token=}] {uri=}') + try: + path = path_for(uri) + except UnsafePath as e: + unsafe_path, *_ = e.args + print( + f'[debug @ {time.time():.3f}: {token=}] ' + f'segment {uri=} has unsafe {path=}' + ) + break + + segment_read_hook(uri) + try: + async with aiofiles.open(path, 'rb') as fp: while chunk := await fp.read(8192): yield chunk - - # increment segment index - self.index += 1 + except FileNotFoundError: + print( + f'[debug @ {time.time():.3f}: {token=}] ' + f'segment {uri=} at {path=} unexpectedly does not exist' + ) + break + else: + print( + f'[debug @ {time.time():.3f}: {token=}] ' + f'could not find segment following {uri=} after at least ' + f'{CONFIG["SEGMENT_SEARCH_TIMEOUT"]} seconds' + ) + print(f'[debug @ {time.time():.3f}: {token=}] exiting segment generator') diff --git a/anonstream/stream.py b/anonstream/stream.py index d650eb1..d14942e 100644 --- a/anonstream/stream.py +++ b/anonstream/stream.py @@ -1,5 +1,29 @@ +import time + +from anonstream.segments import get_playlist, Offline + def get_stream_title(): return 'Stream title' -def get_stream_uptime(): - return None +def get_stream_uptime(rounded=True): + try: + playlist, mtime = get_playlist() + except Offline: + return None + else: + last_modified_ago = time.time() - mtime + + n_segments = playlist.media_sequence + len(playlist.segments) + duration = playlist.target_duration * n_segments + + uptime = duration + last_modified_ago + uptime = round(uptime, 2) if rounded else uptime + return uptime + +def is_online(): + try: + get_playlist() + except Offline: + return False + else: + return True diff --git a/anonstream/templates/home.html b/anonstream/templates/home.html index eb5b3c6..55d7ac1 100644 --- a/anonstream/templates/home.html +++ b/anonstream/templates/home.html @@ -2,10 +2,10 @@ - + - +
@@ -24,6 +24,6 @@ both - + diff --git a/anonstream/user.py b/anonstream/user.py index fa07cfb..ac9da56 100644 --- a/anonstream/user.py +++ b/anonstream/user.py @@ -101,8 +101,14 @@ def change_tripcode(user, password, dry_run=False): def delete_tripcode(user): user['tripcode'] = None -def see(user): - user['last']['seen'] = int(time.time()) +@with_timestamp +def see(timestamp, user): + user['last']['seen'] = timestamp + +@with_timestamp +def watched(timestamp, user): + user['last']['seen'] = timestamp + user['last']['watching'] = timestamp @with_timestamp def get_all_users_for_websocket(timestamp): diff --git a/anonstream/wrappers.py b/anonstream/wrappers.py index 6a6cf94..09d2769 100644 --- a/anonstream/wrappers.py +++ b/anonstream/wrappers.py @@ -31,3 +31,25 @@ def try_except_log(errors, exception_class): return wrapper return try_except_log_specific + +def ttl_cache(ttl): + ''' + Expiring cache with exactly one slot. Only wraps + functions that take no arguments. + ''' + def ttl_cache_specific(f): + value, expires = None, None + + @wraps(f) + def wrapper(): + nonlocal value, expires + + if expires is None or time.monotonic() >= expires: + value = f() + expires = time.monotonic() + ttl + + return value + + return wrapper + + return ttl_cache_specific diff --git a/config.toml b/config.toml index b5cc138..3d44def 100644 --- a/config.toml +++ b/config.toml @@ -3,8 +3,14 @@ secret_key = "test" [auth] username = "broadcaster" -[stream] -segments_dir = "stream/" +[segments] +directory = "stream/" +playlist = "stream.m3u8" +playlist_stale_threshold = 8.0 +playlist_cache_lifetime = 0.2 +search_cooldown = 0.25 +search_timeout = 5.0 +stream_initial_buffer = 3 [captcha] lifetime = 1800 @@ -21,9 +27,9 @@ chat_messages = 8192 chat_scrollback = 256 [tasks] -rotate_users = 60 -rotate_captchas = 60 -broadcast_users_update = 4 +rotate_users = 60.0 +rotate_captchas = 60.0 +broadcast_users_update = 4.0 [names] broadcaster = "Broadcaster" @@ -36,11 +42,11 @@ min_name_contrast = 3.0 background_color = "#232327" [flood] -duration = 20 +duration = 20.0 threshold = 4 [thresholds] -user_notwatching = 8 -user_tentative = 20 -user_absent = 360 -nojs_chat_timeout = 30 +user_notwatching = 8.0 +user_tentative = 20.0 +user_absent = 360.0 +nojs_chat_timeout = 30.0