2022-02-15 19:11:53 +09:00
|
|
|
import time
|
2022-02-16 18:55:30 +09:00
|
|
|
from math import inf
|
2022-02-15 19:11:53 +09:00
|
|
|
|
2022-02-14 19:16:09 +09:00
|
|
|
from quart import current_app
|
|
|
|
|
2022-02-18 14:16:54 +09:00
|
|
|
from anonstream.wrappers import try_except_log, with_timestamp
|
2022-02-17 09:20:51 +09:00
|
|
|
from anonstream.helpers.user import is_visible
|
2022-02-20 13:23:32 +09:00
|
|
|
from anonstream.helpers.captcha import check_captcha_digest, Answer
|
2022-02-17 21:51:09 +09:00
|
|
|
from anonstream.helpers.tripcode import generate_tripcode
|
|
|
|
from anonstream.utils.colour import color_to_colour, get_contrast, NotAColor
|
2022-02-20 16:20:43 +09:00
|
|
|
from anonstream.utils.user import get_user_for_websocket
|
2022-02-16 18:55:30 +09:00
|
|
|
|
2022-02-17 09:20:51 +09:00
|
|
|
# Module-level shortcuts to objects stored on the current Quart app.
# They are read once, when this module is imported.

# Configuration mapping; this module reads 'MAX_STATES',
# 'CHAT_BACKGROUND_COLOUR' and 'CHAT_NAME_MIN_CONTRAST' from it.
CONFIG = current_app.config
|
2022-02-18 10:32:34 +09:00
|
|
|
# Passed to is_visible() when deciding which users to send to websockets.
MESSAGES = current_app.messages
|
|
|
|
# Iterable of user objects; each is indexed by string keys in this module.
USERS = current_app.users
|
2022-02-20 13:23:32 +09:00
|
|
|
# Passed to check_captcha_digest() when verifying a user.
CAPTCHA_SIGNER = current_app.captcha_signer
|
2022-02-20 16:20:43 +09:00
|
|
|
# Receives user tokens via .add() after an appearance change
# (presumably a set of users pending broadcast; confirm at definition).
USERS_UPDATE_BUFFER = current_app.users_update_buffer
|
2022-02-14 19:16:09 +09:00
|
|
|
|
2022-02-20 13:23:32 +09:00
|
|
|
class BadAppearance(ValueError):
    """Raised when a requested name, colour or tripcode password is rejected."""
|
|
|
|
|
2022-02-20 13:23:32 +09:00
|
|
|
class BadCaptcha(ValueError):
    """Raised when a captcha answer is missing, incorrect or expired."""
|
|
|
|
|
|
|
|
def add_state(user, **state):
    """Store ``state`` on ``user`` under a fresh id and return that id.

    The id is the current time in whole milliseconds. The keyword
    arguments are stored as a dict in ``user['state']``. While that mapping
    holds more than ``CONFIG['MAX_STATES']`` entries, entries are removed
    with ``popitem(last=False)``.
    """
    # Millisecond timestamp doubles as the key of the new entry.
    state_id = time.time_ns() // 1_000_000
    states = user['state']
    states[state_id] = state

    # Trim from the front of the mapping until it is within the limit
    # (assumes an OrderedDict-like mapping, so the oldest entry goes first).
    limit = CONFIG['MAX_STATES']
    while len(states) > limit:
        states.popitem(last=False)

    return state_id
|
2022-02-15 19:11:53 +09:00
|
|
|
|
2022-02-20 13:23:32 +09:00
|
|
|
def pop_state(user, state_id):
    """Remove and return the state stored under ``state_id``.

    Returns ``None`` when the lookup raises ``KeyError``, i.e. when there is
    no entry for ``state_id`` (or no ``'state'`` key on ``user``).
    """
    try:
        return user['state'].pop(state_id)
    except KeyError:
        return None
|
2022-02-17 21:51:09 +09:00
|
|
|
|
2022-02-19 12:14:37 +09:00
|
|
|
def try_change_appearance(user, name, color, password,
                          want_delete_tripcode, want_change_tripcode):
    """Validate the requested appearance changes, then apply them all or none.

    Each change is first run in dry-run mode through ``try_except_log``,
    which is given the shared ``errors`` list and ``BadAppearance``
    (presumably it records caught ``BadAppearance`` errors in that list;
    confirm in ``anonstream.wrappers``). Only when the list is still empty
    are the changes applied and the user's token added to
    ``USERS_UPDATE_BUFFER``.

    Deleting the tripcode takes precedence over changing it.

    Returns the ``errors`` list (empty when the changes were applied).
    """
    errors = []

    def validate(change, *args):
        # A fresh wrapper is built per call, all sharing the same error list.
        guarded = try_except_log(errors, BadAppearance)(change)
        return guarded(*args, dry_run=True)

    # Dry-run phase: nothing on the user is modified here.
    validate(change_name, user, name)
    validate(change_color, user, color)
    if not want_delete_tripcode and want_change_tripcode:
        validate(change_tripcode, user, password)

    if errors:
        return errors

    # Commit phase: every check passed, so apply the changes for real.
    change_name(user, name)
    change_color(user, color)
    if want_delete_tripcode:
        delete_tripcode(user)
    elif want_change_tripcode:
        change_tripcode(user, password)

    # Add to the users update buffer
    # NOTE(review): assumed to belong to the success path only; verify.
    USERS_UPDATE_BUFFER.add(user['token'])

    return errors
