Allowedness: check in special case route handlers

このコミットが含まれているのは:
n9k 2022-06-25 05:02:02 +00:00
コミット 4a68759806
1個のファイルの変更24行の追加17行の削除

ファイルの表示

@ -4,13 +4,13 @@
import math import math
from quart import current_app, request, render_template, abort, make_response, redirect, url_for, send_from_directory from quart import current_app, request, render_template, abort, make_response, redirect, url_for, send_from_directory
from werkzeug.exceptions import NotFound, TooManyRequests from werkzeug.exceptions import Forbidden, NotFound, TooManyRequests
from anonstream.access import add_failure, pop_failure from anonstream.access import add_failure, pop_failure
from anonstream.captcha import get_captcha_image, get_random_captcha_digest from anonstream.captcha import get_captcha_image, get_random_captcha_digest
from anonstream.segments import segments, StopSendingSegments from anonstream.segments import segments, StopSendingSegments
from anonstream.stream import is_online, get_stream_uptime from anonstream.stream import is_online, get_stream_uptime
from anonstream.user import watching, create_eyes, renew_eyes, EyesException, RatelimitedEyes, TooManyEyes from anonstream.user import watching, create_eyes, renew_eyes, EyesException, RatelimitedEyes, TooManyEyes, ensure_allowedness, Blacklisted, SecretClub
from anonstream.routes.wrappers import with_user_from, auth_required, clean_cache_headers, generate_and_add_user from anonstream.routes.wrappers import with_user_from, auth_required, clean_cache_headers, generate_and_add_user
from anonstream.helpers.captcha import check_captcha_digest, Answer from anonstream.helpers.captcha import check_captcha_digest, Answer
from anonstream.utils.security import generate_csp from anonstream.utils.security import generate_csp
@ -21,25 +21,33 @@ CAPTCHA_SIGNER = current_app.captcha_signer
STATIC_DIRECTORY = current_app.root_path / 'static' STATIC_DIRECTORY = current_app.root_path / 'static'
@current_app.route('/') @current_app.route('/')
@with_user_from(request, fallback_to_token=True) @with_user_from(request, fallback_to_token=True, ignore_allowedness=True)
async def home(timestamp, user_or_token): async def home(timestamp, user_or_token):
match user_or_token: match user_or_token:
case str() | None: case str() | None as token:
failure_id = request.args.get('failure', type=int) failure_id = request.args.get('failure', type=int)
response = await render_template( response = await render_template(
'captcha.html', 'captcha.html',
csp=generate_csp(), csp=generate_csp(),
token=user_or_token, token=token,
digest=get_random_captcha_digest(), digest=get_random_captcha_digest(),
failure=pop_failure(failure_id), failure=pop_failure(failure_id),
) )
case dict(): case dict() as user:
response = await render_template( try:
'home.html', ensure_allowedness(user, timestamp=timestamp)
csp=generate_csp(), except Blacklisted:
user=user_or_token, raise Forbidden('You have been blacklisted.')
version=current_app.version, except SecretClub:
) # TODO allow changing tripcode
raise Forbidden('You have not been whitelisted.')
else:
response = await render_template(
'home.html',
csp=generate_csp(),
user=user,
version=current_app.version,
)
return response return response
@current_app.route('/stream.mp4') @current_app.route('/stream.mp4')
@ -101,11 +109,10 @@ async def captcha(timestamp, user_or_token):
return image, {'Content-Type': 'image/jpeg'} return image, {'Content-Type': 'image/jpeg'}
@current_app.post('/access') @current_app.post('/access')
@with_user_from(request, fallback_to_token=True) @with_user_from(request, fallback_to_token=True, ignore_allowedness=True)
async def access(timestamp, user_or_token): async def access(timestamp, user_or_token):
match user_or_token: match user_or_token:
case str() | None: case str() | None as token:
token = user_or_token
form = await request.form form = await request.form
digest = form.get('digest', '') digest = form.get('digest', '')
answer = form.get('answer', '') answer = form.get('answer', '')
@ -122,8 +129,8 @@ async def access(timestamp, user_or_token):
if failure_id is not None: if failure_id is not None:
url = url_for('home', token=token, failure=failure_id) url = url_for('home', token=token, failure=failure_id)
raise abort(redirect(url, 303)) raise abort(redirect(url, 303))
case dict(): case dict() as user:
user = user_or_token pass
url = url_for('home', token=user['token']) url = url_for('home', token=user['token'])
return redirect(url, 303) return redirect(url, 303)