Add websocket ping/pong

Client and server both close the connection if they don't hear from the
other party after a timeout period. This is a failsafe and should
improve reliability.
このコミットが含まれているのは:
n9k 2022-04-02 04:46:24 +00:00
コミット 4b68023cf2
8個のファイルの変更80行の追加13行の削除

ファイルの表示

@ -53,6 +53,8 @@ def create_app(config_file):
'MAX_CHAT_SCROLLBACK': config['memory']['chat_scrollback'],
'TASK_PERIOD_ROTATE_USERS': config['tasks']['rotate_users'],
'TASK_PERIOD_ROTATE_CAPTCHAS': config['tasks']['rotate_captchas'],
'TASK_PERIOD_ROTATE_WEBSOCKETS': config['tasks']['rotate_websockets'],
'TASK_PERIOD_BROADCAST_PING': config['tasks']['broadcast_ping'],
'TASK_PERIOD_BROADCAST_USERS_UPDATE': config['tasks']['broadcast_users_update'],
'TASK_PERIOD_BROADCAST_STREAM_INFO_UPDATE': config['tasks']['broadcast_stream_info_update'],
'THRESHOLD_USER_NOTWATCHING': config['thresholds']['user_notwatching'],

ファイルの表示

@ -35,7 +35,7 @@ def generate_user(timestamp, token, broadcaster, presence):
'tag': tag,
'broadcaster': broadcaster,
'verified': broadcaster,
'websockets': set(),
'websockets': {},
'name': None,
'color': colour_to_color(colour),
'tripcode': None,

ファイルの表示

@ -3,6 +3,8 @@
import asyncio
from math import inf
from quart import current_app, websocket
from anonstream.user import see
@ -13,7 +15,7 @@ from anonstream.routes.wrappers import with_user_from
@with_user_from(websocket)
async def live(user):
queue = asyncio.Queue(maxsize=0)
user['websockets'].add(queue)
user['websockets'][queue] = -inf
producer = websocket_outbound(queue, user)
consumer = websocket_inbound(queue, user)
@ -21,8 +23,8 @@ async def live(user):
await asyncio.gather(producer, consumer)
finally:
see(user)
user['websockets'].remove(queue)
user['websockets'].pop(queue)
try:
await websocket.close(1000)
await websocket.close(1001)
except RuntimeError:
pass

ファイルの表示