|
|
|
|
|
2022-02-17 21:51:09 +09:00
|
|
|
def change_name(user, name, dry_run=False):
    """Set ``user['name']``, or only validate ``name`` when ``dry_run`` is true.

    In dry-run mode nothing is written. A ``None`` name is accepted without
    further checks; otherwise the name must be between 1 and 24 characters.

    Raises:
        BadAppearance: in dry-run mode, if ``name`` is empty or longer than
            24 characters.
    """
    if not dry_run:
        user['name'] = name
        return

    if name is None:
        return

    length = len(name)
    if length == 0:
        raise BadAppearance('Name was empty')
    if length > 24:
        raise BadAppearance('Name exceeded 24 chars')
|
|
|
|
|
|
|
|
def change_color(user, color, dry_run=False):
    """Set ``user['color']``, or only validate ``color`` when ``dry_run`` is true.

    In dry-run mode nothing is written. The colour is converted with
    ``color_to_colour`` and its contrast against
    ``CONFIG['CHAT_BACKGROUND_COLOUR']`` must be at least
    ``CONFIG['CHAT_NAME_MIN_CONTRAST']``.

    Raises:
        BadAppearance: in dry-run mode, if ``color`` cannot be converted or
            its contrast is below the configured minimum.
    """
    if not dry_run:
        user['color'] = color
        return

    try:
        colour = color_to_colour(color)
    except NotAColor:
        raise BadAppearance('Invalid CSS color')

    background = CONFIG['CHAT_BACKGROUND_COLOUR']
    contrast = get_contrast(background, colour)
    min_contrast = CONFIG['CHAT_NAME_MIN_CONTRAST']
    if contrast < min_contrast:
        # The second argument carries the measured and required values.
        detail = (f'{contrast:.2f}', f'/{min_contrast}')
        raise BadAppearance('Colour had insufficient contrast:', detail)
|
|
|
|
|
|
|
|
def change_tripcode(user, password, dry_run=False):
    """Set ``user['tripcode']`` from ``password``, or only validate it.

    In dry-run mode nothing is written and only the password length is
    checked. Otherwise the tripcode is produced by ``generate_tripcode``.

    Raises:
        BadAppearance: in dry-run mode, if ``password`` is longer than
            1024 characters.
    """
    if not dry_run:
        user['tripcode'] = generate_tripcode(password)
        return

    if len(password) > 1024:
        raise BadAppearance('Password exceeded 1024 chars')
|
|
|
|
|
|
|
|
def delete_tripcode(user):
    """Clear the user's tripcode by setting ``user['tripcode']`` to ``None``."""
    user['tripcode'] = None
|
2022-02-16 18:55:30 +09:00
|
|
|
|
|
|
|
def see(user):
    """Stamp ``user['last']['seen']`` with the current time in whole seconds."""
    now = int(time.time())
    user['last']['seen'] = now
|
2022-02-16 18:55:30 +09:00
|
|
|
|
|
|
|
@with_timestamp
def get_all_users_for_websocket(timestamp):
    """Build the websocket view of every visible user.

    ``timestamp`` is presumably supplied by the ``with_timestamp`` decorator
    (confirm in ``anonstream.wrappers``). A user is included when
    ``is_visible(timestamp, MESSAGES, user)`` is true.

    Returns a dict mapping each visible user's ``'token_hash'`` to the
    result of ``get_user_for_websocket(user)``.
    """
    return {
        user['token_hash']: get_user_for_websocket(user)
        for user in USERS
        if is_visible(timestamp, MESSAGES, user)
    }
|
2022-02-20 13:23:32 +09:00
|
|
|
|
|
|
|
def verify(user, digest, answer):
|
|
|
|
if user['verified']:
|
|
|
|
verification_happened = False
|
|
|
|
else:
|
|
|
|
match check_captcha_digest(CAPTCHA_SIGNER, digest, answer):
|
|
|
|
case Answer.MISSING:
|
|
|
|
raise BadCaptcha('Captcha is required')
|
|
|
|
case Answer.BAD:
|
|
|
|
raise BadCaptcha('Captcha was incorrect')
|
|
|
|
case Answer.EXPIRED:
|
|
|
|
raise BadCaptcha('Captcha has expired')
|
|
|
|
case Answer.OK:
|
|
|
|
user['verified'] = True
|
|
|
|
verification_happened = True
|
|
|
|
|
|
|
|
return verification_happened
|