From 236d73a342ac512ae5b291881959a955f2f5e94a Mon Sep 17 00:00:00 2001 From: n9k Date: Sun, 20 Feb 2022 01:06:13 +0000 Subject: [PATCH] Gracefully finish background tasks on shutdown --- anonstream/__init__.py | 10 ++++++---- anonstream/tasks.py | 18 +++++++++++++----- app.py | 3 ++- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/anonstream/__init__.py b/anonstream/__init__.py index 3975485..d025d7f 100644 --- a/anonstream/__init__.py +++ b/anonstream/__init__.py @@ -58,11 +58,13 @@ def create_app(): app.users = app.users_by_token.values() app.segments_directory_cache = DirectoryCache(config['stream']['segments_dir']) - @app.while_serving + app.background_sleep = set() + + @app.after_serving async def shutdown(): - app.shutting_down = False - yield - app.shutting_down = True + # make all background tasks finish + for task in app.background_sleep: + task.cancel() @app.before_serving async def startup(): diff --git a/anonstream/tasks.py b/anonstream/tasks.py index 02eb427..bccb657 100644 --- a/anonstream/tasks.py +++ b/anonstream/tasks.py @@ -12,17 +12,25 @@ MESSAGES = current_app.messages USERS_BY_TOKEN = current_app.users_by_token USERS = current_app.users +async def sleep_and_collect_task(delay): + coro = asyncio.sleep(delay) + task = asyncio.create_task(coro) + current_app.background_sleep.add(task) + try: + await task + finally: + current_app.background_sleep.remove(task) + def with_period(period): def periodically(f): @wraps(f) async def wrapper(*args, **kwargs): - await asyncio.sleep(period) while True: - if current_app.shutting_down: + try: + await sleep_and_collect_task(period) + except asyncio.CancelledError: break - else: - f(*args, **kwargs) - await asyncio.sleep(period) + f(*args, **kwargs) return wrapper diff --git a/app.py b/app.py index d33ac48..33a34d3 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,6 @@ import anonstream +app = anonstream.create_app() + if __name__ == '__main__': - app = anonstream.create_app() app.run(port=5051, debug=True)