Segment streaming redux, accurate stream uptime

このコミットが含まれているのは:
n9k 2022-02-22 03:57:48 +00:00
コミット cc6ed63764
9個のファイルの変更225行の追加93行の削除

ファイルの表示

@ -1,3 +1,4 @@
import os
import secrets
import toml
from collections import OrderedDict
@ -5,7 +6,6 @@ from collections import OrderedDict
from quart import Quart
from werkzeug.security import generate_password_hash
from anonstream.segments import DirectoryCache
from anonstream.utils.captcha import create_captcha_factory, create_captcha_signer
from anonstream.utils.colour import color_to_colour
from anonstream.utils.user import generate_token
@ -25,6 +25,13 @@ def create_app(config_file):
'AUTH_USERNAME': config['auth']['username'],
'AUTH_PWHASH': auth_pwhash,
'AUTH_TOKEN': generate_token(),
'SEGMENT_DIRECTORY': os.path.realpath(config['segments']['directory']),
'SEGMENT_PLAYLIST': os.path.join(os.path.realpath(config['segments']['directory']), config['segments']['playlist']),
'SEGMENT_PLAYLIST_CACHE_LIFETIME': config['segments']['playlist_cache_lifetime'],
'SEGMENT_PLAYLIST_STALE_THRESHOLD': config['segments']['playlist_stale_threshold'],
'SEGMENT_SEARCH_COOLDOWN': config['segments']['search_cooldown'],
'SEGMENT_SEARCH_TIMEOUT': config['segments']['search_timeout'],
'SEGMENT_STREAM_INITIAL_BUFFER': config['segments']['stream_initial_buffer'],
'DEFAULT_HOST_NAME': config['names']['broadcaster'],
'DEFAULT_ANON_NAME': config['names']['anonymous'],
'MAX_STATES': config['memory']['states'],
@ -69,8 +76,6 @@ def create_app(config_file):
app.users_by_token = {}
app.users = app.users_by_token.values()
app.segments_directory_cache = DirectoryCache(config['stream']['segments_dir'])
app.captchas = OrderedDict()
app.captcha_factory = create_captcha_factory(app.config['CAPTCHA_FONTS'])
app.captcha_signer = create_captcha_signer(app.config['SECRET_KEY'])

ファイルの表示

@ -1,7 +1,9 @@
from quart import current_app, request, render_template, redirect, url_for, abort
from quart import current_app, request, render_template, abort, make_response, redirect, url_for, abort
from anonstream.captcha import get_captcha_image
from anonstream.segments import CatSegments, Offline
from anonstream.segments import segments
from anonstream.stream import is_online, get_stream_uptime
from anonstream.user import watched
from anonstream.routes.wrappers import with_user_from, auth_required
@current_app.route('/')
@ -12,14 +14,15 @@ async def home(user):
@current_app.route('/stream.mp4')
@with_user_from(request)
async def stream(user):
try:
cat_segments = CatSegments(
directory_cache=current_app.segments_directory_cache,
token=user['token']
)
except Offline:
return 'offline', 404
response = await make_response(cat_segments.stream())
if not is_online():
return abort(404)
def segment_read_hook(uri):
print(f'{uri}: {user["token"]}')
watched(user)
generator = segments(segment_read_hook, token=user['token'])
response = await make_response(generator)
response.headers['Content-Type'] = 'video/mp4'
response.timeout = None
return response

ファイルの表示

@ -53,7 +53,11 @@ def with_user_from(context):
if broadcaster:
token = CONFIG['AUTH_TOKEN']
else:
token = context.args.get('token') or context.cookies.get('token') or generate_token()
token = (
context.args.get('token')
or context.cookies.get('token')
or generate_token()
)
# Update / create user
user = USERS_BY_TOKEN.get(token)

ファイルの表示

