コミットを比較

...

8 コミット

作成者 SHA1 メッセージ 日付
n9k 36334faf24 Control socket: view and delete eyes 2022-06-14 09:24:41 +00:00
n9k c713bdda0d Control socket: cleanup 2022-06-14 08:54:26 +00:00
n9k bccbbd82e3 Control socket: separate files 2022-06-14 08:54:24 +00:00
n9k 751664d1c4 More sensible variable names in colour generation 2022-06-14 08:50:31 +00:00
n9k 47ee5fe607 Take a range of contrasts for generating colours 2022-06-14 08:50:31 +00:00
n9k 1422bebd8e Require Authorization header for broadcaster
As opposed to just the broadcaster token. This makes the broadcaster
username/password login mandatory, which previously was only mandatory
in the `auth_required` wrapper, but not elsewhere (so for example
leaving comments as the broadcaster was possible with the token only). A
less safe alternative to this would be to compare tokens in `check_auth`
once the Authorization header didn't match.
2022-06-14 08:50:31 +00:00
n9k 6ef3a77465 Explicitly reject weird tokens
Includes really long tokens
2022-06-14 08:50:31 +00:00
n9k 506f91a41b Control socket: escape json whitespace if necessary 2022-06-14 08:49:54 +00:00
16個のファイルの変更497行の追加403行の削除

ファイルの表示

ファイルの表示

44
anonstream/control/commands/__init__.py ノーマルファイル
ファイルの表示

@ -0,0 +1,44 @@
from anonstream.control.commands.help import *
from anonstream.control.commands.exit import *
from anonstream.control.commands.title import *
from anonstream.control.commands.chat import *
from anonstream.control.commands.user import *
METHOD_HELP = 'help'
METHOD_EXIT = 'exit'
METHOD_TITLE = 'title'
METHOD_CHAT = 'chat'
METHOD_USER = 'user'
METHOD_COMMAND_FUNCTIONS = {
METHOD_HELP: {
None: command_help,
'help': command_help_help,
},
METHOD_EXIT: {
None: command_exit,
'help': command_exit_help,
},
METHOD_TITLE: {
None: command_title_show,
'help': command_title_help,
'show': command_title_show,
'set': command_title_set,
},
METHOD_CHAT: {
None: command_chat_help,
'help': command_chat_help,
'delete': command_chat_delete,
},
METHOD_USER: {
None: command_user_show,
'help': command_user_help,
'show': command_user_show,
'attr': command_user_attr,
'get': command_user_get,
'set': command_user_set,
'eyes': command_user_eyes,
},
}

34
anonstream/control/commands/chat.py ノーマルファイル
ファイルの表示

@ -0,0 +1,34 @@
from anonstream.control.exceptions import BadArgument, Incomplete, Garbage
from anonstream.chat import delete_chat_messages
async def command_chat_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: chat {show [MESSAGES] | delete SEQS}\n'
'Commands:\n'
#' chat show [MESSAGES]......show chat messages\n'
' chat delete SEQS..........delete chat messages\n'
'Definitions:\n'
#' MESSAGES..................undefined\n'
' SEQS......................=SEQ [SEQ...]\n'
' SEQ.......................a chat message\'s seq, base-10 integer\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_chat_delete(args):
match args:
case []:
raise Incomplete
case _:
try:
seqs = list(map(int, args))
except ValueError as e:
raise BadArgument('SEQ must be a base-10 integer') from e
delete_chat_messages(seqs)
normal_options = ['delete', *map(str, seqs)]
response = ''
return normal_options, response

20
anonstream/control/commands/exit.py ノーマルファイル
ファイルの表示

@ -0,0 +1,20 @@
from anonstream.control.exceptions import Exit, Incomplete
async def command_exit(args):
match args:
case []:
raise Exit
case [*garbage]:
raise Garbage(garbage)
async def command_exit_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: exit\n'
'close the connection\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response

