anonstream/website/chat.py

284 行
10 KiB
Python

import secrets
import time
import unicodedata
import re
from flask import escape, Markup
from collections import deque
from datetime import datetime
import website.utils.captcha as captcha_util
import website.utils.tripcode as tripcode
import website.viewership as viewership
from website.constants import CONFIG, BROADCASTER_TOKEN, MESSAGE_MAX_LENGTH, CHAT_MAX_STORAGE, CHAT_TIMEOUT, FLOOD_PERIOD, FLOOD_THRESHOLD, CAPTCHA_LIFETIME, \
NOTES, N_NONE, N_TOKEN_EMPTY, N_MESSAGE_EMPTY, N_MESSAGE_LONG, N_BANNED, N_TOOFAST, N_FLOOD, N_CAPTCHA_MISSING, N_CAPTCHA_WRONG, N_CAPTCHA_USED, N_CAPTCHA_EXPIRED, N_CAPTCHA_RANDOM, N_CONFIRM, N_WORDFILTER, N_WORDFILTER_BAN, N_APPEAR_OK, N_APPEAR_FAIL
messages = deque() # messages are stored from most recent on the left to least recent on the right
captchas = {} # captchas that have been used already
viewers = viewership.viewers
nonces = {}
# TODO: PMs / whispers (?)
def behead_chat():
while len(messages) > CHAT_MAX_STORAGE:
messages.pop()
def new_nonce():
now = int(time.time())
nonce = secrets.token_hex(6)
remove_expired_nonces()
nonces[nonce] = now
return nonce
def remove_expired_captchas():
now = int(time.time())
to_pop = []
for ciphertext in captchas:
timestamp = captchas[ciphertext]
if timestamp < now - CAPTCHA_LIFETIME:
to_pop.append(ciphertext)
for ciphertext in to_pop:
try:
captchas.pop(ciphertext)
except KeyError:
pass
def remove_expired_nonces():
now = int(time.time())
to_pop = []
for nonce in nonces:
timestamp = nonces[nonce]
if timestamp < now - CAPTCHA_LIFETIME:
to_pop.append(nonce)
for nonce in to_pop:
try:
nonces.pop(nonce)
except KeyError:
pass
def _comment(text, token, c_response, c_ciphertext, nonce):
# TODO: if multiple errors, give out the least annoying one, e.g. N_CAPTCHA_MISSING is far more annoying than N_MESSAGE_EMPTY
now = int(time.time())
# check captcha
if not viewers[token]['verified']:
if c_response and c_ciphertext:
try:
c_timestamp = captcha_util.get_creation_time(c_ciphertext, c_response)
# captcha answer is incorrect
except captcha_util.Incorrect as e:
c_timestamp = e.args[0]
remove_expired_captchas()
captchas[c_ciphertext] = c_timestamp
return N_CAPTCHA_WRONG
# captcha is not genuine
except captcha_util.FakeCiphertext:
return N_CAPTCHA_MISSING
# captcha has expired
if c_timestamp + CAPTCHA_LIFETIME < now:
remove_expired_captchas()
return N_CAPTCHA_EXPIRED
# captcha has been used already
if c_ciphertext in captchas:
return N_CAPTCHA_USED
# captcha was correct, everything is fine
remove_expired_captchas()
captchas[c_ciphertext] = c_timestamp
else:
return N_CAPTCHA_MISSING
if not token:
return N_TOKEN_EMPTY
if not text:
# some people may think that you can type in the captcha first to get
# rid of it before sending any message; this enables that behaviour
if not viewers[token]['verified']:
viewers[token]['verified'] = True
return N_NONE
return N_MESSAGE_EMPTY
if len(text) >= MESSAGE_MAX_LENGTH:
return N_MESSAGE_LONG
viewership.setdefault(token)
# remove record of old comments
for t in viewers[token]['recent_comments'].copy():
if t < now - FLOOD_PERIOD:
viewers[token]['recent_comments'].remove(t)
if viewers[token]['banned']:
return N_BANNED
# check that the commenter hasn't accidentally sent the same request twice
remove_expired_nonces()
try:
nonces.pop(nonce)
except KeyError:
return N_CONFIRM
# don't ratelimit the broadcaster
if viewers[token]['broadcaster']:
pass
elif secrets.randbelow(50) == 0:
viewers[token]['verified'] = False
return N_CAPTCHA_RANDOM
elif now < viewers[token]['last_comment'] + CHAT_TIMEOUT:
return N_TOOFAST
elif len(viewers[token]['recent_comments']) + 1 >= FLOOD_THRESHOLD:
viewers[token]['verified'] = False
return N_FLOOD
# apply word filters
markup = escape(text)
filters = CONFIG['chat']['filters']
reaction = None
# ... unless it's the broadcaster talking
if token == BROADCASTER_TOKEN and filters['tyranny']:
note = N_NONE
else:
for key in ['censor', 'block', 'ban']:
for word in filters[key]:
word = escape(word) # escape for html
if word.startswith('re:'):
regex = word[3:]
else:
word = re.escape(word) # escape for regex
regex = r'\b{}\b'.format(word)
markup, n = re.subn(regex, '<b class="censored">[CENSORED]</b>', markup, flags=re.IGNORECASE if filters['ignore_case'] else 0)
if n: reaction = key
# enact consequences of word filters
note = N_NONE
if reaction == 'block':
note = N_WORDFILTER
elif reaction == 'ban' and token == BROADCASTER_TOKEN:
note = N_WORDFILTER
elif reaction == 'ban':
note = N_WORDFILTER_BAN
viewers[token]['banned'] = True
dt = datetime.utcfromtimestamp(now)
messages.appendleft({'nomarkup': text,
'markup': Markup(markup),
'viewer': viewers[token],
'id': f'{token}-{secrets.token_hex(4)}',
'hidden': reaction == 'block' or reaction == 'ban',
'reaction': reaction,
'time': dt.strftime('%H:%M'),
'timestamp': dt.strftime('%F %T'),
'date': dt.strftime('%F')})
viewers[token]['last_comment'] = now
viewers[token]['recent_comments'].append(now)
viewers[token]['verified'] = True
behead_chat()
return note
def comment(text, token, c_response, c_ciphertext, nonce):
with viewership.lock:
failure_reason = _comment(text, token, c_response, c_ciphertext, nonce)
viewership.setdefault(BROADCASTER_TOKEN)
viewers[BROADCASTER_TOKEN]['verified'] = True
debug_comment_state = 'SUCCEEDED' if failure_reason == N_NONE and text else 'CLEARED CAPTCHA' if failure_reason == N_NONE else f'FAILED with note {NOTES[failure_reason]!r}'
print(f'Comment submission (token={token}, name={viewers[token]["nickname"]!r}, tag={viewers[token]["tag"]}, broadcaster={viewers[token]["broadcaster"]})', debug_comment_state)
return failure_reason
def mod_chat(message_ids, hide, ban, ban_and_purge):
purge = ban_and_purge
ban = ban_and_purge or ban
with viewership.lock:
if ban:
banned = {message_id.split('-')[0] for message_id in message_ids}
for token in banned:
viewers[token]['banned'] = True
for message in messages:
if hide and message['id'] in message_ids:
message['hidden'] = True
if purge and message['viewer']['token'] in banned:
message['hidden'] = True
viewership.setdefault(BROADCASTER_TOKEN)
viewers[BROADCASTER_TOKEN]['banned'] = False
def mod_users(tokens, banned):
with viewership.lock:
for token in tokens:
viewers[token]['banned'] = banned
viewership.setdefault(BROADCASTER_TOKEN)
viewers[BROADCASTER_TOKEN]['banned'] = False
def get_captcha(token):
viewership.setdefault(token)
if viewers[token]['verified']:
return None
c_src, c_ciphertext = captcha_util.gen_captcha()
return {'src': c_src, 'ciphertext': c_ciphertext}
def set_nickname(nickname, token):
viewership.setdefault(token)
nickname = ''.join(char if unicodedata.category(char) != 'Cc' else ' ' for char in nickname).strip()
if len(nickname) > 24:
return N_APPEAR_FAIL, False
if len(nickname) == 0 or nickname == viewership.default_nickname(token):
nickname = None
viewers[token]['nickname'] = nickname
return N_APPEAR_OK, True
def set_tripcode(password, token):
if len(password) > 256:
return N_APPEAR_FAIL, False
viewers[token]['tripcode'] = tripcode.gen_tripcode(password)
return N_APPEAR_OK, True
def remove_tripcode(token):
viewers[token]['tripcode'] = tripcode.default()
return N_APPEAR_OK, True
def decorate(messages):
'''
add extra stuff to a list of messages, e.g. date, chat command responses
'''
def _is_visible(message):
# uncomment the end part once the broadcaster can see hidden comments
return not message['hidden'] # or token == BROADCASTER_TOKEN
# order from least to most recent
messages.reverse()
# TODO: chat commands e.g. !uptime; try to make it so responses always follow the message with the command, so not split over a date separator
# add dates between messages that cross over a day boundary
to_insert = []
previous_message = None
for index, message in enumerate(messages):
if message.get('special') or not _is_visible(message):
continue
if previous_message and message['date'] != previous_message['date']:
to_insert.append(index)
previous_message = message
to_insert.reverse()
for index in to_insert:
messages.insert(index, {'special': 'date', 'content': messages[index]['date']})
# add date at the top if messages span several days
if to_insert:
for index, first_visible_message in enumerate(messages):
if _is_visible(message):
break
else:
index = None
if index != None:
messages.insert(index, {'special': 'date', 'content': first_visible_message['date']})
# revert back to original ordering
messages.reverse()
def viewer_messages_exist(token):
return any(message['viewer']['token'] == token for message in messages)