Skip to content

event_bus

event_bus

Async pub/sub event bus for the Marianne daemon.

Routes ObserverEvents from the runner and observer to downstream consumers (SSE dashboard, learning hub, future webhooks). Each subscriber gets a bounded deque — slow subscribers lose oldest events rather than blocking the publisher.

Classes

EventBus

EventBus(*, max_queue_size=1000)

Async pub/sub event bus with bounded queues per subscriber.

Subscribers receive events asynchronously via callbacks. Each subscriber has a bounded deque (max_queue_size). When the queue is full, the oldest event is dropped to make room for the new one (a drop-oldest overflow policy, so a slow subscriber never blocks the publisher).

Usage::

bus = EventBus(max_queue_size=1000)
await bus.start()

# Subscribe to all events
sub_id = bus.subscribe(callback=my_handler)

# Subscribe with filter
sub_id = bus.subscribe(
    callback=my_handler,
    event_filter=lambda e: e["event"].startswith("sheet."),
)

# Publish events
await bus.publish(event)

# Cleanup
bus.unsubscribe(sub_id)
await bus.shutdown()
Source code in src/marianne/daemon/event_bus.py
def __init__(self, *, max_queue_size: int = 1000) -> None:
    """Create a stopped bus with no subscribers.

    Args:
        max_queue_size: Capacity (``maxlen``) of the deque created for
            each subscriber when it subscribes.
    """
    # Lifecycle state: the bus begins stopped and owns no drain task.
    self._running = False
    self._drain_task: asyncio.Task[None] | None = None

    # Inbox written by publish(); built without a maxsize.
    self._pending: asyncio.Queue[ObserverEvent] = asyncio.Queue()

    # Registry of subscribers keyed by subscription ID.
    self._subscribers: dict[str, _Subscriber] = {}
    self._max_queue_size = max_queue_size
Attributes
subscriber_count property
subscriber_count

Number of active subscribers.

Functions
start async
start()

Start the background drain loop.

Source code in src/marianne/daemon/event_bus.py
async def start(self) -> None:
    """Launch the background drain task unless the bus is already running.

    Calling this while the bus is running does nothing, so a second call
    does not create a second drain task.
    """
    if not self._running:
        # Mark the bus as running before the task is created.
        self._running = True
        drain_coro = self._drain_loop()
        self._drain_task = asyncio.create_task(drain_coro, name="event-bus-drain")
publish async
publish(event)

Publish an event to all matching subscribers.

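Non-blocking for the publisher. Events are queued for the drain loop to distribute to subscribers. Events published while the bus is not running (before start() or after shutdown()) are silently discarded.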

Source code in src/marianne/daemon/event_bus.py
async def publish(self, event: ObserverEvent) -> None:
    """Enqueue an event for later delivery by the drain loop.

    This method only places the event on the internal pending queue;
    handing it to subscribers is left to the drain loop. If the bus is
    not running, the event is discarded without raising.
    """
    if self._running:
        await self._pending.put(event)
subscribe
subscribe(callback, *, event_filter=None)

Register a subscriber.

Parameters:

Name Type Description Default
callback EventCallback

Async or sync callable receiving ObserverEvent.

required
event_filter EventFilter

Optional filter function. If provided, the subscriber only receives events where filter returns True.

None

Returns:

Type Description
str

Subscription ID for later unsubscribe.

Source code in src/marianne/daemon/event_bus.py
def subscribe(
    self,
    callback: EventCallback,
    *,
    event_filter: EventFilter = None,
) -> str:
    """Add a subscriber and hand back the ID that identifies it.

    Args:
        callback: Async or sync callable that receives each ObserverEvent.
        event_filter: Optional predicate. When given, the subscriber only
            receives events for which the predicate returns True.

    Returns:
        A freshly generated UUID4 string, to be passed to ``unsubscribe``.
    """
    new_id = str(uuid.uuid4())

    # Each subscriber gets its own deque capped at the configured size;
    # a full deque discards from the opposite end on append.
    buffer = deque(maxlen=self._max_queue_size)
    entry = _Subscriber(
        callback=callback,
        event_filter=event_filter,
        queue=buffer,
    )
    self._subscribers[new_id] = entry

    _logger.info("event_bus.subscribed", sub_id=new_id)
    return new_id
unsubscribe
unsubscribe(sub_id)

Remove a subscriber.

Returns:

Type Description
bool

True if the subscriber existed and was removed.

Source code in src/marianne/daemon/event_bus.py
def unsubscribe(self, sub_id: str) -> bool:
    """Drop the subscriber registered under ``sub_id``, if there is one.

    Args:
        sub_id: Subscription ID previously returned by ``subscribe``.

    Returns:
        True if a subscriber was registered under that ID and has been
        removed; False if the ID was unknown.
    """
    entry = self._subscribers.pop(sub_id, None)
    if entry is None:
        # Unknown ID: nothing was removed and nothing is logged.
        return False
    _logger.info("event_bus.unsubscribed", sub_id=sub_id)
    return True
shutdown async
shutdown()

Stop the drain loop and drain remaining events.

Source code in src/marianne/daemon/event_bus.py
async def shutdown(self) -> None:
    """Stop the bus, then deliver whatever is still queued.

    Steps, in order: clear the running flag, cancel the drain task and
    wait for it to finish, distribute every event still sitting in the
    pending queue, and log the shutdown together with the number of
    subscribers that are still registered.
    """
    self._running = False

    task = self._drain_task
    if task is not None:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() above.
            # NOTE(review): this also absorbs a cancellation aimed at the
            # caller of shutdown() while it is waiting here — confirm
            # that is acceptable.
            pass
        self._drain_task = None

    # Flush events that were queued but not yet taken off the queue.
    while True:
        try:
            leftover = self._pending.get_nowait()
            await self._distribute(leftover)
        except asyncio.QueueEmpty:
            break

    _logger.info(
        "event_bus.shutdown",
        remaining_subscribers=len(self._subscribers),
    )

Functions