38
anonstream/control/commands/help.py ノーマルファイル
ファイルの表示

@ -0,0 +1,38 @@
from anonstream.control.exceptions import Garbage
async def command_help(args):
match args:
case []:
normal_options = []
response = (
'Usage: METHOD [COMMAND | help]\n'
'Examples:\n'
' help...........................show this help message\n'
' exit...........................close the control connection\n'
' title [show]...................show the stream title\n'
' title set TITLE................set the stream title\n'
' user [show]....................show a list of users\n'
' user attr USER.................set an attribute of a user\n'
' user get USER ATTR.............set an attribute of a user\n'
' user set USER ATTR VALUE.......set an attribute of a user\n'
#' user kick USERS [FAREWELL].....kick users\n'
' user eyes USER [show]..........show a list of active video responses\n'
' user eyes USER delete EYES_ID..end a video response\n'
#' chat show MESSAGES.............show a list of messages\n'
' chat delete SEQS...............delete a set of messages\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_help_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: help\n'
'show usage syntax and examples\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response

53
anonstream/control/commands/title.py ノーマルファイル
ファイルの表示

@ -0,0 +1,53 @@
import json
from anonstream.control.exceptions import BadArgument, Incomplete, Garbage, Failed
from anonstream.control.utils import json_dumps_contiguous
from anonstream.stream import get_stream_title, set_stream_title
async def command_title_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: title [show | set TITLE]\n'
'Commands:\n'
' title [show].......show the stream title\n'
' title set TITLE....set the stream title to TITLE\n'
'Definitions:\n'
' TITLE..............a json-encoded string, whitespace must be \\uXXXX-escaped\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_title_show(args):
match args:
case []:
normal_options = ['show']
response = json.dumps(await get_stream_title()) + '\n'
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_title_set(args):
match args:
case [title_json]:
try:
title = json.loads(title_json)
except json.JSONDecodeError as e:
raise BadArgument('could not decode json')
else:
if not isinstance(title, str):
raise BadArgument('could not decode json as string')
else:
try:
await set_stream_title(title)
except OSError as e:
raise Failed(str(e)) from e
normal_options = ['set', json_dumps_contiguous(title)]
response = ''
case []:
raise Incomplete
case [_, *garbage]:
raise Garbage(garbage)
return normal_options, response

153
anonstream/control/commands/user.py ノーマルファイル
ファイルの表示

@ -0,0 +1,153 @@
import json
from quart import current_app
from anonstream.control.exceptions import BadArgument, Incomplete, Garbage, Failed
from anonstream.control.utils import json_dumps_contiguous
from anonstream.utils.user import USER_WEBSOCKET_ATTRS
USERS_BY_TOKEN = current_app.users_by_token
USERS = current_app.users
USERS_UPDATE_BUFFER = current_app.users_update_buffer
async def command_user_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: user [show | attr USER | get USER ATTR | set USER ATTR VALUE]\n'
'Commands:\n'
' user [show].......................show all users\' tokens\n'
' user attr USER....................show names of a user\'s attributes\n'
' user get USER ATTR................show an attribute of a user\n'
' user set USER ATTR................set an attribute of a user\n'
' user eyes USER [show].............show a user\'s active video responses\n'
' user eyes USER delete EYES_ID.....end a video response to a user\n'
'Definitions:\n'
#' USER..............................={token TOKEN | hash HASH}\n'
' USER..............................=token TOKEN\n'
' TOKEN..............................a token\n'
#' HASH..............................a token hash\n'
' ATTR...............................a user attribute, re:[a-z0-9_]+\n'
' EYES_ID............................a user\'s eyes_id, base 10 integer\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_show(args):
match args:
case []:
normal_options = ['show']
response = json.dumps(tuple(USERS_BY_TOKEN)) + '\n'
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_attr(args):
match args:
case []:
raise Incomplete
case ['token', token_json]:
try:
token = json.loads(token_json)
except json.JSONDecodeError:
raise BadArgument('could not decode TOKEN as json')
try:
user = USERS_BY_TOKEN[token]
except KeyError:
raise Failed(f"no user exists with token {token!r}, try 'user show'")
normal_options = ['attr', 'token', json_dumps_contiguous(token)]
response = json.dumps(tuple(user.keys())) + '\n'
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_get(args):
match args:
case ['token', token_json, attr]:
try:
token = json.loads(token_json)
except json.JSONDecodeError:
raise BadArgument('could not decode TOKEN as json')
try:
user = USERS_BY_TOKEN[token]
except KeyError:
raise Failed(f"no user exists with token {token!r}, try 'user show'")
try:
value = user[attr]
except KeyError:
raise Failed(f"user has no attribute {attr!r}, try 'user attr token {json_dumps_contiguous(token)}'")
try:
value_json = json.dumps(value)
except TypeError:
raise Failed(f'attribute {attr!r} is not JSON serializable')
normal_options = ['get', 'token', json_dumps_contiguous(token), attr]
response = value_json + '\n'
case []:
raise Incomplete
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_set(args):
match args:
case ['token', token_json, attr, value_json]:
try:
token = json.loads(token_json)
except json.JSONDecodeError:
raise BadArgument('could not decode TOKEN as json')
try:
user = USERS_BY_TOKEN[token]
except KeyError:
raise Failed(f"no user exists with token {token!r}, try 'user show'")
try:
value = user[attr]
except KeyError:
raise Failed(f"user has no attribute {attr!r}, try 'user attr token {json_dumps_contiguous(token)}")
try:
value = json.loads(value_json)
except json.JSONDecodeError:
raise Failed('could not decode json')
user[attr] = value
if attr in USER_WEBSOCKET_ATTRS:
USERS_UPDATE_BUFFER.add(token)
normal_options = ['set', 'token', json_dumps_contiguous(token), attr, json_dumps_contiguous(value)]
response = ''
case []:
raise Incomplete
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_eyes(args):
match args:
case ['token', token_json, *subargs]:
try:
token = json.loads(token_json)
except json.JSONDecodeError:
raise BadArgument('could not decode TOKEN as json')
try:
user = USERS_BY_TOKEN[token]
except KeyError:
raise Failed(f"no user exists with token {token!r}, try 'user show'")
match subargs:
case [] | ['show']:
normal_options = ['eyes', 'token', json_dumps_contiguous(token), 'show']
response = json.dumps(user['eyes']['current']) + '\n'
case ['delete', eyes_id_json]:
try:
eyes_id = json.loads(eyes_id_json)
except json.JSONDecodeError:
raise BadArgument('could not decode EYES_ID as json')
try:
user['eyes']['current'].pop(eyes_id)
except KeyError:
pass
normal_options = ['eyes', 'token', json_dumps_contiguous(token), 'delete', json_dumps_contiguous(eyes_id)]
response = ''
case []:
raise Incomplete
case [*garbage]:
raise Garbage(garbage)
return normal_options, response

23
anonstream/control/exceptions.py ノーマルファイル
ファイルの表示

@ -0,0 +1,23 @@
class Exit(Exception):
pass
class UnknownMethod(Exception):
pass
class UnknownCommand(Exception):
pass
class UnknownArgument(Exception):
pass
class BadArgument(Exception):
pass
class Incomplete(Exception):
pass
class Garbage(Exception):
pass
class Failed(Exception):
pass

61
anonstream/control/parse.py ノーマルファイル
ファイルの表示

@ -0,0 +1,61 @@
import json
from anonstream.control.exceptions import UnknownMethod, UnknownCommand, BadArgument, Incomplete, Garbage, Failed
from anonstream.control.commands import METHOD_COMMAND_FUNCTIONS
async def parse_request(request):
try:
method, *options = request.split()
except ValueError:
normal, response = (None, []), ''
else:
try:
normal, response = await parse(method, options)
except UnknownMethod as e:
unknown_method, *_ = e.args
normal = None
response = f"method {unknown_method!r} is unknown, try 'help'\n"
except UnknownCommand as e:
method, unknown_command, *_ = e.args
normal = None
response = f"command {unknown_command!r} is unknown, try {f'{method} help'!r}\n"
except BadArgument as e:
reason, *_ = e.args
normal = None
response = f"{reason}, try {f'{method} help'!r}\n"
except Incomplete as e:
method, *_ = e.args
normal = None
response = f"command is incomplete, try {f'{method} help'!r}\n"
except Garbage as e:
garbage, *_ = e.args
normal = None
response = f"command has trailing garbage {garbage!r}, try {f'{method} help'!r}\n"
except Failed as e:
reason, *_ = e.args
normal = None
response = reason + '\n'
return normal, response
async def parse(method, options):
try:
command, *args = options
except ValueError:
command, args = None, []
try:
functions = METHOD_COMMAND_FUNCTIONS[method]
except KeyError:
raise UnknownMethod(method)
else:
normal_method = method
try:
fn = functions[command]
except KeyError:
raise UnknownCommand(method, command)
else:
try:
normal_options, response = await fn(args)
except Incomplete as e:
raise Incomplete(method) from e
normal = (normal_method, normal_options)
return normal, response

ファイルの表示

@ -1,39 +1,7 @@
import asyncio
import json
from quart import current_app
from anonstream.chat import delete_chat_messages
from anonstream.stream import get_stream_title, set_stream_title
from anonstream.utils.user import USER_WEBSOCKET_ATTRS
USERS_BY_TOKEN = current_app.users_by_token
USERS = current_app.users
USERS_UPDATE_BUFFER = current_app.users_update_buffer
class UnknownMethod(Exception):
pass
class UnknownCommand(Exception):
pass
class UnknownArgument(Exception):
pass
class BadArgument(Exception):
pass
class Incomplete(Exception):
pass
class Garbage(Exception):
pass
class Failed(Exception):
pass
class Exit(Exception):
pass
from anonstream.control.exceptions import Exit
from anonstream.control.parse import parse_request
def start_control_server_at(address):
return asyncio.start_unix_server(serve_control_client, address)
@ -64,337 +32,3 @@ async def serve_control_client(reader, writer):
writer.write(response.encode())
await writer.drain()
async def parse_request(request):
try:
method, *options = request.split()
except ValueError:
normal, response = (None, []), ''
else:
try:
normal, response = await parse(method, options)
except UnknownMethod as e:
unknown_method, *_ = e.args
normal = None
response = f"method {unknown_method!r} is unknown, try 'help'\n"
except UnknownCommand as e:
method, unknown_command, *_ = e.args
normal = None
response = f"command {unknown_command!r} is unknown, try {f'{method} help'!r}\n"
except BadArgument as e:
reason, *_ = e.args
normal = None
response = f"{reason}, try {f'{method} help'!r}\n"
except Incomplete as e:
method, *_ = e.args
normal = None
response = f"command is incomplete, try {f'{method} help'!r}\n"
except Garbage as e:
garbage, *_ = e.args
normal = None
response = f"command has trailing garbage {garbage!r}, try {f'{method} help'!r}\n"
except Failed as e:
reason, *_ = e.args
normal = None
response = reason + '\n'
return normal, response
async def parse(method, options):
try:
command, *args = options
except ValueError:
command, args = None, []
try:
functions = METHOD_COMMAND_FUNCTIONS[method]
except KeyError:
raise UnknownMethod(method)
else:
normal_method = method
try:
fn = functions[command]
except KeyError:
raise UnknownCommand(method, command)
else:
try:
normal_options, response = await fn(args)
except Incomplete as e:
raise Incomplete(method) from e
normal = (normal_method, normal_options)
return normal, response
async def command_help(args):
match args:
case []:
normal_options = []
response = (
'Usage: METHOD [COMMAND | help]\n'
'Examples:\n'
' help.......................show this help message\n'
' exit.......................close the control connection\n'
' title [show]...............show the stream title\n'
' title set TITLE............set the stream title\n'
' user [show]................show a list of users\n'
' user set USER ATTR VALUE...set an attribute of a user\n'
' user show USERS............show a list of users\n'
' user kick USERS [FAREWELL].kick users\n'
' user eyes USER [show]......show a list of active video responses\n'
' user eyes USER blind IDS...kill a set of video responses\n'
' chat show MESSAGES.........show a list of messages\n'
' chat delete MESSAGES.......delete a set of messages\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_help_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: help\n'
'show usage syntax and examples\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_exit(args):
match args:
case []:
raise Exit
case [*garbage]:
raise Garbage(garbage)
async def command_exit_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: exit\n'
'close the connection\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_title_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: title [show | set TITLE]\n'
'Commands:\n'
' title [show].......show the stream title\n'
' title set TITLE....set the stream title to TITLE\n'
'Definitions:\n'
' TITLE..............a json-encoded string, whitespace must be \\uXXXX-escaped\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_title_show(args):
match args:
case []:
normal_options = ['show']
response = json.dumps(await get_stream_title()) + '\n'
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_title_set(args):
match args:
case [title_json]:
try:
title = json.loads(title_json)
except json.JSONDecodeError as e:
raise BadArgument('could not decode json')
else:
if not isinstance(title, str):
raise BadArgument('could not decode json as string')
else:
try:
await set_stream_title(title)
except OSError as e:
raise Failed(str(e)) from e
normal_options = ['set', json.dumps(title).replace(' ', r'\u0020')]
response = ''
case []:
raise Incomplete
case [_, *garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: user [show | attr USER | get USER ATTR | set USER ATTR VALUE]\n'
'Commands:\n'
' user [show]...........show all users\' tokens\n'
' user attr USER........show names of a user\'s attributes\n'
' user get USER ATTR....show an attribute of a user\n'
' user set USER ATTR....set an attribute of a user\n'
'Definitions:\n'
' USER..................={token TOKEN | hash HASH}\n'
' TOKEN.................a token\n'
' HASH..................a token hash\n'
' ATTR..................a user attribute, re:[a-z0-9_]+\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_show(args):
match args:
case []:
normal_options = ['show']
response = json.dumps(tuple(USERS_BY_TOKEN)) + '\n'
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_attr(args):
match args:
case []:
raise Incomplete
case ['token', token_json]:
try:
token = json.loads(token_json)
except json.JSONDecodeError:
raise BadArgument('could not decode token as json')
try:
user = USERS_BY_TOKEN[token]
except KeyError:
raise Failed(f"no user exists with token {token!r}, try 'user show'")
normal_options = ['attr', 'token', json.dumps(token).replace(' ', r'\u0020')]
response = json.dumps(tuple(user.keys())) + '\n'
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_get(args):
match args:
case ['token', token_json, attr]:
try:
token = json.loads(token_json)
except json.JSONDecodeError:
raise BadArgument('could not decode token as json')
try:
user = USERS_BY_TOKEN[token]
except KeyError:
raise Failed(f"no user exists with token {token!r}, try 'user show'")
try:
value = user[attr]
except KeyError:
raise Failed(f"user has no attribute {attr!r}, try 'user attr token {token}'")
try:
value_json = json.dumps(value)
except TypeError:
raise Failed(f'attribute {attr!r} is not JSON serializable')
normal_options = ['get', 'token', json.dumps(token).replace(' ', r'\u0020'), attr]
response = value_json + '\n'
case []:
raise Incomplete
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_user_set(args):
match args:
case ['token', token_json, attr, value_json]:
try:
token = json.loads(token_json)
except json.JSONDecodeError:
raise BadArgument('could not decode token as json')
try:
user = USERS_BY_TOKEN[token]
except KeyError:
raise Failed(f"no user exists with token {token!r}, try 'user show'")
try:
value = user[attr]
except KeyError:
raise Failed(f"user has no attribute {attr!r}, try 'user attr token {TOKEN}")
try:
value = json.loads(value_json)
except JSON.JSONDecodeError:
raise Failed('could not decode json')
user[attr] = value
if attr in USER_WEBSOCKET_ATTRS:
USERS_UPDATE_BUFFER.add(token)
normal_options = ['set', 'token', json.dumps(token).replace(' ', r'\u0020'), attr, json.dumps(value)]
response = ''
case []:
raise Incomplete
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_chat_help(args):
match args:
case []:
normal_options = ['help']
response = (
'Usage: chat {show [MESSAGES] | delete SEQS}\n'
'Commands:\n'
' chat show [MESSAGES]......show chat messages\n'
' chat delete SEQS..........delete chat messages\n'
'Definitions:\n'
' MESSAGES..................undefined\n'
' SEQS......................=SEQ [SEQ...]\n'
' SEQ.......................a chat message\'s seq, base-10 integer\n'
)
case [*garbage]:
raise Garbage(garbage)
return normal_options, response
async def command_chat_delete(args):
match args:
case []:
raise Incomplete
case _:
try:
seqs = list(map(int, args))
except ValueError as e:
raise BadArgument('SEQ must be a base-10 integer') from e
delete_chat_messages(seqs)
normal_options = ['delete', *map(str, seqs)]
response = ''
return normal_options, response
METHOD_HELP = 'help'
METHOD_EXIT = 'exit'
METHOD_TITLE = 'title'
METHOD_CHAT = 'chat'
METHOD_USER = 'user'
METHOD_COMMAND_FUNCTIONS = {
METHOD_HELP: {
None: command_help,
'help': command_help_help,
},
METHOD_EXIT: {
None: command_exit,
'help': command_exit_help,
},
METHOD_TITLE: {
None: command_title_show,
'help': command_title_help,
'show': command_title_show,
'set': command_title_set,
},
METHOD_CHAT: {
None: command_chat_help,
'help': command_chat_help,
'delete': command_chat_delete,
},
METHOD_USER: {
None: command_user_show,
'help': command_user_help,
'show': command_user_show,
'attr': command_user_attr,
'get': command_user_get,
'set': command_user_set,
},
}

4
anonstream/control/utils.py ノーマルファイル
ファイルの表示

@ -0,0 +1,4 @@
import json
def json_dumps_contiguous(obj, **kwargs):
return json.dumps(obj, **kwargs).replace(' ', '\u0020')

ファイルの表示

@ -38,7 +38,8 @@ def generate_tripcode(password):
background_colour = generate_colour(
seed='tripcode-background\0' + digest,
bg=CONFIG['CHAT_BACKGROUND_COLOUR'],
contrast=5.0,
min_contrast=5.0,
max_contrast=5.0,
)
foreground_colour = generate_maximum_contrast_colour(
seed='tripcode-foreground\0' + digest,

ファイルの表示

@ -26,7 +26,7 @@ def generate_user(timestamp, token, broadcaster, presence):
colour = generate_colour(
seed='name\0' + token,
bg=CONFIG['CHAT_BACKGROUND_COLOUR'],
contrast=4.53,
min_contrast=4.53,
)
token_hash, tag = generate_token_hash_and_tag(token)
return {

ファイルの表示

@ -2,6 +2,9 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import hashlib
import hmac
import re
import string
import time
from functools import wraps
@ -19,6 +22,15 @@ USERS_BY_TOKEN = current_app.users_by_token
USERS = current_app.users
USERS_UPDATE_BUFFER = current_app.users_update_buffer
TOKEN_ALPHABET = (
string.digits
+ string.ascii_lowercase
+ string.ascii_uppercase
+ string.punctuation
+ ' '
)
RE_TOKEN = re.compile(r'[%s]{1,256}' % re.escape(TOKEN_ALPHABET))
def check_auth(context):
auth = context.authorization
return (
@ -68,6 +80,12 @@ def with_user_from(context):
or context.cookies.get('token')
or generate_token()
)
if hmac.compare_digest(token, CONFIG['AUTH_TOKEN']):
raise abort(401)
# Reject invalid tokens
if not RE_TOKEN.fullmatch(token):
raise abort(400)
# Update / create user
user = USERS_BY_TOKEN.get(token)

ファイルの表示

@ -3,6 +3,7 @@
import re
import random
from math import inf
class NotAColor(Exception):
pass
@ -47,7 +48,7 @@ def _tc_to_sc(tc):
Almost-inverse of _sc_to_tc.
The function _sc_to_tc is not injective (because of the discontinuity at
sc=0.03928), thus it has no true inverse. In this implementation, whenever
sc=0.03928), thus it has no true inverse. In this implementation, whenever
for a given `tc` there are two distinct values of `sc` such that
sc_to_tc(`sc`)=`tc`, the smaller sc is chosen. (The smaller one is less
expensive to compute).
@ -89,22 +90,23 @@ def get_contrast(bg, fg):
)
return (max(lumas) + 0.05) / (min(lumas) + 0.05)
def generate_colour(seed, bg, contrast=4.5, lighter=True):
def generate_colour(seed, bg, min_contrast=4.5, max_contrast=inf, lighter=True):
'''
Generate a random colour with given contrast to `bg`.
Generate a random colour with a contrast to `bg` in a given interval.
Channels of `t` are uniformly distributed. No characteristics of the
returned colour are guaranteed to be chosen uniformly from the space of
possible values.
This works by generating an intermediate 3-tuple `t` and transforming it
into the returned colour. Channels of `t` are uniformly distributed, but no
characteristics of the returned colour are guaranteed to be chosen uniformly
from the space of possible values.
If `lighter` is true, the returned colour is forced to have a higher
relative luminance than `bg`. This is fine if `bg` is dark; if `bg` is
not dark, the space of possible returned colours will be a lot smaller
(and might be empty). If `lighter` is false, the returned colour is
forced to have a lower relative luminance than `bg`.
relative luminance than `bg`. This is fine if `bg` is dark; if `bg` is not
dark, the space of possible returned colours will be a lot smaller (and
might be empty). If `lighter` is false, the returned colour is forced to
have a lower relative luminance than `bg`.
It's simple to calculate the maximum possible contrast between `bg` and
any other colour. (The minimum contrast is always 1.)
It's simple to calculate the maximum possible contrast between `bg` and any
other colour. (The minimum contrast is always 1.)
>>> bg = (0x23, 0x23, 0x27)
>>> luma = get_relative_luminance(bg)
@ -113,11 +115,13 @@ def generate_colour(seed, bg, contrast=4.5, lighter=True):
>>> 1.05 / (luma + 0.05) # maximum contrast for colours with greater luma
15.657919499763137
There are values of `contrast` for which the space of possible returned
colours is empty. For example a `contrast` greater than 21 is always
impossible, but the exact upper bound depends on `bg`. The desired
relative luminance of the returned colour must exist in the interval [0,1].
The formula for desired luma is given below.
There are contrast intervals for which the space of possible returned
colours is empty. For example a contrast greater than 21 is always
impossible, but the exact upper bound depends on `bg`. The desired relative
luminance of the returned colour must exist in the interval [0,1]. The
formula for desired luma is given below. This is for one particular
contrast but the same formula can be used twice (once with `min_contrast` and
once with `max_contrast`) to get a range of desired lumas.
>>> bg_luma = get_relative_luminance(bg)
>>> desired_luma = (
@ -131,32 +135,37 @@ def generate_colour(seed, bg, contrast=4.5, lighter=True):
r = random.Random(seed)
if lighter:
desired_luma = contrast * (get_relative_luminance(bg) + 0.05) - 0.05
min_desired_luma = min_contrast * (get_relative_luminance(bg) + 0.05) - 0.05
max_desired_luma = max_contrast * (get_relative_luminance(bg) + 0.05) - 0.05
else:
desired_luma = (get_relative_luminance(bg) + 0.05) / contrast - 0.05
min_desired_luma = (get_relative_luminance(bg) + 0.05) / max_contrast - 0.05
max_desired_luma = (get_relative_luminance(bg) + 0.05) / min_contrast - 0.05
V = (0.2126, 0.7152, 0.0722)
indices = [0, 1, 2]
r.shuffle(indices)
i, j, k = indices
# V[i] * ci + V[j] * 0 + V[k] * 0 <= desired_luma
# V[i] * ci + V[j] * 1 + V[k] * 1 >= desired_luma
ci_upper = (desired_luma - V[j] * 0 - V[k] * 0) / V[i]
ci_lower = (desired_luma - V[j] * 1 - V[k] * 1) / V[i]
ci = r.uniform(max(0, ci_lower), min(1, ci_upper))
# V[i] * ti + V[j] * 0 + V[k] * 0 <= max_desired_luma
# V[i] * ti + V[j] * 1 + V[k] * 1 >= min_desired_luma
ti_upper = (max_desired_luma - V[j] * 0 - V[k] * 0) / V[i]
ti_lower = (min_desired_luma - V[j] * 1 - V[k] * 1) / V[i]
ti = r.uniform(max(0, ti_lower), min(1, ti_upper))
# V[i] * ci + V[j] * cj + V[k] * 0 <= desired_luma
# V[i] * ci + V[j] * cj + V[k] * 1 >= desired_luma
cj_upper = (desired_luma - V[i] * ci - V[k] * 0) / V[j]
cj_lower = (desired_luma - V[i] * ci - V[k] * 1) / V[j]
cj = r.uniform(max(0, cj_lower), min(1, cj_upper))
# V[i] * ti + V[j] * tj + V[k] * 0 <= max_desired_luma
# V[i] * ti + V[j] * tj + V[k] * 1 >= min_desired_luma
tj_upper = (max_desired_luma - V[i] * ti - V[k] * 0) / V[j]
tj_lower = (min_desired_luma - V[i] * ti - V[k] * 1) / V[j]
tj = r.uniform(max(0, tj_lower), min(1, tj_upper))
# V[i] * ci + V[j] * cj + V[k] * ck = desired_luma
ck = (desired_luma - V[i] * ci - V[j] * cj) / V[k]
# V[i] * ti + V[j] * tj + V[k] * tk <= max_desired_luma
# V[i] * ti + V[j] * tj + V[k] * tk >= min_desired_luma
tk_upper = (max_desired_luma - V[i] * ti - V[j] * tj) / V[k]
tk_lower = (min_desired_luma - V[i] * ti - V[j] * tj) / V[k]
tk = r.uniform(max(0, tk_lower), min(1, tk_upper))
t = [None, None, None]
t[i], t[j], t[k] = ci, cj, ck
t[i], t[j], t[k] = ti, tj, tk
s = map(_tc_to_sc, t)
colour = map(lambda sc: round(sc * 255), s)
@ -185,10 +194,12 @@ def generate_maximum_contrast_colour(seed, bg, proportion_of_max=31/32):
max_darker_contrast = get_maximum_contrast(bg, lighter=False)
max_contrast = max(max_lighter_contrast, max_darker_contrast)
practical_max_contrast = max_contrast * proportion_of_max
colour = generate_colour(
seed,
bg,
contrast=max_contrast * proportion_of_max,
min_contrast=practical_max_contrast,
max_contrast=practical_max_contrast,
lighter=max_lighter_contrast > max_darker_contrast,
)