import secrets import time import unicodedata import re from flask import escape, Markup from collections import deque from datetime import datetime import website.utils.captcha as captcha_util import website.utils.tripcode as tripcode import website.viewership as viewership from website.constants import CONFIG, BROADCASTER_TOKEN, MESSAGE_MAX_LENGTH, CHAT_MAX_STORAGE, CHAT_TIMEOUT, FLOOD_PERIOD, FLOOD_THRESHOLD, CAPTCHA_LIFETIME, \ NOTES, N_NONE, N_TOKEN_EMPTY, N_MESSAGE_EMPTY, N_MESSAGE_LONG, N_BANNED, N_TOOFAST, N_FLOOD, N_CAPTCHA_MISSING, N_CAPTCHA_WRONG, N_CAPTCHA_USED, N_CAPTCHA_EXPIRED, N_CAPTCHA_RANDOM, N_CONFIRM, N_WORDFILTER, N_WORDFILTER_BAN, N_APPEAR_OK, N_APPEAR_FAIL messages = deque() # messages are stored from most recent on the left to least recent on the right captchas = {} # captchas that have been used already viewers = viewership.viewers nonces = {} # TODO: PMs / whispers (?) def behead_chat(): while len(messages) > CHAT_MAX_STORAGE: messages.pop() def new_nonce(): now = int(time.time()) nonce = secrets.token_hex(6) remove_expired_nonces() nonces[nonce] = now return nonce def remove_expired_captchas(): now = int(time.time()) to_pop = [] for ciphertext in captchas: timestamp = captchas[ciphertext] if timestamp < now - CAPTCHA_LIFETIME: to_pop.append(ciphertext) for ciphertext in to_pop: try: captchas.pop(ciphertext) except KeyError: pass def remove_expired_nonces(): now = int(time.time()) to_pop = [] for nonce in nonces: timestamp = nonces[nonce] if timestamp < now - CAPTCHA_LIFETIME: to_pop.append(nonce) for nonce in to_pop: try: nonces.pop(nonce) except KeyError: pass def _comment(text, token, c_response, c_ciphertext, nonce): # TODO: if multiple errors, give out the least annoying one, e.g. N_CAPTCHA_MISSING is far more annoying than N_MESSAGE_EMPTY now = int(time.time()) # check captcha if not viewers[token]['verified']: if c_response and c_ciphertext: try: c_timestamp = captcha_util.get_creation_time(c_ciphertext, c_response) # captcha answer is incorrect except captcha_util.Incorrect as e: c_timestamp = e.args[0] remove_expired_captchas() captchas[c_ciphertext] = c_timestamp return N_CAPTCHA_WRONG # captcha is not genuine except captcha_util.FakeCiphertext: return N_CAPTCHA_MISSING # captcha has expired if c_timestamp + CAPTCHA_LIFETIME < now: remove_expired_captchas() return N_CAPTCHA_EXPIRED # captcha has been used already if c_ciphertext in captchas: return N_CAPTCHA_USED # captcha was correct, everything is fine remove_expired_captchas() captchas[c_ciphertext] = c_timestamp else: return N_CAPTCHA_MISSING if not token: return N_TOKEN_EMPTY if not text: # some people may think that you can type in the captcha first to get # rid of it before sending any message; this enables that behaviour if not viewers[token]['verified']: viewers[token]['verified'] = True return N_NONE return N_MESSAGE_EMPTY if len(text) >= MESSAGE_MAX_LENGTH: return N_MESSAGE_LONG viewership.setdefault(token) # remove record of old comments for t in viewers[token]['recent_comments'].copy(): if t < now - FLOOD_PERIOD: viewers[token]['recent_comments'].remove(t) if viewers[token]['banned']: return N_BANNED # check that the commenter hasn't accidentally sent the same request twice remove_expired_nonces() try: nonces.pop(nonce) except KeyError: return N_CONFIRM # don't ratelimit the broadcaster if viewers[token]['broadcaster']: pass elif secrets.randbelow(50) == 0: viewers[token]['verified'] = False return N_CAPTCHA_RANDOM elif now < viewers[token]['last_comment'] + CHAT_TIMEOUT: return N_TOOFAST elif len(viewers[token]['recent_comments']) + 1 >= FLOOD_THRESHOLD: viewers[token]['verified'] = False return N_FLOOD # apply word filters markup = escape(text) filters = CONFIG['chat']['filters'] reaction = None # ... unless it's the broadcaster talking if token == BROADCASTER_TOKEN and filters['tyranny']: note = N_NONE else: for key in ['censor', 'block', 'ban']: for word in filters[key]: word = escape(word) # escape for html if word.startswith('re:'): regex = word[3:] else: word = re.escape(word) # escape for regex regex = r'\b{}\b'.format(word) markup, n = re.subn(regex, '[CENSORED]', markup, flags=re.IGNORECASE if filters['ignore_case'] else 0) if n: reaction = key # enact consequences of word filters note = N_NONE if reaction == 'block': note = N_WORDFILTER elif reaction == 'ban' and token == BROADCASTER_TOKEN: note = N_WORDFILTER elif reaction == 'ban': note = N_WORDFILTER_BAN viewers[token]['banned'] = True dt = datetime.utcfromtimestamp(now) messages.appendleft({'nomarkup': text, 'markup': Markup(markup), 'viewer': viewers[token], 'id': f'{token}-{secrets.token_hex(4)}', 'hidden': reaction == 'block' or reaction == 'ban', 'reaction': reaction, 'time': dt.strftime('%H:%M'), 'timestamp': dt.strftime('%F %T'), 'date': dt.strftime('%F')}) viewers[token]['last_comment'] = now viewers[token]['recent_comments'].append(now) viewers[token]['verified'] = True behead_chat() return note def comment(text, token, c_response, c_ciphertext, nonce): with viewership.lock: failure_reason = _comment(text, token, c_response, c_ciphertext, nonce) viewership.setdefault(BROADCASTER_TOKEN) viewers[BROADCASTER_TOKEN]['verified'] = True debug_comment_state = 'SUCCEEDED' if failure_reason == N_NONE and text else 'CLEARED CAPTCHA' if failure_reason == N_NONE else f'FAILED with note {NOTES[failure_reason]!r}' print(f'Comment submission (token={token}, name={viewers[token]["nickname"]!r}, tag={viewers[token]["tag"]}, broadcaster={viewers[token]["broadcaster"]})', debug_comment_state) return failure_reason def mod_chat(message_ids, hide, ban, ban_and_purge): purge = ban_and_purge ban = ban_and_purge or ban with viewership.lock: if ban: banned = {message_id.split('-')[0] for message_id in message_ids} for token in banned: viewers[token]['banned'] = True for message in messages: if hide and message['id'] in message_ids: message['hidden'] = True if purge and message['viewer']['token'] in banned: message['hidden'] = True viewership.setdefault(BROADCASTER_TOKEN) viewers[BROADCASTER_TOKEN]['banned'] = False def mod_users(tokens, banned): with viewership.lock: for token in tokens: viewers[token]['banned'] = banned viewership.setdefault(BROADCASTER_TOKEN) viewers[BROADCASTER_TOKEN]['banned'] = False def get_captcha(token): viewership.setdefault(token) if viewers[token]['verified']: return None c_src, c_ciphertext = captcha_util.gen_captcha() return {'src': c_src, 'ciphertext': c_ciphertext} def set_nickname(nickname, token): viewership.setdefault(token) nickname = ''.join(char if unicodedata.category(char) != 'Cc' else ' ' for char in nickname).strip() if len(nickname) > 24: return N_APPEAR_FAIL, False if len(nickname) == 0 or nickname == viewership.default_nickname(token): nickname = None viewers[token]['nickname'] = nickname return N_APPEAR_OK, True def set_tripcode(password, token): if len(password) > 256: return N_APPEAR_FAIL, False viewers[token]['tripcode'] = tripcode.gen_tripcode(password) return N_APPEAR_OK, True def remove_tripcode(token): viewers[token]['tripcode'] = tripcode.default() return N_APPEAR_OK, True def decorate(messages): ''' add extra stuff to a list of messages, e.g. date, chat command responses ''' def _is_visible(message): # uncomment the end part once the broadcaster can see hidden comments return not message['hidden'] # or token == BROADCASTER_TOKEN # order from least to most recent messages.reverse() # TODO: chat commands e.g. !uptime; try to make it so responses always follow the message with the command, so not split over a date separator # add dates between messages that cross over a day boundary to_insert = [] previous_message = None for index, message in enumerate(messages): if message.get('special') or not _is_visible(message): continue if previous_message and message['date'] != previous_message['date']: to_insert.append(index) previous_message = message to_insert.reverse() for index in to_insert: messages.insert(index, {'special': 'date', 'content': messages[index]['date']}) # add date at the top if messages span several days if to_insert: for index, first_visible_message in enumerate(messages): if _is_visible(message): break else: index = None if index != None: messages.insert(index, {'special': 'date', 'content': first_visible_message['date']}) # revert back to original ordering messages.reverse() def viewer_messages_exist(token): return any(message['viewer']['token'] == token for message in messages)