Buffer new and mutated users before sending to websockets

By default the buffer is exhausted every 4 seconds. This should defend against
a potential DoS against clients with JavaScript enabled. Before this, any
request with no token would generate a new user and immediately broadcast the
new user to all the websockets. It's best to lock down as much as possible the
number of places a client can cause the server to broadcasts to all the
websockets.
このコミットが含まれているのは:
n9k 2022-02-20 07:20:43 +00:00
コミット 6ceb553b29
11個のファイルの変更100行の追加75行の削除

ファイルの表示

@ -31,8 +31,9 @@ def create_app(config_file):
'MAX_CAPTCHAS': config['memory']['captchas'],
'MAX_CHAT_MESSAGES': config['memory']['chat_messages'],
'MAX_CHAT_SCROLLBACK': config['memory']['chat_scrollback'],
'CHECKUP_PERIOD_USER': config['intervals']['sunset_users'],
'CHECKUP_PERIOD_CAPTCHA': config['intervals']['expire_captchas'],
'TASK_PERIOD_ROTATE_USERS': config['tasks']['rotate_users'],
'TASK_PERIOD_ROTATE_CAPTCHAS': config['tasks']['rotate_captchas'],
'TASK_PERIOD_BROADCAST_USERS_UPDATE': config['tasks']['broadcast_users_update'],
'THRESHOLD_USER_NOTWATCHING': config['thresholds']['user_notwatching'],
'THRESHOLD_USER_TENTATIVE': config['thresholds']['user_tentative'],
'THRESHOLD_USER_ABSENT': config['thresholds']['user_absent'],
@ -61,13 +62,18 @@ def create_app(config_file):
)
app.messages_by_id = OrderedDict()
app.users_by_token = {}
app.messages = app.messages_by_id.values()
app.users_by_token = {}
app.users = app.users_by_token.values()
app.segments_directory_cache = DirectoryCache(config['stream']['segments_dir'])
app.captchas = OrderedDict()
app.captcha_factory = create_captcha_factory(app.config['CAPTCHA_FONTS'])
app.captcha_signer = create_captcha_signer(app.config['SECRET_KEY'])
app.captchas = OrderedDict()
app.users_update_buffer = set()
app.background_sleep = set()

29
anonstream/broadcast.py ノーマルファイル
ファイルの表示

@ -0,0 +1,29 @@
from quart import current_app
from anonstream.utils.user import get_user_for_websocket
USERS = current_app.users
USERS_BY_TOKEN = current_app.users_by_token
USERS_UPDATE_BUFFER = current_app.users_update_buffer
def broadcast(users, payload):
for user in users:
for queue in user['websockets']:
queue.put_nowait(payload)
def broadcast_users_update():
users_for_websocket = {}
for token in USERS_UPDATE_BUFFER:
user = USERS_BY_TOKEN[token]
token_hash = user['token_hash']
users_for_websocket[token_hash] = get_user_for_websocket(user)
if users_for_websocket:
broadcast(
users=USERS,
payload={
'type': 'set-users',
'users': users_for_websocket,
},
)
USERS_UPDATE_BUFFER.clear()

ファイルの表示