@ -1,80 +1,142 @@
import asyncio
import os
import re
import time
from collections import OrderedDict
import aiofiles
import m3u8
from quart import current_app
RE_SEGMENT = re.compile(r'^(?P<index>\d+)\.ts$')
from anonstream.wrappers import ttl_cache, with_timestamp
CONFIG = current_app.config
class Offline(Exception):
pass
class DirectoryCache:
def __init__(self, directory, ttl=1.0):
self.directory = directory
self.ttl = ttl
self.expires = None
self.files = None
class Stale(Exception):
pass
def timer(self):
return time.monotonic()
class UnsafePath(Exception):
pass
def listdir(self):
if self.expires is None or self.timer() >= self.expires:
print(f'[debug @ {time.time():.4f}] listdir()')
self.files = os.listdir(self.directory)
self.expires = self.timer() + self.ttl
return self.files
def get_mtime():
try:
mtime = os.path.getmtime(CONFIG['SEGMENT_PLAYLIST'])
except FileNotFoundError as e:
raise Stale from e
else:
if time.time() - mtime >= CONFIG['SEGMENT_PLAYLIST_STALE_THRESHOLD']:
raise Stale
return mtime
def segments(self):
segments = []
for fn in self.listdir():
match = RE_SEGMENT.match(fn)
if match:
segments.append((int(match.group('index')), fn))
segments.sort()
return OrderedDict(segments)
def path(self, fn):
return os.path.join(self.directory, fn)
class CatSegments:
def __init__(self, directory_cache, token):
self.directory_cache = directory_cache
self.token = token
try:
self.index = max(self.directory_cache.segments())
except ValueError: # max of empty sequence, i.e. there are no segments
@ttl_cache(CONFIG['SEGMENT_PLAYLIST_CACHE_LIFETIME'])
def get_playlist():
#print(f'[debug @ {time.time():.3f}] get_playlist()')
try:
mtime = get_mtime()
except Stale as e:
raise Offline from e
else:
playlist = m3u8._load_from_file(CONFIG['SEGMENT_PLAYLIST'])
if playlist.is_endlist:
raise Offline
if len(playlist.segments) == 0:
raise Offline
async def stream(self):
while True:
print(
f'[debug @ {time.time():.4f}: {self.token}] '
f'index={self.index} '
f'segments={tuple(self.directory_cache.segments())}'
)
# search for current segment
for i in range(21):
segment = self.directory_cache.segments().get(self.index)
if segment is not None:
break
if i != 20:
await asyncio.sleep(0.2)
else:
print(
f'[debug @ {time.time():.4f}: {self.token}] could not '
f'find segment #{self.index} after at least 4 seconds'
)
return
return playlist, mtime
# read current segment
fn = self.directory_cache.path(segment)
async with aiofiles.open(fn, 'rb') as fp:
def get_starting_segment():
'''
Instead of choosing the most recent segment, try choosing a segment that
preceeds the most recent one by a little bit. Doing this increases the
buffer of initially available video, which makes playback more stable.
'''
print(f'[debug @ {time.time():.3f}] get_starting_segment()')
playlist, _ = get_playlist()
index = max(0, len(playlist.segments) - CONFIG['SEGMENT_STREAM_INITIAL_BUFFER'])
return playlist.segments[index]
def get_next_segment(uri):
'''
Look for the segment with uri `uri` and return the segment that
follows it, or None if no such segment exists.
'''
#print(f'[debug @ {time.time():.3f}] get_next_segment({uri!r})')
playlist, _ = get_playlist()
found = False
for segment in playlist.segments:
if found:
break
elif segment.uri == uri:
found = True
else:
segment = None
return segment
async def get_segment_uris():
try:
segment = get_starting_segment()
except Offline:
return
else:
yield segment.init_section.uri
while True:
yield segment.uri
t0 = time.monotonic()
while True:
try:
next_segment = get_next_segment(segment.uri)
except Offline:
return
else:
if next_segment is not None:
segment = next_segment
break
elif time.monotonic() - t0 >= CONFIG['SEGMENT_SEARCH_TIMEOUT']:
return
else:
await asyncio.sleep(CONFIG['SEGMENT_SEARCH_COOLDOWN'])
def path_for(uri):
path = os.path.normpath(
os.path.join(CONFIG['SEGMENT_DIRECTORY'], uri)
)
if os.path.dirname(path) != CONFIG['SEGMENT_DIRECTORY']:
raise UnsafePath(path)
return path
async def segments(segment_read_hook=lambda uri: None, token=None):
print(f'[debug @ {time.time():.3f}: {token=}] entering segment generator')
uri = None
async for uri in get_segment_uris():
#print(f'[debug @ {time.time():.3f}: {token=}] {uri=}')
try:
path = path_for(uri)
except UnsafePath as e:
unsafe_path, *_ = e.args
print(
f'[debug @ {time.time():.3f}: {token=}] '
f'segment {uri=} has unsafe {path=}'
)
break
segment_read_hook(uri)
try:
async with aiofiles.open(path, 'rb') as fp:
while chunk := await fp.read(8192):
yield chunk
# increment segment index
self.index += 1
except FileNotFoundError:
print(
f'[debug @ {time.time():.3f}: {token=}] '
f'segment {uri=} at {path=} unexpectedly does not exist'
)
break
else:
print(
f'[debug @ {time.time():.3f}: {token=}] '
f'could not find segment following {uri=} after at least '
f'{CONFIG["SEGMENT_SEARCH_TIMEOUT"]} seconds'
)
print(f'[debug @ {time.time():.3f}: {token=}] exiting segment generator')

