diff --git a/anonstream/segments.py b/anonstream/segments.py index 2267818..cc4edca 100644 --- a/anonstream/segments.py +++ b/anonstream/segments.py @@ -9,7 +9,7 @@ import aiofiles import m3u8 from quart import current_app -from anonstream.wrappers import ttl_cache, with_timestamp +from anonstream.wrappers import ttl_cache CONFIG = current_app.config diff --git a/anonstream/stream.py b/anonstream/stream.py index 6f130c0..0ee9e52 100644 --- a/anonstream/stream.py +++ b/anonstream/stream.py @@ -44,7 +44,7 @@ def get_stream_uptime(rounded=True): uptime = round(uptime, 2) if rounded else uptime return uptime -@with_timestamp +@with_timestamp() def get_raw_viewership(timestamp): users = get_watching_users(timestamp) return max( @@ -52,8 +52,8 @@ def get_raw_viewership(timestamp): default=0, ) -def get_stream_uptime_and_viewership(for_websocket=False): - uptime = get_stream_uptime() +def get_stream_uptime_and_viewership(rounded=True, for_websocket=False): + uptime = get_stream_uptime(rounded=rounded) if not for_websocket: viewership = None if uptime is None else get_raw_viewership() result = (uptime, viewership) diff --git a/anonstream/tasks.py b/anonstream/tasks.py index f25a2fc..73a714c 100644 --- a/anonstream/tasks.py +++ b/anonstream/tasks.py @@ -44,7 +44,7 @@ def with_period(period): return periodically @with_period(CONFIG['TASK_ROTATE_EYES']) -@with_timestamp +@with_timestamp() async def t_delete_eyes(timestamp, iteration): if iteration == 0: return @@ -59,7 +59,7 @@ async def t_delete_eyes(timestamp, iteration): user['eyes']['current'].pop(eyes_id) @with_period(CONFIG['TASK_ROTATE_USERS']) -@with_timestamp +@with_timestamp() async def t_sunset_users(timestamp, iteration): if iteration == 0: return @@ -102,7 +102,7 @@ async def t_expire_captchas(iteration): CAPTCHAS.pop(digest) @with_period(CONFIG['TASK_ROTATE_WEBSOCKETS']) -@with_timestamp +@with_timestamp() async def t_close_websockets(timestamp, iteration): THRESHOLD = CONFIG['TASK_BROADCAST_PING'] * 1.5 + 4.0 if iteration == 0: diff --git a/anonstream/user.py b/anonstream/user.py index 75c50eb..32ea0a7 100644 --- a/anonstream/user.py +++ b/anonstream/user.py @@ -127,16 +127,16 @@ def change_tripcode(user, password, dry_run=False): def delete_tripcode(user): user['tripcode'] = None -@with_timestamp +@with_timestamp() def see(timestamp, user): user['last']['seen'] = timestamp -@with_timestamp +@with_timestamp() def watched(timestamp, user): user['last']['seen'] = timestamp user['last']['watching'] = timestamp -@with_timestamp +@with_timestamp() def get_all_users_for_websocket(timestamp): return { user['token_hash']: get_user_for_websocket(user) @@ -160,7 +160,7 @@ def verify(user, digest, answer): return verification_happened -@with_timestamp +@with_timestamp() def deverify(timestamp, user): if not user['verified']: return @@ -182,7 +182,7 @@ def _update_presence(timestamp, user): USERS_UPDATE_BUFFER.add(user['token']) return user['presence'] -@with_timestamp +@with_timestamp() def update_presence(timestamp, user): return _update_presence(timestamp, user) @@ -224,7 +224,7 @@ def get_unsunsettable_users(timestamp): get_users_and_update_presence(timestamp), ) -@with_timestamp +@with_timestamp() def get_users_by_presence(timestamp): users_by_presence = { Presence.WATCHING: [], @@ -236,7 +236,7 @@ def get_users_by_presence(timestamp): users_by_presence[user['presence']].append(user) return users_by_presence -@with_timestamp +@with_timestamp(precise=True) def create_eyes(timestamp, user, headers): # Enforce cooldown last_created_ago = timestamp - user['last']['eyes'] @@ -271,7 +271,7 @@ def create_eyes(timestamp, user, headers): } return eyes_id -@with_timestamp +@with_timestamp(precise=True) def renew_eyes(timestamp, user, eyes_id, just_read_new_segment=False): try: eyes = user['eyes']['current'][eyes_id] diff --git a/anonstream/websocket.py b/anonstream/websocket.py index d9bb0cc..4878e1e 100644 --- a/anonstream/websocket.py +++ b/anonstream/websocket.py @@ -72,7 +72,7 @@ async def websocket_inbound(queue, user): if payload is not None: queue.put_nowait(payload) -@with_timestamp +@with_timestamp() def handle_inbound_pong(timestamp, queue, user): print(f'[pong] {user["token"]}') user['websockets'][queue] = timestamp diff --git a/anonstream/wrappers.py b/anonstream/wrappers.py index 6e1e857..3a8ead1 100644 --- a/anonstream/wrappers.py +++ b/anonstream/wrappers.py @@ -4,23 +4,25 @@ import time from functools import wraps -def with_timestamp(f): - @wraps(f) - def wrapper(*args, **kwargs): - timestamp = int(time.time()) - return f(timestamp, *args, **kwargs) - - return wrapper - -def with_first_argument(x): - def with_x(f): +def with_function_call(fn, *fn_args, **fn_kwargs): + def with_result(f): @wraps(f) def wrapper(*args, **kwargs): - return f(x, *args, **kwargs) - + result = fn(*fn_args, **fn_kwargs) + return f(result, *args, **kwargs) return wrapper + return with_result - return with_x +def with_constant(x): + return with_function_call(lambda: x) + +def with_timestamp(monotonic=False, precise=False): + n = 1_000_000_000 + if monotonic: + fn = precise and time.monotonic or (lambda: time.monotonic_ns() // n) + else: + fn = precise and time.time or (lambda: time.time_ns() // n) + return with_function_call(fn) def try_except_log(errors, exception_class): def try_except_log_specific(f): @@ -30,9 +32,7 @@ def try_except_log(errors, exception_class): return f(*args, **kwargs) except exception_class as e: errors.append(e) - return wrapper - return try_except_log_specific def ttl_cache(ttl): @@ -42,19 +42,14 @@ def ttl_cache(ttl): ''' def ttl_cache_specific(f): value, expires = None, None - @wraps(f) def wrapper(): nonlocal value, expires - if expires is None or time.monotonic() >= expires: value = f() expires = time.monotonic() + ttl - return value - return wrapper - return ttl_cache_specific def ttl_cache_async(ttl): @@ -63,17 +58,12 @@ def ttl_cache_async(ttl): ''' def ttl_cache_specific(f): value, expires = None, None - @wraps(f) async def wrapper(): nonlocal value, expires - if expires is None or time.monotonic() >= expires: value = await f() expires = time.monotonic() + ttl - return value - return wrapper - return ttl_cache_specific