event_bus
Async pub/sub event bus for the Marianne daemon.
Routes ObserverEvents from the runner and observer to downstream consumers (SSE dashboard, learning hub, future webhooks). Each subscriber gets a bounded deque, so a slow subscriber loses its oldest events rather than blocking the publisher.
Classes
EventBus
Async pub/sub event bus with bounded queues per subscriber.
Subscribers receive events asynchronously via callbacks. Each subscriber has a bounded deque (max_queue_size). When the queue is full, the oldest event is dropped (backpressure via drop-oldest policy).
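The drop-oldest policy described above can be illustrated with the standard library's `collections.deque`. This is a minimal sketch, not the Marianne implementation: the variable names and the event payloads are made up for illustration, and only the `maxlen` behaviour of `deque` is relied on.

```python
from collections import deque

# Hypothetical stand-in for one subscriber's queue. max_queue_size mirrors
# the EventBus constructor argument, set small here so the effect is visible.
max_queue_size = 3
queue = deque(maxlen=max_queue_size)

for n in range(5):
    # Appending to a full deque discards the item at the opposite end,
    # which is the oldest event.
    queue.append({"event": f"sheet.update.{n}"})

remaining = [e["event"] for e in queue]
print(remaining)
```

Only the three most recent events survive; the publisher never waits for the subscriber to catch up.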
Usage::

    bus = EventBus(max_queue_size=1000)
    await bus.start()

    # Subscribe to all events
    sub_id = bus.subscribe(callback=my_handler)

    # Subscribe with filter
    sub_id = bus.subscribe(
        callback=my_handler,
        event_filter=lambda e: e["event"].startswith("sheet."),
    )

    # Publish events
    await bus.publish(event)

    # Cleanup
    bus.unsubscribe(sub_id)
    await bus.shutdown()
Source code in src/marianne/daemon/event_bus.py
Attributes
Functions
start
async
Start the background drain loop.
publish
async
Publish an event to all matching subscribers.
Non-blocking for the publisher. Events are queued for the drain loop to distribute to subscribers.
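One way to keep `publish` non-blocking is to have it only enqueue the event and wake a background task. The sketch below assumes that design; `TinyBus`, its attributes, and `stop` are hypothetical names and are not taken from `event_bus.py`.

```python
import asyncio
from collections import deque
from typing import Optional


class TinyBus:
    """Toy illustration of a publish/drain split; not the Marianne EventBus."""

    def __init__(self) -> None:
        self._pending: deque = deque()
        self._wakeup = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self.delivered: list = []

    async def start(self) -> None:
        self._task = asyncio.create_task(self._drain_loop())

    async def publish(self, event: dict) -> None:
        # Enqueue and signal only; no subscriber code runs in the publisher.
        self._pending.append(event)
        self._wakeup.set()

    async def _drain_loop(self) -> None:
        while True:
            await self._wakeup.wait()
            self._wakeup.clear()
            while self._pending:
                # A real bus would fan out to subscribers here.
                self.delivered.append(self._pending.popleft())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass


async def demo() -> list:
    bus = TinyBus()
    await bus.start()
    await bus.publish({"event": "sheet.started"})
    await bus.publish({"event": "sheet.completed"})
    await asyncio.sleep(0.01)  # give the drain loop a chance to run
    await bus.stop()
    return [e["event"] for e in bus.delivered]


delivered = asyncio.run(demo())
print(delivered)
```

Because `publish` never awaits a subscriber, a slow consumer can delay delivery but not the caller.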
subscribe
Register a subscriber.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | EventCallback | Async or sync callable receiving ObserverEvent. | required |
| event_filter | EventFilter | Optional filter function. If provided, the subscriber only receives events where filter returns True. | None |
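Since `callback` may be sync or async and `event_filter` is optional, delivery has to handle both cases. The following is a sketch under assumptions: the `Event`, `Callback`, and `Filter` aliases and the `deliver` helper are hypothetical, and the real `EventCallback` and `EventFilter` types may be defined differently.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Optional, Union

# Assumed shapes for illustration only.
Event = dict[str, Any]
Callback = Callable[[Event], Union[None, Awaitable[None]]]
Filter = Callable[[Event], bool]


async def deliver(
    event: Event, callback: Callback, event_filter: Optional[Filter] = None
) -> bool:
    """Call the callback if the filter accepts the event; return whether it ran."""
    if event_filter is not None and not event_filter(event):
        return False
    result = callback(event)
    if inspect.isawaitable(result):
        # Async callbacks return an awaitable; sync callbacks return None.
        await result
    return True


seen: list = []


def sync_handler(event: Event) -> None:
    seen.append(("sync", event["event"]))


async def async_handler(event: Event) -> None:
    seen.append(("async", event["event"]))


async def demo() -> list:
    sheet_only = lambda e: e["event"].startswith("sheet.")
    flags = []
    for name in ("sheet.started", "job.finished"):
        event = {"event": name}
        flags.append(await deliver(event, sync_handler))
        flags.append(await deliver(event, async_handler, sheet_only))
    return flags


flags = asyncio.run(demo())
print(flags, seen)
```

The unfiltered sync handler sees both events, while the filtered async handler sees only the `sheet.` event.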
Returns:
| Type | Description |
|---|---|
| str | Subscription ID for later unsubscribe. |
unsubscribe
Remove a subscriber.
Returns:
| Type | Description |
|---|---|
| bool | True if the subscriber existed and was removed. |
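The string ID from `subscribe` and the boolean from `unsubscribe` fit a simple dictionary-backed registry. This is a hypothetical sketch: the `Registry` class and the use of `uuid4` for IDs are assumptions, since the source does not say how IDs are generated.

```python
import uuid
from typing import Callable, Dict


class Registry:
    """Toy subscriber table; not the Marianne implementation."""

    def __init__(self) -> None:
        self._subs: Dict[str, Callable] = {}

    def subscribe(self, callback: Callable) -> str:
        sub_id = uuid.uuid4().hex  # assumed ID scheme
        self._subs[sub_id] = callback
        return sub_id

    def unsubscribe(self, sub_id: str) -> bool:
        # pop() with a default avoids KeyError for unknown or repeated IDs.
        return self._subs.pop(sub_id, None) is not None


registry = Registry()
sub_id = registry.subscribe(print)
first = registry.unsubscribe(sub_id)
second = registry.unsubscribe(sub_id)
print(first, second)
```

Returning `False` instead of raising makes a repeated `unsubscribe` harmless during cleanup.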
shutdown
async
Stop the drain loop and drain remaining events.
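A shutdown that stops the loop and then drains what is left might look like the sketch below. It assumes a stop flag plus a final flush; `DrainingWorker` and its members are hypothetical names, and the real `shutdown` may handle timeouts or cancellation differently.

```python
import asyncio
from collections import deque
from typing import Optional


class DrainingWorker:
    """Toy illustration of stop-then-drain; not the Marianne EventBus."""

    def __init__(self) -> None:
        self.pending: deque = deque()
        self.delivered: list = []
        self._wakeup = asyncio.Event()
        self._stopping = False
        self._task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    def enqueue(self, event: dict) -> None:
        self.pending.append(event)
        self._wakeup.set()

    def _flush(self) -> None:
        while self.pending:
            self.delivered.append(self.pending.popleft())

    async def _run(self) -> None:
        while not self._stopping:
            await self._wakeup.wait()
            self._wakeup.clear()
            self._flush()

    async def shutdown(self) -> None:
        self._stopping = True
        self._wakeup.set()  # wake the loop so it notices the stop flag
        if self._task is not None:
            await self._task
        self._flush()  # deliver anything still queued after the loop exits


async def demo() -> list:
    worker = DrainingWorker()
    await worker.start()
    worker.enqueue({"event": "sheet.started"})
    worker.enqueue({"event": "sheet.completed"})
    await worker.shutdown()  # called before the loop has had a chance to run
    return [e["event"] for e in worker.delivered]


drained = asyncio.run(demo())
print(drained)
```

The final flush is what prevents events published just before shutdown from being lost.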