diff --git a/anonstream/__init__.py b/anonstream/__init__.py index 6d59fb3..d2c609e 100644 --- a/anonstream/__init__.py +++ b/anonstream/__init__.py @@ -51,6 +51,9 @@ def create_app(config_file): # Background tasks' asyncio.sleep tasks, cancelled on shutdown app.background_sleep = set() + # Queues for event socket clients + app.event_queues = set() + @app.after_serving async def shutdown(): # Force all background tasks to finish @@ -60,10 +63,23 @@ def create_app(config_file): @app.before_serving async def startup(): # Start control server - from anonstream.control.server import start_control_server_at - async def start_control_server(): - return await start_control_server_at(app.config['CONTROL_ADDRESS']) - app.add_background_task(start_control_server) + if app.config['SOCKET_CONTROL_ENABLED']: + from anonstream.control.server import start_control_server_at + async def start_control_server(): + return await start_control_server_at( + app.config['SOCKET_CONTROL_ADDRESS'] + ) + app.add_background_task(start_control_server) + + # Start event server + if app.config['SOCKET_EVENT_ENABLED']: + from anonstream.events import start_event_server_at + async def start_event_server(): + return await start_event_server_at( + app.config['SOCKET_EVENT_ADDRESS'] + ) + app.add_background_task(start_event_server) + # Create routes and background tasks import anonstream.routes diff --git a/anonstream/chat.py b/anonstream/chat.py index bbed55a..7f9986e 100644 --- a/anonstream/chat.py +++ b/anonstream/chat.py @@ -7,6 +7,7 @@ from datetime import datetime from quart import current_app, escape from anonstream.broadcast import broadcast, broadcast_users_update +from anonstream.events import notify_event_sockets from anonstream.helpers.chat import generate_nonce_hash, get_scrollback from anonstream.utils.chat import get_message_for_websocket, get_approx_linespan @@ -102,6 +103,12 @@ def add_chat_message(user, nonce, comment, ignore_empty=False): while len(MESSAGES_BY_ID) > CONFIG['MAX_CHAT_MESSAGES']: MESSAGES_BY_ID.pop(last=False) + # Notify event sockets that a chat message was added + notify_event_sockets({ + 'type': 'message', + 'event': message, + }) + # Broadcast a users update to all websockets, # in case this message is from a new user broadcast_users_update() diff --git a/anonstream/config.py b/anonstream/config.py index 932c45c..15ed3b0 100644 --- a/anonstream/config.py +++ b/anonstream/config.py @@ -13,7 +13,6 @@ def update_flask_from_toml(toml_config, flask_config): flask_config.update({ 'SECRET_KEY_STRING': toml_config['secret_key'], 'SECRET_KEY': toml_config['secret_key'].encode(), - 'CONTROL_ADDRESS': toml_config['control']['address'], 'AUTH_USERNAME': toml_config['auth']['username'], 'AUTH_PWHASH': auth_pwhash, 'AUTH_TOKEN': generate_token(), @@ -25,6 +24,7 @@ def update_flask_from_toml(toml_config, flask_config): def toml_to_flask_sections(config): TOML_TO_FLASK_SECTIONS = ( + toml_to_flask_section_socket, toml_to_flask_section_segments, toml_to_flask_section_title, toml_to_flask_section_names, @@ -38,6 +38,15 @@ def toml_to_flask_sections(config): for toml_to_flask_section in TOML_TO_FLASK_SECTIONS: yield toml_to_flask_section(config) +def toml_to_flask_section_socket(config): + cfg = config['socket'] + return { + 'SOCKET_CONTROL_ENABLED': cfg['control']['enabled'], + 'SOCKET_CONTROL_ADDRESS': cfg['control']['address'], + 'SOCKET_EVENT_ENABLED': cfg['event']['enabled'], + 'SOCKET_EVENT_ADDRESS': cfg['event']['address'], + } + def toml_to_flask_section_segments(config): cfg = config['segments'] return { diff --git a/anonstream/events.py b/anonstream/events.py new file mode 100644 index 0000000..df24326 --- /dev/null +++ b/anonstream/events.py @@ -0,0 +1,30 @@ +import asyncio +import json + +from quart import current_app + +async def start_event_server_at(address): + return await asyncio.start_unix_server(serve_event_client, address) + +async def serve_event_client(reader, writer): + reader.feed_eof() + queue = asyncio.Queue() + current_app.event_queues.add(queue) + try: + while True: + event = await queue.get() + event_json = json.dumps(event, separators=(',', ':')) + writer.write(event_json.encode()) + writer.write(b'\n') + try: + await writer.drain() + # Because we used reader.feed_eof(), if the client sends anything + # an AsserionError will be raised + except (ConnectionError, AssertionError): + break + finally: + current_app.event_queues.remove(queue) + +def notify_event_sockets(event): + for queue in current_app.event_queues: + queue.put_nowait(event) diff --git a/anonstream/helpers/chat.py b/anonstream/helpers/chat.py index 32c5a20..1790a19 100644 --- a/anonstream/helpers/chat.py +++ b/anonstream/helpers/chat.py @@ -9,7 +9,7 @@ CONFIG = current_app.config def generate_nonce_hash(nonce): parts = CONFIG['SECRET_KEY'] + b'nonce-hash\0' + nonce.encode() - return hashlib.sha256(parts).digest() + return hashlib.sha256(parts).hexdigest() def get_scrollback(messages): n = CONFIG['MAX_CHAT_SCROLLBACK'] diff --git a/config.toml b/config.toml index 1d7f9e2..e8c2efb 100644 --- a/config.toml +++ b/config.toml @@ -1,8 +1,13 @@ secret_key = "place secret key here" -[control] +[socket.control] +enabled = true address = "control.sock" +[socket.event] +enabled = true +address = "event.sock" + [auth] username = "broadcaster"