From 5d0cac0b0f0e6930410967a05359ffe50defad58 Mon Sep 17 00:00:00 2001 From: n9k Date: Sun, 13 Feb 2022 09:25:02 +0000 Subject: [PATCH] Concatenate segments Add small js failsafe, other minor changes --- .gitignore | 1 + anonstream/__init__.py | 4 +- anonstream/routes.py | 19 +++++++-- anonstream/segments.py | 74 +++++++++++++++++++++++++++++++++ anonstream/static/anonstream.js | 11 +++-- anonstream/websocket.py | 2 +- config.toml | 3 +- 7 files changed, 104 insertions(+), 10 deletions(-) create mode 100644 anonstream/segments.py diff --git a/.gitignore b/.gitignore index c18dd8d..408c1f9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ __pycache__/ +stream/ diff --git a/anonstream/__init__.py b/anonstream/__init__.py index 8268c2d..77e4026 100644 --- a/anonstream/__init__.py +++ b/anonstream/__init__.py @@ -6,6 +6,7 @@ from quart import Quart from werkzeug.security import generate_password_hash from anonstream.utils.token import generate_token +from anonstream.segments import DirectoryCache async def create_app(): with open('config.toml') as fp: @@ -17,12 +18,13 @@ async def create_app(): print('Broadcaster password:', auth_password) app = Quart('anonstream') - app.config['SECRET_KEY'] = config['secret'].encode() + app.config['SECRET_KEY'] = config['secret_key'].encode() app.config['AUTH_USERNAME'] = config['auth_username'] app.config['AUTH_PWHASH'] = auth_pwhash app.config['AUTH_TOKEN'] = generate_token() app.chat = OrderedDict() app.websockets = {} + app.segments_directory_cache = DirectoryCache(config['segments_dir']) async with app.app_context(): import anonstream.routes diff --git a/anonstream/routes.py b/anonstream/routes.py index ff21818..630ea4d 100644 --- a/anonstream/routes.py +++ b/anonstream/routes.py @@ -1,7 +1,8 @@ import asyncio -from quart import current_app, request, render_template, redirect, websocket +from quart import current_app, request, render_template, make_response, redirect, websocket +from anonstream.segments import CatSegments from anonstream.wrappers import with_token_from, auth_required from anonstream.websocket import websocket_outbound, websocket_inbound @@ -10,6 +11,18 @@ from anonstream.websocket import websocket_outbound, websocket_inbound async def home(token): return await render_template('home.html', token=token) +@current_app.route('/stream.mp4') +@with_token_from(request) +async def stream(token): + try: + cat_segments = CatSegments(current_app.segments_directory_cache, token) + except ValueError: + return 'offline', 404 + response = await make_response(cat_segments.stream()) + response.headers['Content-Type'] = 'video/mp4' + response.timeout = None + return response + @current_app.route('/login') @auth_required async def login(): @@ -23,10 +36,10 @@ async def live(token): producer = websocket_outbound(queue) consumer = websocket_inbound( - secret=current_app.config['SECRET_KEY'], connected_websockets=current_app.websockets, - chat=current_app.chat, token=token, + secret=current_app.config['SECRET_KEY'], + chat=current_app.chat, ) try: await asyncio.gather(producer, consumer) diff --git a/anonstream/segments.py b/anonstream/segments.py new file mode 100644 index 0000000..7f178b1 --- /dev/null +++ b/anonstream/segments.py @@ -0,0 +1,74 @@ +import asyncio +import os +import re +import time +from collections import OrderedDict + +import aiofiles + +RE_SEGMENT = re.compile(r'^(?P\d+)\.ts$') + +class DirectoryCache: + def __init__(self, directory, ttl=0.5): + self.directory = directory + self.ttl = ttl + self.expires = None + self.files = None + + def timer(self): + return time.monotonic() + + def listdir(self): + if self.expires is None or self.timer() >= self.expires: + print(f'[debug @ {time.time():.4f}] listdir()') + self.files = os.listdir(self.directory) + self.expires = self.timer() + self.ttl + return self.files + + def segments(self): + segments = [] + for fn in self.listdir(): + match = RE_SEGMENT.match(fn) + if match: + segments.append((int(match.group('index')), fn)) + segments.sort() + return OrderedDict(segments) + + def path(self, fn): + return os.path.join(self.directory, fn) + +class CatSegments: + def __init__(self, directory_cache, token): + self.directory_cache = directory_cache + self.token = token + self.index = max(self.directory_cache.segments()) + + async def stream(self): + while True: + print( + f'[debug @ {time.time():.4f}: {self.token}] ' + f'index={self.index} ' + f'segments={tuple(self.directory_cache.segments())}' + ) + # search for current segment + for i in range(21): + segment = self.directory_cache.segments().get(self.index) + if segment is not None: + break + if i != 20: + await asyncio.sleep(0.2) + else: + print( + f'[debug @ {time.time():.4f}: {self.token}] could not ' + f'find segment #{self.index} after at least 4 seconds' + ) + return + + # read current segment + fn = self.directory_cache.path(segment) + async with aiofiles.open(fn, 'rb') as fp: + while chunk := await fp.read(8192): + yield chunk + + # increment segment index + self.index += 1 diff --git a/anonstream/static/anonstream.js b/anonstream/static/anonstream.js index f79b2aa..46cf110 100644 --- a/anonstream/static/anonstream.js +++ b/anonstream/static/anonstream.js @@ -112,18 +112,21 @@ const connect_websocket = () => { websocket_backoff = 2000; // 2 seconds }); ws.addEventListener("close", (event) => { + console.log("websocket closed", event); chat_form_submit.disabled = true; chat_live_ball.style.borderColor = "maroon"; chat_live_status.innerText = "Disconnected from chat"; - setTimeout(connect_websocket, websocket_backoff); - websocket_backoff = Math.min(32000, websocket_backoff * 2); - console.log("websocket closed", event); + if (!ws.successor) { + ws.successor = true; + setTimeout(connect_websocket, websocket_backoff); + websocket_backoff = Math.min(32000, websocket_backoff * 2); + } }); ws.addEventListener("error", (event) => { + console.log("websocket error", event); chat_form_submit.disabled = true; chat_live_ball.style.borderColor = "maroon"; chat_live_status.innerText = "Error connecting to chat"; - console.log("websocket error", event); }); ws.addEventListener("message", on_websocket_message); } diff --git a/anonstream/websocket.py b/anonstream/websocket.py index 411c3c4..7fe0453 100644 --- a/anonstream/websocket.py +++ b/anonstream/websocket.py @@ -19,7 +19,7 @@ async def websocket_outbound(queue): payload = await queue.get() await websocket.send_json(payload) -async def websocket_inbound(secret, connected_websockets, chat, token): +async def websocket_inbound(connected_websockets, token, secret, chat): while True: receipt = await websocket.receive_json() receipt, error = parse(chat.keys(), secret, receipt) diff --git a/config.toml b/config.toml index f511f85..b587406 100644 --- a/config.toml +++ b/config.toml @@ -1,2 +1,3 @@ -secret = "test" +secret_key = "test" auth_username = "broadcaster" +segments_dir = "stream/"