@ -3,8 +3,9 @@ from datetime import datetime
from quart import current_app, escape
from anonstream.broadcast import broadcast, broadcast_users_update
from anonstream.helpers.chat import generate_nonce_hash, get_scrollback
from anonstream.utils.chat import message_for_websocket
from anonstream.utils.chat import get_message_for_websocket
CONFIG = current_app.config
MESSAGES_BY_ID = current_app.messages_by_id
@ -15,14 +16,9 @@ USERS = current_app.users
class Rejected(ValueError):
pass
def broadcast(users, payload):
for user in users:
for queue in user['websockets']:
queue.put_nowait(payload)
def messages_for_websocket():
def get_all_messages_for_websocket():
return list(map(
lambda message: message_for_websocket(
lambda message: get_message_for_websocket(
user=USERS_BY_TOKEN[message['token']],
message=message,
),
@ -30,11 +26,11 @@ def messages_for_websocket():
))
def add_chat_message(user, nonce, comment, ignore_empty=False):
# special case: if the comment is empty, do nothing and return
# Special case: if the comment is empty, do nothing and return
if ignore_empty and len(comment) == 0:
return
# check message
# Check message
message_id = generate_nonce_hash(nonce)
if message_id in MESSAGES_BY_ID:
raise Rejected('Discarded suspected duplicate message')
@ -43,7 +39,7 @@ def add_chat_message(user, nonce, comment, ignore_empty=False):
if len(comment) > 512:
raise Rejected('Message exceeded 512 chars')
# add message
# Create and add message
timestamp_ms = time.time_ns() // 1_000_000
timestamp = timestamp_ms // 1000
try:
@ -72,7 +68,11 @@ def add_chat_message(user, nonce, comment, ignore_empty=False):
while len(MESSAGES_BY_ID) > CONFIG['MAX_CHAT_MESSAGES']:
MESSAGES_BY_ID.pop(last=False)
# broadcast message to websockets
# Broadcast a users update to all websockets,
# in case this message is from a new user
broadcast_users_update()
# Broadcast message to websockets
broadcast(
USERS,
payload={

ファイルの表示

@ -5,8 +5,8 @@ from functools import wraps
from quart import current_app, request, abort, make_response, render_template, request
from werkzeug.security import check_password_hash
from anonstream.user import see, user_for_websocket
from anonstream.chat import broadcast
from anonstream.broadcast import broadcast
from anonstream.user import see
from anonstream.helpers.user import generate_user
from anonstream.utils.user import generate_token
@ -14,6 +14,7 @@ CONFIG = current_app.config
MESSAGES = current_app.messages
USERS_BY_TOKEN = current_app.users_by_token
USERS = current_app.users
USERS_UPDATE_BUFFER = current_app.users_update_buffer
def check_auth(context):
auth = context.authorization
@ -65,14 +66,9 @@ def with_user_from(context):
broadcaster=broadcaster,
)
USERS_BY_TOKEN[token] = user
broadcast(
USERS,
payload={
'type': 'add-user',
'token_hash': user['token_hash'],
'user': user_for_websocket(user),
},
)
# Add to the users update buffer
USERS_UPDATE_BUFFER.add(token)
# Set cookie
response = await f(user, *args, **kwargs)

ファイルの表示

@ -329,31 +329,24 @@ const on_websocket_message = (event) => {
});
break;
case "add-user":
console.log("ws add-user", receipt);
users[receipt.token_hash] = receipt.user;
update_user_colors(receipt.token_hash);
update_user_tripcodes(receipt.token_hash);
break;
case "mut-user":
console.log("ws mut-user", receipt);
const user = users[receipt.token_hash];
user.name = receipt.name;
user.color = receipt.color;
user.tripcode = receipt.tripcode;
update_user_names(receipt.token_hash);
update_user_colors(receipt.token_hash);
update_user_tripcodes(receipt.token_hash);
break;
case "set-users":
console.log("ws set-users", receipt);
for (const token_hash of Object.keys(receipt.users)) {
users[token_hash] = receipt.users[token_hash];
}
update_user_names();
update_user_colors();
update_user_tripcodes();
break;
case "rem-users":
console.log("ws rem-users", receipt);
for (const token_hash of receipt.token_hashes) {
delete users[token_hash];
}
update_user_styles();
break;
console.log("ws rem-users", receipt);
for (const token_hash of receipt.token_hashes) {
delete users[token_hash];
}
update_user_colors();
update_user_tripcodes();
break;
default:
console.log("incomprehensible websocket message", receipt);

ファイルの表示

@ -3,7 +3,7 @@ from functools import wraps
from quart import current_app
from anonstream.chat import broadcast
from anonstream.broadcast import broadcast, broadcast_users_update
from anonstream.wrappers import with_timestamp
from anonstream.helpers.user import is_visible
@ -36,7 +36,7 @@ def with_period(period):
return periodically
@with_period(CONFIG['CHECKUP_PERIOD_USER'])
@with_period(CONFIG['TASK_PERIOD_ROTATE_USERS'])
@with_timestamp
def t_sunset_users(timestamp):
tokens = []
@ -51,6 +51,10 @@ def t_sunset_users(timestamp):
token_hash = USERS_BY_TOKEN.pop(token)['token_hash']
token_hashes.append(token_hash)
# Broadcast a users update, in case any users being
# removed have been mutated or are new.
broadcast_users_update()
if token_hashes:
broadcast(
users=USERS,
@ -60,4 +64,9 @@ def t_sunset_users(timestamp):
},
)
@with_period(CONFIG['TASK_PERIOD_BROADCAST_USERS_UPDATE'])
def t_broadcast_users_update():
broadcast_users_update()
current_app.add_background_task(t_sunset_users)
current_app.add_background_task(t_broadcast_users_update)

