From 43e1a33088569f4013bc2e4faea3ef95d303e2ec Mon Sep 17 00:00:00 2001 From: n9k Date: Thu, 17 Feb 2022 12:51:09 +0000 Subject: [PATCH] Nojs appearance form, tripcodes, colours --- anonstream/__init__.py | 16 ++- anonstream/helpers/tripcode.py | 40 ++++++ anonstream/helpers/user.py | 11 +- anonstream/routes/nojs.py | 52 +++++++- anonstream/templates/nojs_form.html | 18 +-- anonstream/user.py | 57 ++++++++- anonstream/utils/__init__.py | 2 - anonstream/utils/colour.py | 192 ++++++++++++++++++++++++++++ anonstream/utils/user.py | 13 ++ anonstream/websocket.py | 5 +- anonstream/wrappers.py | 13 ++ config.toml | 10 +- 12 files changed, 395 insertions(+), 34 deletions(-) create mode 100644 anonstream/helpers/tripcode.py delete mode 100644 anonstream/utils/__init__.py create mode 100644 anonstream/utils/colour.py diff --git a/anonstream/__init__.py b/anonstream/__init__.py index 7e22a47..aed7287 100644 --- a/anonstream/__init__.py +++ b/anonstream/__init__.py @@ -6,6 +6,7 @@ from quart import Quart from werkzeug.security import generate_password_hash from anonstream.utils.user import generate_token +from anonstream.utils.colour import color_to_colour from anonstream.segments import DirectoryCache async def create_app(): @@ -25,15 +26,20 @@ async def create_app(): 'AUTH_TOKEN': generate_token(), 'DEFAULT_HOST_NAME': config['names']['broadcaster'], 'DEFAULT_ANON_NAME': config['names']['anonymous'], - 'MAX_NOTICES': config['limits']['notices'], - 'MAX_CHAT_STORAGE': config['limits']['chat_storage'], - 'MAX_CHAT_SCROLLBACK': config['limits']['chat_scrollback'], - 'USER_CHECKUP_PERIOD': config['ratelimits']['user_absence'], - 'CAPTCHA_CHECKUP_PERIOD': config['ratelimits']['captcha_expiry'], + 'MAX_NOTICES': config['memory']['notices'], + 'MAX_CHAT_MESSAGES': config['memory']['chat_messages'], + 'MAX_CHAT_SCROLLBACK': config['memory']['chat_scrollback'], + 'CHECKUP_PERIOD_USER': config['ratelimits']['user_absence'], + 'CHECKUP_PERIOD_CAPTCHA': config['ratelimits']['captcha_expiry'], 'THRESHOLD_IDLE': config['thresholds']['idle'], 'THRESHOLD_ABSENT': config['thresholds']['absent'], + 'CHAT_COMMENT_MAX_LENGTH': config['chat']['max_name_length'], + 'CHAT_NAME_MAX_LENGTH': config['chat']['max_name_length'], + 'CHAT_NAME_MIN_CONTRAST': config['chat']['min_name_contrast'], + 'CHAT_BACKGROUND_COLOUR': color_to_colour(config['chat']['background_color']), }) + assert app.config['MAX_CHAT_MESSAGES'] >= app.config['MAX_CHAT_SCROLLBACK'] assert app.config['THRESHOLD_ABSENT'] >= app.config['THRESHOLD_IDLE'] app.chat = {'messages': OrderedDict(), 'nonce_hashes': set()} diff --git a/anonstream/helpers/tripcode.py b/anonstream/helpers/tripcode.py new file mode 100644 index 0000000..0b0489a --- /dev/null +++ b/anonstream/helpers/tripcode.py @@ -0,0 +1,40 @@ +import base64 +import hashlib + +import werkzeug.security +from quart import current_app + +from anonstream.utils.colour import generate_colour, generate_maximum_contrast_colour, colour_to_color + +CONFIG = current_app.config + +def _generate_tripcode_digest_legacy(password): + hexdigest, _ = werkzeug.security._hash_internal( + 'pbkdf2:sha256:150000', + CONFIG['SECRET_KEY'], + password, + ) + digest = bytes.fromhex(hexdigest) + return base64.b64encode(digest)[:8].decode() + +def generate_tripcode_digest(password): + parts = CONFIG['SECRET_KEY'] + b'tripcode\0' + password.encode() + digest = hashlib.sha256(parts).digest() + return base64.b64encode(digest)[:8].decode() + +def generate_tripcode(password, generate_digest=generate_tripcode_digest): + digest = generate_digest(password) + background_colour = generate_colour( + seed='tripcode-background\0' + digest, + bg=CONFIG['CHAT_BACKGROUND_COLOUR'], + contrast=5.0, + ) + foreground_colour = generate_maximum_contrast_colour( + seed='tripcode-foreground\0' + digest, + bg=background_colour, + ) + return { + 'digest': digest, + 'background_color': colour_to_color(background_colour), + 'foreground_color': colour_to_color(foreground_colour), + } diff --git a/anonstream/helpers/user.py b/anonstream/helpers/user.py index ea6944c..ce67a10 100644 --- a/anonstream/helpers/user.py +++ b/anonstream/helpers/user.py @@ -5,6 +5,8 @@ from math import inf from quart import current_app +from anonstream.utils.colour import generate_colour, colour_to_color + CONFIG = current_app.config def generate_token_hash(token): @@ -12,13 +14,18 @@ def generate_token_hash(token): digest = hashlib.sha256(parts).digest() return base64.b32encode(digest)[:26].lower().decode() -def generate_user(secret, token, broadcaster, timestamp): +def generate_user(token, broadcaster, timestamp): + colour = generate_colour( + seed='name\0' + token, + bg=CONFIG['CHAT_BACKGROUND_COLOUR'], + contrast=4.53, + ) return { 'token': token, 'token_hash': generate_token_hash(token), 'broadcaster': broadcaster, 'name': None, - 'color': '#c7007f', + 'color': colour_to_color(colour), 'tripcode': None, 'notices': OrderedDict(), 'seen': { diff --git a/anonstream/routes/nojs.py b/anonstream/routes/nojs.py index 9dd6f5d..022fde9 100644 --- a/anonstream/routes/nojs.py +++ b/anonstream/routes/nojs.py @@ -1,11 +1,13 @@ -from quart import current_app, request, render_template, redirect, url_for +from quart import current_app, request, render_template, redirect, url_for, escape, Markup from anonstream.stream import get_stream_title -from anonstream.user import add_notice, pop_notice +from anonstream.user import add_notice, pop_notice, change_name, change_color, change_tripcode, delete_tripcode, BadAppearance from anonstream.chat import add_chat_message, Rejected from anonstream.routes.wrappers import with_user_from +from anonstream.wrappers import try_except_log from anonstream.helpers.user import get_default_name from anonstream.utils.chat import generate_nonce +from anonstream.utils.user import concatenate_for_notice @current_app.route('/info.html') @with_user_from(request) @@ -31,11 +33,13 @@ async def nojs_chat(user): @with_user_from(request) async def nojs_form(user): notice_id = request.args.get('notice', type=int) + notice, verbose = pop_notice(user, notice_id) prefer_chat_form = request.args.get('landing') != 'appearance' return await render_template( 'nojs_form.html', user=user, - notice=pop_notice(user, notice_id), + notice=notice, + verbose=verbose, prefer_chat_form=prefer_chat_form, nonce=generate_nonce(), default_name=get_default_name(user), @@ -63,9 +67,47 @@ async def nojs_submit_message(user): else: notice_id = None - return redirect(url_for('nojs_form', token=user['token'], notice=notice_id)) + return redirect(url_for('nojs_form', token=user['token'], landing='chat', notice=notice_id)) @current_app.post('/chat/appearance') @with_user_from(request) async def nojs_submit_appearance(user): - pass + form = await request.form + name = form.get('name', '') or None + color = form.get('color', '') + password = form.get('password', '') + want_delete_tripcode = form.get('clear-tripcode', type=bool) + want_change_tripcode = form.get('set-tripcode', type=bool) + + errors = [] + def try_(f, *args, **kwargs): + return try_except_log(errors, BadAppearance)(f)(*args, **kwargs) + + try_(change_name, user, name, dry_run=True) + try_(change_color, user, color, dry_run=True) + if want_delete_tripcode: + pass + elif want_change_tripcode: + try_(change_tripcode, user, password, dry_run=True) + + if errors: + notice = Markup('
').join( + concatenate_for_notice(*error.args) for error in errors + ) + else: + change_name(user, name) + change_color(user, color) + if want_delete_tripcode: + delete_tripcode(user) + elif want_change_tripcode: + change_tripcode(user, password) + + notice = 'Changed appearance' + + notice_id = add_notice(user, notice, verbose=len(errors) > 1) + return redirect(url_for( + 'nojs_form', + token=user['token'], + landing='appearance' if errors else 'chat', + notice=notice_id, + )) diff --git a/anonstream/templates/nojs_form.html b/anonstream/templates/nojs_form.html index 8c03725..f9615ea 100644 --- a/anonstream/templates/nojs_form.html +++ b/anonstream/templates/nojs_form.html @@ -30,7 +30,7 @@ text-decoration: underline; } .tripcode { - padding: 0 4px; + padding: 0 5px; border-radius: 7px; font-family: monospace; cursor: default; @@ -40,7 +40,6 @@ } #tripcode { cursor: pointer; - margin-right: 4px; } .x { font-size: 14pt; @@ -61,6 +60,9 @@ font-size: 18pt; line-height: 1.25; } + #notice.verbose h1 { + font-size: 14pt; + } #chat-form, #appearance-form { padding: 0 var(--padding-size) var(--padding-size) var(--padding-size); @@ -110,7 +112,7 @@ #password-column { display: grid; grid-template-columns: auto auto 1fr; - grid-gap: 0.25rem; + grid-gap: 0.375rem; align-items: center; } #appearance-form label:not(.tripcode):not(.x) { @@ -181,20 +183,20 @@
{% if notice != none %} - +

{{ notice }}

Click to dismiss
{% endif %}
- +
- + @@ -204,13 +206,13 @@ (no tripcode) {% else %} - +
(cleared)
{% endif %} - +
diff --git a/anonstream/user.py b/anonstream/user.py index b7fdbeb..f2ff6b7 100644 --- a/anonstream/user.py +++ b/anonstream/user.py @@ -5,24 +5,67 @@ from quart import current_app from anonstream.wrappers import with_timestamp, with_first_argument from anonstream.helpers.user import is_visible +from anonstream.helpers.tripcode import generate_tripcode +from anonstream.utils.colour import color_to_colour, get_contrast, NotAColor from anonstream.utils.user import user_for_websocket -from anonstream.utils import listmap CONFIG = current_app.config -def add_notice(user, notice): +class BadAppearance(Exception): + pass + +def add_notice(user, notice, verbose=False): notice_id = time.time_ns() // 1_000_000 - user['notices'][notice_id] = notice + user['notices'][notice_id] = (notice, verbose) if len(user['notices']) > CONFIG['MAX_NOTICES']: user['notices'].popitem(last=False) return notice_id def pop_notice(user, notice_id): try: - notice = user['notices'].pop(notice_id) + notice, verbose = user['notices'].pop(notice_id) except KeyError: - notice = None - return notice + notice, verbose = None, False + return notice, verbose + +def change_name(user, name, dry_run=False): + if dry_run: + if name is not None: + if len(name) == 0: + raise BadAppearance('Name was empty') + if len(name) > 24: + raise BadAppearance('Name exceeded 24 chars') + else: + user['name'] = name + +def change_color(user, color, dry_run=False): + if dry_run: + try: + colour = color_to_colour(color) + except NotAColor: + raise BadAppearance('Invalid CSS color') + contrast = get_contrast( + CONFIG['CHAT_BACKGROUND_COLOUR'], + colour, + ) + min_contrast = CONFIG['CHAT_NAME_MIN_CONTRAST'] + if contrast < min_contrast: + raise BadAppearance( + 'Colour had insufficient contrast:', + (f'{contrast:.2f}', f'/{min_contrast}'), + ) + else: + user['color'] = color + +def change_tripcode(user, password, dry_run=False): + if dry_run: + if len(password) > 1024: + raise BadAppearance('Password exceeded 1024 chars') + else: + user['tripcode'] = generate_tripcode(password) + +def delete_tripcode(user): + user['tripcode'] = None def see(user): user['seen']['last'] = int(time.time()) @@ -44,7 +87,7 @@ def sunset(messages, users): global last_checkup timestamp = int(time.time()) - if timestamp - last_checkup < CONFIG['USER_CHECKUP_PERIOD']: + if timestamp - last_checkup < CONFIG['CHECKUP_PERIOD_USER']: return [] to_delete = [] diff --git a/anonstream/utils/__init__.py b/anonstream/utils/__init__.py deleted file mode 100644 index cdd4478..0000000 --- a/anonstream/utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def listmap(*args, **kwargs): - return list(map(*args, **kwargs)) diff --git a/anonstream/utils/colour.py b/anonstream/utils/colour.py new file mode 100644 index 0000000..1290d85 --- /dev/null +++ b/anonstream/utils/colour.py @@ -0,0 +1,192 @@ +import re +import random + +class NotAColor(Exception): + pass + +RE_COLOR = re.compile( + r'^#(?P[0-9a-fA-F]{2})(?P[0-9a-fA-F]{2})(?P[0-9a-fA-F]{2})$' +) + +def color_to_colour(color): + match = RE_COLOR.match(color) + if not match: + raise NotAColor + return ( + int(match.group('red'), 16), + int(match.group('green'), 16), + int(match.group('blue'), 16), + ) + +def colour_to_color(colour): + red, green, blue = colour + return f'#{red:02x}{green:02x}{blue:02x}' + +def dot(a, b): + ''' + Dot product. + ''' + return sum(i * j for i, j in zip(a, b, strict=True)) + +def _sc_to_tc(sc): + ''' + The transformation on [0,1] (from an s-component to a t-component) + defined at https://www.w3.org/TR/WCAG21/#dfn-relative-luminance. + ''' + if sc < 0.03928: + tc = sc / 12.92 + else: + tc = pow((sc + 0.055) / 1.055, 2.4) + return tc + +def _tc_to_sc(tc): + ''' + Almost-inverse of _sc_to_tc. + + The function _sc_to_tc is not injective (because of the discontinuity at + sc=0.03928), thus it has no true inverse. In this implementation, whenever + for a given `tc` there are two distinct values of `sc` such that + sc_to_tc(`sc`)=`tc`, the smaller sc is chosen. (The smaller one is less + expensive to compute). + ''' + sc = tc * 12.92 + if sc >= 0.03928: + sc = pow(tc, 1 / 2.4) * 1.055 - 0.055 + return sc + +def get_relative_luminance(colour): + ''' + Take a colour and return its relative luminance. + + https://www.w3.org/TR/WCAG21/#dfn-relative-luminance + ''' + s = map(lambda sc: sc / 255, colour) + t = map(_sc_to_tc, s) + return dot((0.2126, 0.7152, 0.0722), t) + +def get_colour(t): + ''' + Take a 3-tuple of channels `t` and return an approximation of a colour + that when fed into get_relative_luminance would internally cause the + the variable named "t" to have a value equal to `t`. + ''' + s = map(_tc_to_sc, t) + colour = map(lambda sc: round(sc * 255), s) + return tuple(colour) + +def get_contrast(bg, fg): + ''' + Return the contrast ratio between two colours `bg` and `fg`. + + https://www.w3.org/TR/WCAG21/#dfn-contrast-ratio + ''' + lumas = ( + get_relative_luminance(bg), + get_relative_luminance(fg), + ) + return (max(lumas) + 0.05) / (min(lumas) + 0.05) + +def generate_colour(seed, bg, contrast=4.5, lighter=True): + ''' + Generate a random colour with given contrast to `bg`. + + Channels of `t` are uniformly distributed. No characteristics of the + returned colour are guaranteed to be chosen uniformly from the space of + possible values. + + If `lighter` is true, the returned colour is forced to have a higher + relative luminance than `bg`. This is fine if `bg` is dark; if `bg` is + not dark, the space of possible returned colours will be a lot smaller + (and might be empty). If `lighter` is false, the returned colour is + forced to have a lower relative luminance than `bg`. + + It's simple to calculate the maximum possible contrast between `bg` and + any other colour. (The minimum contrast is always 1.) + + >>> bg = (0x23, 0x23, 0x27) + >>> luma = get_relative_luminance(bg) + >>> (luma + 0.05) / 0.05 # maximum contrast for colours with smaller luma + 1.3411743495243844 + >>> 1.05 / (luma + 0.05) # maximum contrast for colours with greater luma + 15.657919499763137 + + There are values of `contrast` for which the space of possible returned + colours is empty. For example a `contrast` greater than 21 is always + impossible, but the exact upper bound depends on `bg`. The desired + relative luminance of the returned colour must exist in the interval [0,1]. + The formula for desired luma is given below. + + >>> bg_luma = get_relative_luminance(bg) + >>> desired_luma = ( + ... contrast * (bg_luma + 0.05) - 0.05 + ... if lighter else + ... (bg_luma + 0.05) / contrast - 0.05 + ... ) + >>> 0 <= desired_luma <= 1 + True + ''' + r = random.Random(seed) + + if lighter: + desired_luma = contrast * (get_relative_luminance(bg) + 0.05) - 0.05 + else: + desired_luma = (get_relative_luminance(bg) + 0.05) / contrast - 0.05 + + V = (0.2126, 0.7152, 0.0722) + indices = [0, 1, 2] + r.shuffle(indices) + i, j, k = indices + + # V[i] * ci + V[j] * 0 + V[k] * 0 <= desired_luma + # V[i] * ci + V[j] * 1 + V[k] * 1 >= desired_luma + ci_upper = (desired_luma - V[j] * 0 - V[k] * 0) / V[i] + ci_lower = (desired_luma - V[j] * 1 - V[k] * 1) / V[i] + ci = r.uniform(max(0, ci_lower), min(1, ci_upper)) + + # V[i] * ci + V[j] * cj + V[k] * 0 <= desired_luma + # V[i] * ci + V[j] * cj + V[k] * 1 >= desired_luma + cj_upper = (desired_luma - V[i] * ci - V[k] * 0) / V[j] + cj_lower = (desired_luma - V[i] * ci - V[k] * 1) / V[j] + cj = r.uniform(max(0, cj_lower), min(1, cj_upper)) + + # V[i] * ci + V[j] * cj + V[k] * ck = desired_luma + ck = (desired_luma - V[i] * ci - V[j] * cj) / V[k] + + t = [None, None, None] + t[i], t[j], t[k] = ci, cj, ck + + s = map(_tc_to_sc, t) + colour = map(lambda sc: round(sc * 255), s) + return tuple(colour) + +def get_maximum_contrast(bg, lighter=True): + ''' + Return the maximum possible contrast between `bg` and any other lighter + or darker colour. + + If `lighter` is true, restrict to the set of colours whose relative + luminance is greater than `bg`'s. + + If `lighter` is false, restrict to the set of colours whose relative + luminance is greater than `bg`'s. + ''' + luma = get_relative_luminance(bg) + if lighter: + max_contrast = 1.05 / (luma + 0.05) + else: + max_contrast = (luma + 0.05) / 0.05 + return max_contrast + +def generate_maximum_contrast_colour(seed, bg, proportion_of_max=31/32): + max_lighter_contrast = get_maximum_contrast(bg, lighter=True) + max_darker_contrast = get_maximum_contrast(bg, lighter=False) + + max_contrast = max(max_lighter_contrast, max_darker_contrast) + colour = generate_colour( + seed, + bg, + contrast=max_contrast * proportion_of_max, + lighter=max_lighter_contrast > max_darker_contrast, + ) + + return colour diff --git a/anonstream/utils/user.py b/anonstream/utils/user.py index 9ab8fb7..4f609c2 100644 --- a/anonstream/utils/user.py +++ b/anonstream/utils/user.py @@ -4,6 +4,8 @@ import secrets from collections import OrderedDict from math import inf +from quart import escape, Markup + def generate_token(): return secrets.token_hex(16) @@ -12,3 +14,14 @@ def user_for_websocket(user, include_token_hash=True): if include_token_hash: keys.append('token_hash') return {key: user[key] for key in keys} + +def concatenate_for_notice(string, *tuples): + if not tuples: + return string + markup = Markup( + ''.join( + f' {escape(x)}{escape(y)}' + for x, y in tuples + ) + ) + return string + markup diff --git a/anonstream/websocket.py b/anonstream/websocket.py index e9d998b..d1bd247 100644 --- a/anonstream/websocket.py +++ b/anonstream/websocket.py @@ -7,7 +7,6 @@ from anonstream.chat import broadcast, add_chat_message, Rejected from anonstream.user import users_for_websocket, see from anonstream.wrappers import with_first_argument from anonstream.helpers.user import is_present -from anonstream.utils import listmap from anonstream.utils.chat import generate_nonce, message_for_websocket from anonstream.utils.websocket import parse_websocket_data, Malformed @@ -19,10 +18,10 @@ async def websocket_outbound(queue, messages, users): 'nonce': generate_nonce(), 'title': get_stream_title(), 'uptime': get_stream_uptime(), - 'chat': listmap( + 'chat': list(map( with_first_argument(users)(message_for_websocket), messages, - ), + )), 'users': users_for_websocket(messages, users), 'default': { True: CONFIG['DEFAULT_HOST_NAME'], diff --git a/anonstream/wrappers.py b/anonstream/wrappers.py index a9fcc52..6a6cf94 100644 --- a/anonstream/wrappers.py +++ b/anonstream/wrappers.py @@ -18,3 +18,16 @@ def with_first_argument(x): return wrapper return with_x + +def try_except_log(errors, exception_class): + def try_except_log_specific(f): + @wraps(f) + def wrapper(*args, **kwargs): + try: + return f(*args, **kwargs) + except exception_class as e: + errors.append(e) + + return wrapper + + return try_except_log_specific diff --git a/config.toml b/config.toml index 03d1361..7107803 100644 --- a/config.toml +++ b/config.toml @@ -10,9 +10,15 @@ segments_dir = "stream/" broadcaster = "Broadcaster" anonymous = "Anonymous" -[limits] +[chat] +max_comment_length = 512 +max_name_length = 24 +min_name_contrast = 3.0 +background_color = "#232327" + +[memory] notices = 32 -chat_storage = 8192 +chat_messages = 8192 chat_scrollback = 256 [ratelimits]