anonstream/anonstream/routes/wrappers.py

239 行
8.1 KiB
Python

# SPDX-FileCopyrightText: 2022 n9k <https://git.076.ne.jp/ninya9k>
# SPDX-License-Identifier: AGPL-3.0-or-later
import hashlib
import hmac
import re
import string
from functools import wraps
from urllib.parse import quote, unquote
from quart import current_app, request, make_response, render_template, request, url_for, Markup
from werkzeug.exceptions import BadRequest, Unauthorized, Forbidden
from werkzeug.security import check_password_hash
from anonstream.broadcast import broadcast
from anonstream.user import ensure_allowedness, Blacklisted, SecretClub
from anonstream.helpers.user import generate_user
from anonstream.utils.user import generate_token, Presence
from anonstream.wrappers import get_timestamp
CONFIG = current_app.config
MESSAGES = current_app.messages
USERS_BY_TOKEN = current_app.users_by_token
USERS = current_app.users
USERS_UPDATE_BUFFER = current_app.users_update_buffer
TOKEN_ALPHABET = (
string.digits
+ string.ascii_lowercase
+ string.ascii_uppercase
+ string.punctuation
+ ' '
)
RE_TOKEN = re.compile(r'[%s]{1,256}' % re.escape(TOKEN_ALPHABET))
def try_unquote(string):
if string is None:
return None
else:
return unquote(string)
def check_auth(context):
auth = context.authorization
return (
auth is not None
and auth.type == 'basic'
and auth.username == CONFIG['AUTH_USERNAME']
and check_password_hash(CONFIG['AUTH_PWHASH'], auth.password)
)
def auth_required(f):
@wraps(f)
async def wrapper(*args, **kwargs):
if check_auth(request):
return await f(*args, **kwargs)
hint = (
'The broadcaster should log in with the credentials printed in '
'their terminal.'
)
if request.authorization is None:
description = hint
else:
description = Markup(
f'Wrong username or password. Refresh the page to try again. '
f'<br>'
f'{hint}'
)
error = Unauthorized(description)
response = await current_app.handle_http_exception(error)
response = await make_response(response)
response.headers['WWW-Authenticate'] = 'Basic'
return response
return wrapper
def generate_and_add_user(
timestamp, token=None, broadcaster=False, verified=False, headers=None,
):
token = token or generate_token()
user = generate_user(
timestamp=timestamp,
token=token,
broadcaster=broadcaster,
verified=verified,
headers=headers,
)
USERS_BY_TOKEN[token] = user
USERS_UPDATE_BUFFER.add(token)
return user
def with_user_from(context, fallback_to_token=False, ignore_allowedness=False):
def with_user_from_context(f):
@wraps(f)
async def wrapper(*args, **kwargs):
timestamp = get_timestamp()
# Get token
broadcaster = check_auth(context)
token_from_args = context.args.get('token')
token_from_cookie = try_unquote(context.cookies.get('token'))
token_from_context = token_from_args or token_from_cookie
if broadcaster:
token = CONFIG['AUTH_TOKEN']
elif CONFIG['ACCESS_CAPTCHA']:
token = token_from_context
else:
token = token_from_context or generate_token()
# Reject invalid tokens
if isinstance(token, str) and not RE_TOKEN.fullmatch(token):
raise BadRequest(Markup(
f'Your token contains disallowed characters or is too '
f'long. Tokens must match this regular expression: <br>'
f'<code>{RE_TOKEN.pattern}</code>'
))
# Only logged in broadcaster may have the broadcaster's token
if (
not broadcaster
and isinstance(token, str)
and hmac.compare_digest(token, CONFIG['AUTH_TOKEN'])
):
raise Unauthorized(Markup(
f"You are using the broadcaster's token but you are "
f"not logged in. The broadcaster should "
f"<a href=\"{url_for('login')}\" target=\"_top\">"
f"click here"
f"</a> "
f"and log in with the credentials printed in their "
f"terminal when they started anonstream."
))
# Create response
user = USERS_BY_TOKEN.get(token)
if CONFIG['ACCESS_CAPTCHA'] and not broadcaster:
if user is not None:
user['last']['seen'] = timestamp
user['headers'] = tuple(context.headers)
if not ignore_allowedness:
assert_allowedness(timestamp, user)
if user is not None and user['verified'] is not None:
response = await f(timestamp, user, *args, **kwargs)
elif fallback_to_token:
#assert not broadcaster
response = await f(timestamp, token, *args, **kwargs)
else:
raise Forbidden(Markup(
f"You have not solved the access captcha. "
f"<a href=\"{url_for('home', token=token)}\" target=\"_top\">"
f"Click here."
f"</a>"
))
else:
if user is not None:
user['last']['seen'] = timestamp
user['headers'] = tuple(context.headers)
else:
user = generate_and_add_user(
timestamp,
token,
broadcaster,
headers=tuple(context.headers),
)
if not ignore_allowedness:
assert_allowedness(timestamp, user)
response = await f(timestamp, user, *args, **kwargs)
# Set cookie
if token_from_cookie != token:
response = await make_response(response)
response.headers['Set-Cookie'] = f'token={quote(token)}; path=/'
return response
return wrapper
return with_user_from_context
async def render_template_with_etag(template, deferred_kwargs, **kwargs):
render = await render_template(template, **kwargs)
tag = hashlib.sha256(render.encode()).hexdigest()
etag = f'W/"{tag}"'
if request.if_none_match.contains_weak(tag):
return '', 304, {'ETag': etag}
else:
rendered_template = await render_template(
template,
**deferred_kwargs,
**kwargs,
)
return rendered_template, {'ETag': etag}
def clean_cache_headers(f):
@wraps(f)
async def wrapper(*args, **kwargs):
response = await f(*args, **kwargs)
# Remove Last-Modified
try:
response.headers.pop('Last-Modified')
except KeyError:
pass
# Obfuscate ETag
try:
original_etag = response.headers['ETag']
except KeyError:
pass
else:
parts = CONFIG['SECRET_KEY'] + b'etag\0' + original_etag.encode()
tag = hashlib.sha256(parts).hexdigest()
response.headers['ETag'] = f'"{tag}"'
return response
return wrapper
def etag_conditional(f):
@wraps(f)
async def wrapper(*args, **kwargs):
response = await f(*args, **kwargs)
etag = response.headers.get('ETag')
if etag is not None:
if match := re.fullmatch(r'"(?P<tag>.+)"', etag):
tag = match.group('tag')
if tag in request.if_none_match:
return '', 304, {'ETag': etag}
return response
return wrapper
def assert_allowedness(timestamp, user):
try:
ensure_allowedness(user, timestamp=timestamp)
except Blacklisted as e:
raise Forbidden('You have been blacklisted.')
except SecretClub as e:
raise Forbidden('You have not been whitelisted.')