Event socket
This commit adds a unix socket on which you can receive internal events as they happen. Currently the only supported event is a chat message being added. Intended for external applications that depend on chat messages, e.g. text-to-speech or Twitch Plays Pokémon.
このコミットが含まれているのは:
コミット
65d28a6937
|
@ -51,6 +51,9 @@ def create_app(config_file):
|
||||||
# Background tasks' asyncio.sleep tasks, cancelled on shutdown
|
# Background tasks' asyncio.sleep tasks, cancelled on shutdown
|
||||||
app.background_sleep = set()
|
app.background_sleep = set()
|
||||||
|
|
||||||
|
# Queues for event socket clients
|
||||||
|
app.event_queues = set()
|
||||||
|
|
||||||
@app.after_serving
|
@app.after_serving
|
||||||
async def shutdown():
|
async def shutdown():
|
||||||
# Force all background tasks to finish
|
# Force all background tasks to finish
|
||||||
|
@ -60,10 +63,23 @@ def create_app(config_file):
|
||||||
@app.before_serving
|
@app.before_serving
|
||||||
async def startup():
|
async def startup():
|
||||||
# Start control server
|
# Start control server
|
||||||
from anonstream.control.server import start_control_server_at
|
if app.config['SOCKET_CONTROL_ENABLED']:
|
||||||
async def start_control_server():
|
from anonstream.control.server import start_control_server_at
|
||||||
return await start_control_server_at(app.config['CONTROL_ADDRESS'])
|
async def start_control_server():
|
||||||
app.add_background_task(start_control_server)
|
return await start_control_server_at(
|
||||||
|
app.config['SOCKET_CONTROL_ADDRESS']
|
||||||
|
)
|
||||||
|
app.add_background_task(start_control_server)
|
||||||
|
|
||||||
|
# Start event server
|
||||||
|
if app.config['SOCKET_EVENT_ENABLED']:
|
||||||
|
from anonstream.events import start_event_server_at
|
||||||
|
async def start_event_server():
|
||||||
|
return await start_event_server_at(
|
||||||
|
app.config['SOCKET_EVENT_ADDRESS']
|
||||||
|
)
|
||||||
|
app.add_background_task(start_event_server)
|
||||||
|
|
||||||
|
|
||||||
# Create routes and background tasks
|
# Create routes and background tasks
|
||||||
import anonstream.routes
|
import anonstream.routes
|
||||||
|
|
|
@ -7,6 +7,7 @@ from datetime import datetime
|
||||||
from quart import current_app, escape
|
from quart import current_app, escape
|
||||||
|
|
||||||
from anonstream.broadcast import broadcast, broadcast_users_update
|
from anonstream.broadcast import broadcast, broadcast_users_update
|
||||||
|
from anonstream.events import notify_event_sockets
|
||||||
from anonstream.helpers.chat import generate_nonce_hash, get_scrollback
|
from anonstream.helpers.chat import generate_nonce_hash, get_scrollback
|
||||||
from anonstream.utils.chat import get_message_for_websocket, get_approx_linespan
|
from anonstream.utils.chat import get_message_for_websocket, get_approx_linespan
|
||||||
|
|
||||||
|
@ -102,6 +103,12 @@ def add_chat_message(user, nonce, comment, ignore_empty=False):
|
||||||
while len(MESSAGES_BY_ID) > CONFIG['MAX_CHAT_MESSAGES']:
|
while len(MESSAGES_BY_ID) > CONFIG['MAX_CHAT_MESSAGES']:
|
||||||
MESSAGES_BY_ID.pop(last=False)
|
MESSAGES_BY_ID.pop(last=False)
|
||||||
|
|
||||||
|
# Notify event sockets that a chat message was added
|
||||||
|
notify_event_sockets({
|
||||||
|
'type': 'message',
|
||||||
|
'event': message,
|
||||||
|
})
|
||||||
|
|
||||||
# Broadcast a users update to all websockets,
|
# Broadcast a users update to all websockets,
|
||||||
# in case this message is from a new user
|
# in case this message is from a new user
|
||||||
broadcast_users_update()
|
broadcast_users_update()
|
||||||
|
|
|
@ -13,7 +13,6 @@ def update_flask_from_toml(toml_config, flask_config):
|
||||||
flask_config.update({
|
flask_config.update({
|
||||||
'SECRET_KEY_STRING': toml_config['secret_key'],
|
'SECRET_KEY_STRING': toml_config['secret_key'],
|
||||||
'SECRET_KEY': toml_config['secret_key'].encode(),
|
'SECRET_KEY': toml_config['secret_key'].encode(),
|
||||||
'CONTROL_ADDRESS': toml_config['control']['address'],
|
|
||||||
'AUTH_USERNAME': toml_config['auth']['username'],
|
'AUTH_USERNAME': toml_config['auth']['username'],
|
||||||
'AUTH_PWHASH': auth_pwhash,
|
'AUTH_PWHASH': auth_pwhash,
|
||||||
'AUTH_TOKEN': generate_token(),
|
'AUTH_TOKEN': generate_token(),
|
||||||
|
@ -25,6 +24,7 @@ def update_flask_from_toml(toml_config, flask_config):
|
||||||
|
|
||||||
def toml_to_flask_sections(config):
|
def toml_to_flask_sections(config):
|
||||||
TOML_TO_FLASK_SECTIONS = (
|
TOML_TO_FLASK_SECTIONS = (
|
||||||
|
toml_to_flask_section_socket,
|
||||||
toml_to_flask_section_segments,
|
toml_to_flask_section_segments,
|
||||||
toml_to_flask_section_title,
|
toml_to_flask_section_title,
|
||||||
toml_to_flask_section_names,
|
toml_to_flask_section_names,
|
||||||
|
@ -38,6 +38,15 @@ def toml_to_flask_sections(config):
|
||||||
for toml_to_flask_section in TOML_TO_FLASK_SECTIONS:
|
for toml_to_flask_section in TOML_TO_FLASK_SECTIONS:
|
||||||
yield toml_to_flask_section(config)
|
yield toml_to_flask_section(config)
|
||||||
|
|
||||||
|
def toml_to_flask_section_socket(config):
|
||||||
|
cfg = config['socket']
|
||||||
|
return {
|
||||||
|
'SOCKET_CONTROL_ENABLED': cfg['control']['enabled'],
|
||||||
|
'SOCKET_CONTROL_ADDRESS': cfg['control']['address'],
|
||||||
|
'SOCKET_EVENT_ENABLED': cfg['event']['enabled'],
|
||||||
|
'SOCKET_EVENT_ADDRESS': cfg['event']['address'],
|
||||||
|
}
|
||||||
|
|
||||||
def toml_to_flask_section_segments(config):
|
def toml_to_flask_section_segments(config):
|
||||||
cfg = config['segments']
|
cfg = config['segments']
|
||||||
return {
|
return {
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
|
from quart import current_app
|
||||||
|
|
||||||
|
async def start_event_server_at(address):
|
||||||
|
return await asyncio.start_unix_server(serve_event_client, address)
|
||||||
|
|
||||||
|
async def serve_event_client(reader, writer):
|
||||||
|
reader.feed_eof()
|
||||||
|
queue = asyncio.Queue()
|
||||||
|
current_app.event_queues.add(queue)
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
event = await queue.get()
|
||||||
|
event_json = json.dumps(event, separators=(',', ':'))
|
||||||
|
writer.write(event_json.encode())
|
||||||
|
writer.write(b'\n')
|
||||||
|
try:
|
||||||
|
await writer.drain()
|
||||||
|
# Because we used reader.feed_eof(), if the client sends anything
|
||||||
|
# an AsserionError will be raised
|
||||||
|
except (ConnectionError, AssertionError):
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
current_app.event_queues.remove(queue)
|
||||||
|
|
||||||
|
def notify_event_sockets(event):
|
||||||
|
for queue in current_app.event_queues:
|
||||||
|
queue.put_nowait(event)
|
|
@ -9,7 +9,7 @@ CONFIG = current_app.config
|
||||||
|
|
||||||
def generate_nonce_hash(nonce):
|
def generate_nonce_hash(nonce):
|
||||||
parts = CONFIG['SECRET_KEY'] + b'nonce-hash\0' + nonce.encode()
|
parts = CONFIG['SECRET_KEY'] + b'nonce-hash\0' + nonce.encode()
|
||||||
return hashlib.sha256(parts).digest()
|
return hashlib.sha256(parts).hexdigest()
|
||||||
|
|
||||||
def get_scrollback(messages):
|
def get_scrollback(messages):
|
||||||
n = CONFIG['MAX_CHAT_SCROLLBACK']
|
n = CONFIG['MAX_CHAT_SCROLLBACK']
|
||||||
|
|
|
@ -1,8 +1,13 @@
|
||||||
secret_key = "place secret key here"
|
secret_key = "place secret key here"
|
||||||
|
|
||||||
[control]
|
[socket.control]
|
||||||
|
enabled = true
|
||||||
address = "control.sock"
|
address = "control.sock"
|
||||||
|
|
||||||
|
[socket.event]
|
||||||
|
enabled = true
|
||||||
|
address = "event.sock"
|
||||||
|
|
||||||
[auth]
|
[auth]
|
||||||
username = "broadcaster"
|
username = "broadcaster"
|
||||||
|
|
||||||
|
|
読み込み中…
新しいイシューから参照