ファイルの表示

@ -1,5 +1,29 @@
import time
from anonstream.segments import get_playlist, Offline
def get_stream_title():
return 'Stream title'
def get_stream_uptime():
return None
def get_stream_uptime(rounded=True):
try:
playlist, mtime = get_playlist()
except Offline:
return None
else:
last_modified_ago = time.time() - mtime
n_segments = playlist.media_sequence + len(playlist.segments)
duration = playlist.target_duration * n_segments
uptime = duration + last_modified_ago
uptime = round(uptime, 2) if rounded else uptime
return uptime
def is_online():
try:
get_playlist()
except Offline:
return False
else:
return True

ファイルの表示

@ -2,10 +2,10 @@
<html id="nochat">
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="/static/style.css" type="text/css">
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}" type="text/css">
</head>
<body id="both" data-token="{{ user.token }}">
<video id="stream" src="{{ url_for('stream', token=user.token) }}" controls></video>
<video id="stream" src="{{ url_for('stream', token=user.token) }}" autoplay controls></video>
<article id="info">
<noscript><iframe id="info_nojs" src="{{ url_for('nojs_info', token=user.token) }}" data-js="false"></iframe></noscript>
</article>
@ -24,6 +24,6 @@
<a href="#both">both</a>
</nav>
<footer>anonstream 1.0.0 &mdash; <a href="#" target="_blank">source</a></footer>
<script src="/static/anonstream.js" type="text/javascript"></script>
<script src="{{ url_for('static', filename='anonstream.js') }}" type="text/javascript"></script>
</body>
</html>

ファイルの表示

@ -101,8 +101,14 @@ def change_tripcode(user, password, dry_run=False):
def delete_tripcode(user):
user['tripcode'] = None
def see(user):
user['last']['seen'] = int(time.time())
@with_timestamp
def see(timestamp, user):
user['last']['seen'] = timestamp
@with_timestamp
def watched(timestamp, user):
user['last']['seen'] = timestamp
user['last']['watching'] = timestamp
@with_timestamp
def get_all_users_for_websocket(timestamp):

ファイルの表示

@ -31,3 +31,25 @@ def try_except_log(errors, exception_class):
return wrapper
return try_except_log_specific
def ttl_cache(ttl):
'''
Expiring cache with exactly one slot. Only wraps
functions that take no arguments.
'''
def ttl_cache_specific(f):
value, expires = None, None
@wraps(f)
def wrapper():
nonlocal value, expires
if expires is None or time.monotonic() >= expires:
value = f()
expires = time.monotonic() + ttl
return value
return wrapper
return ttl_cache_specific

ファイルの表示

@ -3,8 +3,14 @@ secret_key = "test"
[auth]
username = "broadcaster"
[stream]
segments_dir = "stream/"
[segments]
directory = "stream/"
playlist = "stream.m3u8"
playlist_stale_threshold = 8.0
playlist_cache_lifetime = 0.2
search_cooldown = 0.25
search_timeout = 5.0
stream_initial_buffer = 3
[captcha]
lifetime = 1800
@ -21,9 +27,9 @@ chat_messages = 8192
chat_scrollback = 256
[tasks]
rotate_users = 60
rotate_captchas = 60
broadcast_users_update = 4
rotate_users = 60.0
rotate_captchas = 60.0
broadcast_users_update = 4.0
[names]
broadcaster = "Broadcaster"
@ -36,11 +42,11 @@ min_name_contrast = 3.0
background_color = "#232327"
[flood]
duration = 20
duration = 20.0
threshold = 4
[thresholds]
user_notwatching = 8
user_tentative = 20
user_absent = 360
nojs_chat_timeout = 30
user_notwatching = 8.0
user_tentative = 20.0
user_absent = 360.0
nojs_chat_timeout = 30.0