@ -269,6 +269,9 @@ let stats = null;
let stats_received = null;
let default_name = {true: "Broadcaster", false: "Anonymous"};
let max_chat_scrollback = 256;
let pingpong_period = 8.0;
let ping = null;
const pingpong_timeout = () => pingpong_period * 1.5 + 4.0;
const tidy_stylesheet = ({stylesheet, selector_regex, ignore_condition}) => {
const to_delete = [];
const to_ignore = new Set();
@ -592,7 +595,7 @@ const on_websocket_message = (event) => {
case "init":
console.log("ws init", receipt);
// set title
pingpong_period = receipt.pingpong;
set_title(receipt.title);
// update stats (uptime/viewership)
@ -775,6 +778,13 @@ const on_websocket_message = (event) => {
break;
case "ping":
console.log("ws ping");
ping = new Date();
const payload = {type: "pong"};
ws.send(JSON.stringify(payload));
break;
default:
console.log("incomprehensible websocket message", receipt);
}
@ -906,3 +916,14 @@ const chat_messages_unlock = document.getElementById("chat-messages-unlock");
chat_messages_unlock.addEventListener("click", (event) => {
chat_messages.scrollTop = chat_messages.scrollTopMax;
});
/* close websocket after prolonged absence of pings */
const rotate_websocket = () => {
const this_pingpong_timeout = pingpong_timeout();
if (ping === null || (new Date() - ping) / 1000 > this_pingpong_timeout) {
console.log(`no pings heard in ${this_pingpong_timeout} seconds, closing websocket...`);
ws.close();
}
setTimeout(rotate_websocket, this_pingpong_timeout * 1000);
};
setTimeout(rotate_websocket, pingpong_timeout() * 1000);

ファイルの表示

@ -5,7 +5,7 @@ import asyncio
import itertools
from functools import wraps
from quart import current_app
from quart import current_app, websocket
from anonstream.broadcast import broadcast, broadcast_users_update
from anonstream.stream import is_online, get_stream_title, get_stream_uptime_and_viewership
@ -86,6 +86,27 @@ async def t_expire_captchas(iteration):
for digest in to_delete:
CAPTCHAS.pop(digest)
@with_period(CONFIG['TASK_PERIOD_ROTATE_WEBSOCKETS'])
@with_timestamp
async def t_close_websockets(timestamp, iteration):
THRESHOLD = CONFIG['TASK_PERIOD_BROADCAST_PING'] * 1.5 + 4.0
if iteration == 0:
return
else:
for user in USERS:
for queue in user['websockets']:
last_pong = user['websockets'][queue]
last_pong_ago = timestamp - last_pong
if last_pong_ago > THRESHOLD:
queue.put_nowait({'type': 'close'})
@with_period(CONFIG['TASK_PERIOD_BROADCAST_PING'])
async def t_broadcast_ping(iteration):
if iteration == 0:
return
else:
broadcast(USERS, payload={'type': 'ping'})
@with_period(CONFIG['TASK_PERIOD_BROADCAST_USERS_UPDATE'])
async def t_broadcast_users_update(iteration):
if iteration == 0:
@ -147,5 +168,7 @@ async def t_broadcast_stream_info_update(iteration):
current_app.add_background_task(t_sunset_users)
current_app.add_background_task(t_expire_captchas)
current_app.add_background_task(t_close_websockets)
current_app.add_background_task(t_broadcast_ping)
current_app.add_background_task(t_broadcast_users_update)
current_app.add_background_task(t_broadcast_stream_info_update)

ファイルの表示

@ -3,7 +3,7 @@
from enum import Enum
WS = Enum('WS', names=('MESSAGE, CAPTCHA, APPEARANCE'))
WS = Enum('WS', names=('PONG', 'MESSAGE', 'CAPTCHA', 'APPEARANCE'))
class Malformed(Exception):
pass
@ -48,5 +48,8 @@ def parse_websocket_data(receipt):
case 'captcha':
return WS.CAPTCHA, ()
case 'pong':
return WS.PONG, ()
case _:
raise Malformed('malformed type')

ファイルの表示

@ -10,6 +10,7 @@ from anonstream.stream import get_stream_title, get_stream_uptime_and_viewership
from anonstream.captcha import get_random_captcha_digest_for
from anonstream.chat import get_all_messages_for_websocket, add_chat_message, Rejected
from anonstream.user import get_all_users_for_websocket, see, verify, deverify, BadCaptcha, try_change_appearance
from anonstream.wrappers import with_timestamp
from anonstream.utils.chat import generate_nonce
from anonstream.utils.websocket import parse_websocket_data, Malformed, WS
@ -29,11 +30,16 @@ async def websocket_outbound(queue, user):
},
'scrollback': CONFIG['MAX_CHAT_SCROLLBACK'],
'digest': get_random_captcha_digest_for(user),
'pingpong': CONFIG['TASK_PERIOD_BROADCAST_PING'],
}
await websocket.send_json(payload)
while True:
payload = await queue.get()
await websocket.send_json(payload)
if payload['type'] == 'close':
await websocket.close(1011)
break
else:
await websocket.send_json(payload)
async def websocket_inbound(queue, user):
while True:
@ -59,17 +65,25 @@ async def websocket_inbound(queue, user):
handle = handle_inbound_appearance
case WS.CAPTCHA:
handle = handle_inbound_captcha
payload = handle(user, *parsed)
case WS.PONG:
handle = handle_inbound_pong
payload = handle(queue, user, *parsed)
queue.put_nowait(payload)
if payload is not None:
queue.put_nowait(payload)
def handle_inbound_captcha(user):
@with_timestamp
def handle_inbound_pong(timestamp, queue, user):
user['websockets'][queue] = timestamp
return None
def handle_inbound_captcha(queue, user):
return {
'type': 'captcha',
'digest': get_random_captcha_digest_for(user),
}
def handle_inbound_appearance(user, name, color, password, want_tripcode):
def handle_inbound_appearance(queue, user, name, color, password, want_tripcode):
errors = try_change_appearance(user, name, color, password, want_tripcode)
if errors:
return {
@ -85,7 +99,7 @@ def handle_inbound_appearance(user, name, color, password, want_tripcode):
#'tripcode': user['tripcode'],
}
def handle_inbound_message(user, nonce, comment, digest, answer):
def handle_inbound_message(queue, user, nonce, comment, digest, answer):
try:
verification_happened = verify(user, digest, answer)
except BadCaptcha as e:

ファイルの表示

@ -33,6 +33,8 @@ chat_scrollback = 256
[tasks]
rotate_users = 60.0
rotate_captchas = 60.0
rotate_websockets = 2.0
broadcast_ping = 8.0
broadcast_users_update = 4.0
broadcast_stream_info_update = 3.0