Setup background tasks, create t_sunset_users task

このコミットが含まれているのは:
n9k 2022-02-19 03:14:37 +00:00
コミット 7058677000
10個のファイルの変更85行の追加61行の削除

ファイルの表示

@ -9,7 +9,7 @@ from anonstream.utils.user import generate_token
from anonstream.utils.colour import color_to_colour
from anonstream.segments import DirectoryCache
async def create_app():
def create_app():
with open('config.toml') as fp:
config = toml.load(fp)
@ -29,8 +29,8 @@ async def create_app():
'MAX_NOTICES': config['memory']['notices'],
'MAX_CHAT_MESSAGES': config['memory']['chat_messages'],
'MAX_CHAT_SCROLLBACK': config['memory']['chat_scrollback'],
'CHECKUP_PERIOD_USER': config['ratelimits']['user_absence'],
'CHECKUP_PERIOD_CAPTCHA': config['ratelimits']['captcha_expiry'],
'CHECKUP_PERIOD_USER': config['intervals']['sunset_users'],
'CHECKUP_PERIOD_CAPTCHA': config['intervals']['expire_captchas'],
'THRESHOLD_USER_NOTWATCHING': config['thresholds']['user_notwatching'],
'THRESHOLD_USER_TENTATIVE': config['thresholds']['user_tentative'],
'THRESHOLD_USER_ABSENT': config['thresholds']['user_absent'],
@ -58,7 +58,15 @@ async def create_app():
app.users = app.users_by_token.values()
app.segments_directory_cache = DirectoryCache(config['stream']['segments_dir'])
async with app.app_context():
@app.while_serving
async def shutdown():
app.shutting_down = False
yield
app.shutting_down = True
@app.before_serving
async def startup():
import anonstream.routes
import anonstream.tasks
return app

ファイルの表示