ファイルの表示

@ -3,18 +3,18 @@ from math import inf
from quart import current_app
from anonstream.chat import broadcast
from anonstream.wrappers import try_except_log, with_timestamp
from anonstream.helpers.user import is_visible
from anonstream.helpers.captcha import check_captcha_digest, Answer
from anonstream.helpers.tripcode import generate_tripcode
from anonstream.utils.colour import color_to_colour, get_contrast, NotAColor
from anonstream.utils.user import user_for_websocket
from anonstream.utils.user import get_user_for_websocket
CONFIG = current_app.config
MESSAGES = current_app.messages
USERS = current_app.users
CAPTCHA_SIGNER = current_app.captcha_signer
USERS_UPDATE_BUFFER = current_app.users_update_buffer
class BadAppearance(ValueError):
pass
@ -57,16 +57,8 @@ def try_change_appearance(user, name, color, password,
elif want_change_tripcode:
change_tripcode(user, password)
broadcast(
USERS,
payload={
'type': 'mut-user',
'token_hash': user['token_hash'],
'name': user['name'],
'color': user['color'],
'tripcode': user['tripcode'],
},
)
# Add to the users update buffer
USERS_UPDATE_BUFFER.add(user['token'])
return errors
@ -113,13 +105,13 @@ def see(user):
user['last']['seen'] = int(time.time())
@with_timestamp
def users_for_websocket(timestamp):
def get_all_users_for_websocket(timestamp):
visible_users = filter(
lambda user: is_visible(timestamp, MESSAGES, user),
USERS,
)
return {
user['token_hash']: user_for_websocket(user)
user['token_hash']: get_user_for_websocket(user)
for user in visible_users
}

ファイルの表示

@ -8,7 +8,7 @@ class NonceReuse(Exception):
def generate_nonce():
return secrets.token_urlsafe(16)
def message_for_websocket(user, message):
def get_message_for_websocket(user, message):
message_keys = ('seq', 'date', 'time_minutes', 'time_seconds', 'markup')
user_keys = ('token_hash',)
return {

ファイルの表示

@ -9,7 +9,7 @@ from quart import escape, Markup
def generate_token():
return secrets.token_hex(16)
def user_for_websocket(user):
def get_user_for_websocket(user):
keys = ['broadcaster', 'name', 'color', 'tripcode']
return {key: user[key] for key in keys}

ファイルの表示

@ -4,9 +4,8 @@ from quart import current_app, websocket
from anonstream.stream import get_stream_title, get_stream_uptime
from anonstream.captcha import get_random_captcha_digest
from anonstream.chat import messages_for_websocket, add_chat_message, Rejected
from anonstream.user import users_for_websocket, see, verify, BadCaptcha
from anonstream.wrappers import with_first_argument
from anonstream.chat import get_all_messages_for_websocket, add_chat_message, Rejected
from anonstream.user import get_all_users_for_websocket, see, verify, BadCaptcha
from anonstream.utils.chat import generate_nonce
from anonstream.utils.websocket import parse_websocket_data, Malformed
@ -18,8 +17,8 @@ async def websocket_outbound(queue, user):
'nonce': generate_nonce(),
'title': get_stream_title(),
'uptime': get_stream_uptime(),
'messages': messages_for_websocket(),
'users': users_for_websocket(),
'messages': get_all_messages_for_websocket(),
'users': get_all_users_for_websocket(),
'default': {
True: CONFIG['DEFAULT_HOST_NAME'],
False: CONFIG['DEFAULT_ANON_NAME'],

ファイルの表示

@ -20,9 +20,10 @@ captchas = 256
chat_messages = 8192
chat_scrollback = 256
[intervals]
sunset_users = 60
expire_captchas = 60
[tasks]
rotate_users = 60
rotate_captchas = 60
broadcast_users_update = 4
[names]
broadcaster = "Broadcaster"