From 4b68023cf215dbf5a321d9ac3e68ced6707d1700 Mon Sep 17 00:00:00 2001 From: n9k Date: Sat, 2 Apr 2022 04:46:24 +0000 Subject: [PATCH] Add websocket ping/pong Client and server both close the connection if they don't hear from the other party after a timeout period. This is a failsafe and should improve reliability. --- anonstream/__init__.py | 2 ++ anonstream/helpers/user.py | 2 +- anonstream/routes/websocket.py | 8 +++++--- anonstream/static/anonstream.js | 23 ++++++++++++++++++++++- anonstream/tasks.py | 25 ++++++++++++++++++++++++- anonstream/utils/websocket.py | 5 ++++- anonstream/websocket.py | 26 ++++++++++++++++++++------ config.toml | 2 ++ 8 files changed, 80 insertions(+), 13 deletions(-) diff --git a/anonstream/__init__.py b/anonstream/__init__.py index e8c0b14..ce9dd30 100644 --- a/anonstream/__init__.py +++ b/anonstream/__init__.py @@ -53,6 +53,8 @@ def create_app(config_file): 'MAX_CHAT_SCROLLBACK': config['memory']['chat_scrollback'], 'TASK_PERIOD_ROTATE_USERS': config['tasks']['rotate_users'], 'TASK_PERIOD_ROTATE_CAPTCHAS': config['tasks']['rotate_captchas'], + 'TASK_PERIOD_ROTATE_WEBSOCKETS': config['tasks']['rotate_websockets'], + 'TASK_PERIOD_BROADCAST_PING': config['tasks']['broadcast_ping'], 'TASK_PERIOD_BROADCAST_USERS_UPDATE': config['tasks']['broadcast_users_update'], 'TASK_PERIOD_BROADCAST_STREAM_INFO_UPDATE': config['tasks']['broadcast_stream_info_update'], 'THRESHOLD_USER_NOTWATCHING': config['thresholds']['user_notwatching'], diff --git a/anonstream/helpers/user.py b/anonstream/helpers/user.py index 08f897b..dbe48d4 100644 --- a/anonstream/helpers/user.py +++ b/anonstream/helpers/user.py @@ -35,7 +35,7 @@ def generate_user(timestamp, token, broadcaster, presence): 'tag': tag, 'broadcaster': broadcaster, 'verified': broadcaster, - 'websockets': set(), + 'websockets': {}, 'name': None, 'color': colour_to_color(colour), 'tripcode': None, diff --git a/anonstream/routes/websocket.py b/anonstream/routes/websocket.py index 158561a..665085a 100644 --- a/anonstream/routes/websocket.py +++ b/anonstream/routes/websocket.py @@ -3,6 +3,8 @@ import asyncio +from math import inf + from quart import current_app, websocket from anonstream.user import see @@ -13,7 +15,7 @@ from anonstream.routes.wrappers import with_user_from @with_user_from(websocket) async def live(user): queue = asyncio.Queue(maxsize=0) - user['websockets'].add(queue) + user['websockets'][queue] = -inf producer = websocket_outbound(queue, user) consumer = websocket_inbound(queue, user) @@ -21,8 +23,8 @@ async def live(user): await asyncio.gather(producer, consumer) finally: see(user) - user['websockets'].remove(queue) + user['websockets'].pop(queue) try: - await websocket.close(1000) + await websocket.close(1001) except RuntimeError: pass diff --git a/anonstream/static/anonstream.js b/anonstream/static/anonstream.js index 08eed06..e756354 100644 --- a/anonstream/static/anonstream.js +++ b/anonstream/static/anonstream.js @@ -269,6 +269,9 @@ let stats = null; let stats_received = null; let default_name = {true: "Broadcaster", false: "Anonymous"}; let max_chat_scrollback = 256; +let pingpong_period = 8.0; +let ping = null; +const pingpong_timeout = () => pingpong_period * 1.5 + 4.0; const tidy_stylesheet = ({stylesheet, selector_regex, ignore_condition}) => { const to_delete = []; const to_ignore = new Set(); @@ -592,7 +595,7 @@ const on_websocket_message = (event) => { case "init": console.log("ws init", receipt); - // set title + pingpong_period = receipt.pingpong; set_title(receipt.title); // update stats (uptime/viewership) @@ -775,6 +778,13 @@ const on_websocket_message = (event) => { break; + case "ping": + console.log("ws ping"); + ping = new Date(); + const payload = {type: "pong"}; + ws.send(JSON.stringify(payload)); + break; + default: console.log("incomprehensible websocket message", receipt); } @@ -906,3 +916,14 @@ const chat_messages_unlock = document.getElementById("chat-messages-unlock"); chat_messages_unlock.addEventListener("click", (event) => { chat_messages.scrollTop = chat_messages.scrollTopMax; }); + +/* close websocket after prolonged absence of pings */ +const rotate_websocket = () => { + const this_pingpong_timeout = pingpong_timeout(); + if (ping === null || (new Date() - ping) / 1000 > this_pingpong_timeout) { + console.log(`no pings heard in ${this_pingpong_timeout} seconds, closing websocket...`); + ws.close(); + } + setTimeout(rotate_websocket, this_pingpong_timeout * 1000); +}; +setTimeout(rotate_websocket, pingpong_timeout() * 1000); diff --git a/anonstream/tasks.py b/anonstream/tasks.py index 1ee79d7..694b23c 100644 --- a/anonstream/tasks.py +++ b/anonstream/tasks.py @@ -5,7 +5,7 @@ import asyncio import itertools from functools import wraps -from quart import current_app +from quart import current_app, websocket from anonstream.broadcast import broadcast, broadcast_users_update from anonstream.stream import is_online, get_stream_title, get_stream_uptime_and_viewership @@ -86,6 +86,27 @@ async def t_expire_captchas(iteration): for digest in to_delete: CAPTCHAS.pop(digest) +@with_period(CONFIG['TASK_PERIOD_ROTATE_WEBSOCKETS']) +@with_timestamp +async def t_close_websockets(timestamp, iteration): + THRESHOLD = CONFIG['TASK_PERIOD_BROADCAST_PING'] * 1.5 + 4.0 + if iteration == 0: + return + else: + for user in USERS: + for queue in user['websockets']: + last_pong = user['websockets'][queue] + last_pong_ago = timestamp - last_pong + if last_pong_ago > THRESHOLD: + queue.put_nowait({'type': 'close'}) + +@with_period(CONFIG['TASK_PERIOD_BROADCAST_PING']) +async def t_broadcast_ping(iteration): + if iteration == 0: + return + else: + broadcast(USERS, payload={'type': 'ping'}) + @with_period(CONFIG['TASK_PERIOD_BROADCAST_USERS_UPDATE']) async def t_broadcast_users_update(iteration): if iteration == 0: @@ -147,5 +168,7 @@ async def t_broadcast_stream_info_update(iteration): current_app.add_background_task(t_sunset_users) current_app.add_background_task(t_expire_captchas) +current_app.add_background_task(t_close_websockets) +current_app.add_background_task(t_broadcast_ping) current_app.add_background_task(t_broadcast_users_update) current_app.add_background_task(t_broadcast_stream_info_update) diff --git a/anonstream/utils/websocket.py b/anonstream/utils/websocket.py index dbd6142..294b75f 100644 --- a/anonstream/utils/websocket.py +++ b/anonstream/utils/websocket.py @@ -3,7 +3,7 @@ from enum import Enum -WS = Enum('WS', names=('MESSAGE, CAPTCHA, APPEARANCE')) +WS = Enum('WS', names=('PONG', 'MESSAGE', 'CAPTCHA', 'APPEARANCE')) class Malformed(Exception): pass @@ -48,5 +48,8 @@ def parse_websocket_data(receipt): case 'captcha': return WS.CAPTCHA, () + case 'pong': + return WS.PONG, () + case _: raise Malformed('malformed type') diff --git a/anonstream/websocket.py b/anonstream/websocket.py index cc4026b..739af37 100644 --- a/anonstream/websocket.py +++ b/anonstream/websocket.py @@ -10,6 +10,7 @@ from anonstream.stream import get_stream_title, get_stream_uptime_and_viewership from anonstream.captcha import get_random_captcha_digest_for from anonstream.chat import get_all_messages_for_websocket, add_chat_message, Rejected from anonstream.user import get_all_users_for_websocket, see, verify, deverify, BadCaptcha, try_change_appearance +from anonstream.wrappers import with_timestamp from anonstream.utils.chat import generate_nonce from anonstream.utils.websocket import parse_websocket_data, Malformed, WS @@ -29,11 +30,16 @@ async def websocket_outbound(queue, user): }, 'scrollback': CONFIG['MAX_CHAT_SCROLLBACK'], 'digest': get_random_captcha_digest_for(user), + 'pingpong': CONFIG['TASK_PERIOD_BROADCAST_PING'], } await websocket.send_json(payload) while True: payload = await queue.get() - await websocket.send_json(payload) + if payload['type'] == 'close': + await websocket.close(1011) + break + else: + await websocket.send_json(payload) async def websocket_inbound(queue, user): while True: @@ -59,17 +65,25 @@ async def websocket_inbound(queue, user): handle = handle_inbound_appearance case WS.CAPTCHA: handle = handle_inbound_captcha - payload = handle(user, *parsed) + case WS.PONG: + handle = handle_inbound_pong + payload = handle(queue, user, *parsed) - queue.put_nowait(payload) + if payload is not None: + queue.put_nowait(payload) -def handle_inbound_captcha(user): +@with_timestamp +def handle_inbound_pong(timestamp, queue, user): + user['websockets'][queue] = timestamp + return None + +def handle_inbound_captcha(queue, user): return { 'type': 'captcha', 'digest': get_random_captcha_digest_for(user), } -def handle_inbound_appearance(user, name, color, password, want_tripcode): +def handle_inbound_appearance(queue, user, name, color, password, want_tripcode): errors = try_change_appearance(user, name, color, password, want_tripcode) if errors: return { @@ -85,7 +99,7 @@ def handle_inbound_appearance(user, name, color, password, want_tripcode): #'tripcode': user['tripcode'], } -def handle_inbound_message(user, nonce, comment, digest, answer): +def handle_inbound_message(queue, user, nonce, comment, digest, answer): try: verification_happened = verify(user, digest, answer) except BadCaptcha as e: diff --git a/config.toml b/config.toml index f34542a..b5cf4b9 100644 --- a/config.toml +++ b/config.toml @@ -33,6 +33,8 @@ chat_scrollback = 256 [tasks] rotate_users = 60.0 rotate_captchas = 60.0 +rotate_websockets = 2.0 +broadcast_ping = 8.0 broadcast_users_update = 4.0 broadcast_stream_info_update = 3.0