Concatenate segments

Add small js failsafe, other minor changes
このコミットが含まれているのは:
n9k 2022-02-13 09:25:02 +00:00
コミット 5d0cac0b0f
7個のファイルの変更104行の追加10行の削除

1
.gitignore vendored
ファイルの表示

@ -1 +1,2 @@
__pycache__/
stream/

ファイルの表示

@ -6,6 +6,7 @@ from quart import Quart
from werkzeug.security import generate_password_hash
from anonstream.utils.token import generate_token
from anonstream.segments import DirectoryCache
async def create_app():
with open('config.toml') as fp:
@ -17,12 +18,13 @@ async def create_app():
print('Broadcaster password:', auth_password)
app = Quart('anonstream')
app.config['SECRET_KEY'] = config['secret'].encode()
app.config['SECRET_KEY'] = config['secret_key'].encode()
app.config['AUTH_USERNAME'] = config['auth_username']
app.config['AUTH_PWHASH'] = auth_pwhash
app.config['AUTH_TOKEN'] = generate_token()
app.chat = OrderedDict()
app.websockets = {}
app.segments_directory_cache = DirectoryCache(config['segments_dir'])
async with app.app_context():
import anonstream.routes

ファイルの表示

@ -1,7 +1,8 @@
import asyncio
from quart import current_app, request, render_template, redirect, websocket
from quart import current_app, request, render_template, make_response, redirect, websocket
from anonstream.segments import CatSegments
from anonstream.wrappers import with_token_from, auth_required
from anonstream.websocket import websocket_outbound, websocket_inbound
@ -10,6 +11,18 @@ from anonstream.websocket import websocket_outbound, websocket_inbound
async def home(token):
return await render_template('home.html', token=token)
@current_app.route('/stream.mp4')
@with_token_from(request)
async def stream(token):
try:
cat_segments = CatSegments(current_app.segments_directory_cache, token)
except ValueError:
return 'offline', 404
response = await make_response(cat_segments.stream())
response.headers['Content-Type'] = 'video/mp4'
response.timeout = None
return response
@current_app.route('/login')
@auth_required
async def login():
@ -23,10 +36,10 @@ async def live(token):
producer = websocket_outbound(queue)
consumer = websocket_inbound(
secret=current_app.config['SECRET_KEY'],
connected_websockets=current_app.websockets,
chat=current_app.chat,
token=token,
secret=current_app.config['SECRET_KEY'],
chat=current_app.chat,
)
try:
await asyncio.gather(producer, consumer)

74
anonstream/segments.py ノーマルファイル
ファイルの表示

@ -0,0 +1,74 @@
import asyncio
import os
import re
import time
from collections import OrderedDict
import aiofiles
RE_SEGMENT = re.compile(r'^(?P<index>\d+)\.ts$')
class DirectoryCache:
def __init__(self, directory, ttl=0.5):
self.directory = directory
self.ttl = ttl
self.expires = None
self.files = None
def timer(self):
return time.monotonic()
def listdir(self):
if self.expires is None or self.timer() >= self.expires:
print(f'[debug @ {time.time():.4f}] listdir()')
self.files = os.listdir(self.directory)
self.expires = self.timer() + self.ttl
return self.files
def segments(self):
segments = []
for fn in self.listdir():
match = RE_SEGMENT.match(fn)
if match:
segments.append((int(match.group('index')), fn))
segments.sort()
return OrderedDict(segments)
def path(self, fn):
return os.path.join(self.directory, fn)
class CatSegments:
def __init__(self, directory_cache, token):
self.directory_cache = directory_cache
self.token = token
self.index = max(self.directory_cache.segments())
async def stream(self):
while True:
print(
f'[debug @ {time.time():.4f}: {self.token}] '
f'index={self.index} '
f'segments={tuple(self.directory_cache.segments())}'
)
# search for current segment
for i in range(21):
segment = self.directory_cache.segments().get(self.index)
if segment is not None:
break
if i != 20:
await asyncio.sleep(0.2)
else:
print(
f'[debug @ {time.time():.4f}: {self.token}] could not '
f'find segment #{self.index} after at least 4 seconds'
)
return
# read current segment
fn = self.directory_cache.path(segment)
async with aiofiles.open(fn, 'rb') as fp:
while chunk := await fp.read(8192):
yield chunk
# increment segment index
self.index += 1

ファイルの表示

@ -112,18 +112,21 @@ const connect_websocket = () => {
websocket_backoff = 2000; // 2 seconds
});
ws.addEventListener("close", (event) => {
console.log("websocket closed", event);
chat_form_submit.disabled = true;
chat_live_ball.style.borderColor = "maroon";
chat_live_status.innerText = "Disconnected from chat";
setTimeout(connect_websocket, websocket_backoff);
websocket_backoff = Math.min(32000, websocket_backoff * 2);
console.log("websocket closed", event);
if (!ws.successor) {
ws.successor = true;
setTimeout(connect_websocket, websocket_backoff);
websocket_backoff = Math.min(32000, websocket_backoff * 2);
}
});
ws.addEventListener("error", (event) => {
console.log("websocket error", event);
chat_form_submit.disabled = true;
chat_live_ball.style.borderColor = "maroon";
chat_live_status.innerText = "Error connecting to chat";
console.log("websocket error", event);
});
ws.addEventListener("message", on_websocket_message);
}

ファイルの表示

@ -19,7 +19,7 @@ async def websocket_outbound(queue):
payload = await queue.get()
await websocket.send_json(payload)
async def websocket_inbound(secret, connected_websockets, chat, token):
async def websocket_inbound(connected_websockets, token, secret, chat):
while True:
receipt = await websocket.receive_json()
receipt, error = parse(chat.keys(), secret, receipt)

ファイルの表示

@ -1,2 +1,3 @@
secret = "test"
secret_key = "test"
auth_username = "broadcaster"
segments_dir = "stream/"