@ -15,10 +15,10 @@ USERS = current_app.users
class Rejected(Exception):
pass
async def broadcast(users, payload):
def broadcast(users, payload):
for user in users:
for queue in user['websockets']:
await queue.put(payload)
queue.put_nowait(payload)
def messages_for_websocket():
return list(map(
@ -29,7 +29,7 @@ def messages_for_websocket():
get_scrollback(MESSAGES),
))
async def add_chat_message(user, nonce, comment):
def add_chat_message(user, nonce, comment):
# check message
message_id = generate_nonce_hash(nonce)
if message_id in MESSAGES_BY_ID:
@ -69,7 +69,7 @@ async def add_chat_message(user, nonce, comment):
MESSAGES_BY_ID.pop(last=False)
# broadcast message to websockets
await broadcast(
broadcast(
USERS,
payload={
'type': 'chat',

ファイルの表示

@ -59,7 +59,7 @@ async def nojs_submit_message(user):
nonce = form.get('nonce', '')
try:
await add_chat_message(user, nonce, comment)
add_chat_message(user, nonce, comment)
except Rejected as e:
notice, *_ = e.args
notice_id = add_notice(user, notice)
@ -78,7 +78,7 @@ async def nojs_submit_appearance(user):
want_delete_tripcode = form.get('clear-tripcode', type=bool)
want_change_tripcode = form.get('set-tripcode', type=bool)
errors = await try_change_appearance(
errors = try_change_appearance(
user,
name,
color,

ファイルの表示

@ -8,7 +8,7 @@ from anonstream.routes.wrappers import with_user_from
@current_app.websocket('/live')
@with_user_from(websocket)
async def live(user):
queue = asyncio.Queue()
queue = asyncio.Queue(maxsize=0)
user['websockets'].add(queue)
producer = websocket_outbound(queue)

ファイルの表示

@ -5,7 +5,7 @@ from functools import wraps
from quart import current_app, request, abort, make_response, render_template, request
from werkzeug.security import check_password_hash
from anonstream.user import sunset, see, user_for_websocket
from anonstream.user import see, user_for_websocket
from anonstream.chat import broadcast
from anonstream.helpers.user import generate_user
from anonstream.utils.user import generate_token
@ -54,20 +54,6 @@ def with_user_from(context):
else:
token = context.args.get('token') or context.cookies.get('token') or generate_token()
# Remove non-visible absent users
sunsetted_token_hashes = sunset(
messages=MESSAGES,
users_by_token=USERS_BY_TOKEN,
)
if sunsetted_token_hashes:
await broadcast(
users=USERS,
payload={
'type': 'rem-users',
'token_hashes': sunsetted_token_hashes,
},
)
# Update / create user
user = USERS_BY_TOKEN.get(token)
if user is not None:
@ -79,7 +65,7 @@ def with_user_from(context):
broadcaster=broadcaster,
)
USERS_BY_TOKEN[token] = user
await broadcast(
broadcast(
USERS,
payload={
'type': 'add-user',

55
anonstream/tasks.py ノーマルファイル
ファイルの表示

@ -0,0 +1,55 @@
import asyncio
from functools import wraps
from quart import current_app
from anonstream.chat import broadcast
from anonstream.wrappers import with_timestamp
from anonstream.helpers.user import is_visible
CONFIG = current_app.config
MESSAGES = current_app.messages
USERS_BY_TOKEN = current_app.users_by_token
USERS = current_app.users
def with_period(period):
def periodically(f):
@wraps(f)
async def wrapper(*args, **kwargs):
await asyncio.sleep(period)
while True:
if current_app.shutting_down:
break
else:
f(*args, **kwargs)
await asyncio.sleep(period)
return wrapper
return periodically
@with_period(CONFIG['CHECKUP_PERIOD_USER'])
@with_timestamp
def t_sunset_users(timestamp):
tokens = []
for token in USERS_BY_TOKEN:
user = USERS_BY_TOKEN[token]
if not is_visible(timestamp, MESSAGES, user):
tokens.append(token)
token_hashes = []
while tokens:
token = tokens.pop()
token_hash = USERS_BY_TOKEN.pop(token)['token_hash']
token_hashes.append(token_hash)
if token_hashes:
broadcast(
users=USERS,
payload={
'type': 'rem-users',
'token_hashes': token_hashes,
},
)
current_app.add_background_task(t_sunset_users)

ファイルの表示

@ -31,7 +31,7 @@ def pop_notice(user, notice_id):
notice, verbose = None, False
return notice, verbose
async def try_change_appearance(user, name, color, password,
def try_change_appearance(user, name, color, password,
want_delete_tripcode, want_change_tripcode):
errors = []
def try_(f, *args, **kwargs):
@ -52,7 +52,7 @@ async def try_change_appearance(user, name, color, password,
elif want_change_tripcode:
change_tripcode(user, password)
await broadcast(
broadcast(
USERS,
payload={
'type': 'mut-user',
@ -117,24 +117,3 @@ def users_for_websocket(timestamp):
user['token_hash']: user_for_websocket(user)
for user in visible_users
}
last_checkup = -inf
def sunset(messages, users_by_token):
global last_checkup
timestamp = int(time.time())
if timestamp - last_checkup < CONFIG['CHECKUP_PERIOD_USER']:
return []
to_delete = []
for token in users_by_token:
user = users_by_token[token]
if not is_visible(timestamp, messages, user):
to_delete.append(token)
for index, token in enumerate(to_delete):
to_delete[index] = users_by_token.pop(token)['token_hash']
last_checkup = timestamp
return to_delete

ファイルの表示

@ -44,7 +44,7 @@ async def websocket_inbound(queue, user):
}
else:
try:
markup = await add_chat_message(user, nonce, comment)
markup = add_chat_message(user, nonce, comment)
except Rejected as e:
notice, *_ = e.args
payload = {
@ -57,4 +57,4 @@ async def websocket_inbound(queue, user):
'nonce': nonce,
'next': generate_nonce(),
}
await queue.put(payload)
queue.put_nowait(payload)

8
app.py
ファイルの表示

@ -1,9 +1,5 @@
import asyncio
import anonstream
async def main():
app = await anonstream.create_app()
await app.run_task(port=5051, debug=True)
if __name__ == '__main__':
asyncio.run(main())
app = anonstream.create_app()
app.run(port=5051, debug=True)

ファイルの表示

@ -21,9 +21,9 @@ notices = 32
chat_messages = 8192
chat_scrollback = 256
[ratelimits]
user_absence = 8
captcha_expiry = 8
[intervals]
sunset_users = 60
expire_captchas = 60
[thresholds]
user_notwatching = 8