Skip to content

event_bridge

event_bridge

Daemon EventBus → SSE bridge for real-time event streaming.

Subscribes to daemon.monitor.stream via DaemonClient.stream() and multiplexes events to per-job and global consumers. Replaces the previous polling approach with a single long-lived subscription to the conductor's EventBus.

Classes

DaemonEventBridge

DaemonEventBridge(client)

Bridge between daemon EventBus and browser SSE streams.

Maintains a single daemon.monitor.stream subscription that feeds per-job queues and a global queue. Consumers (SSE routes) await their queue — no polling needed.

Parameters

client: Connected DaemonClient for IPC calls.

Source code in src/marianne/dashboard/services/event_bridge.py
def __init__(self, client: DaemonClient) -> None:
    """Set up the bridge with an IPC client and empty consumer registries."""
    # Connected IPC client used for daemon calls (stream subscription, fetches).
    self._client = client
    # Guards concurrent mutation of the queue registries below.
    self._lock = asyncio.Lock()
    # Fan-out targets: one queue list per job id, plus queues spanning all jobs.
    self._job_queues: dict[str, list[asyncio.Queue[dict[str, Any]]]] = {}
    self._global_queues: list[asyncio.Queue[dict[str, Any]]] = []
    # Background task driving the daemon subscription; None until start().
    self._subscriber_task: asyncio.Task[None] | None = None
    self._running = False
Functions
start async
start()

Start the background subscription to the conductor event stream.

Source code in src/marianne/dashboard/services/event_bridge.py
async def start(self) -> None:
    """Launch the background subscription to the conductor event stream.

    Idempotent: calling start() while already running is a no-op.
    """
    if not self._running:
        self._running = True
        # One long-lived task feeds every consumer queue; no polling.
        self._subscriber_task = asyncio.create_task(
            self._subscribe_loop(),
            name="event-bridge-subscriber",
        )
        _logger.info("event_bridge.started")
stop async
stop()

Stop the background subscription and clean up.

Source code in src/marianne/dashboard/services/event_bridge.py
async def stop(self) -> None:
    """Cancel the subscription task and wake every waiting consumer."""
    self._running = False
    if self._subscriber_task is not None:
        self._subscriber_task.cancel()
        try:
            await self._subscriber_task
        except asyncio.CancelledError:
            # Expected: we cancelled the task ourselves.
            pass
        self._subscriber_task = None
    # Push a sentinel into every registered queue so consumers blocked on
    # queue.get() wake up and observe that _running is now False.
    async with self._lock:
        for per_job_queues in self._job_queues.values():
            for consumer_queue in per_job_queues:
                consumer_queue.put_nowait({"event": "bridge_stopped", "data": "{}"})
        for consumer_queue in self._global_queues:
            consumer_queue.put_nowait({"event": "bridge_stopped", "data": "{}"})
    _logger.info("event_bridge.stopped")
job_events async
job_events(job_id)

Yield real-time SSE events for a specific job.

Creates a per-job queue, subscribes to the conductor event stream, and yields events as they arrive.

Source code in src/marianne/dashboard/services/event_bridge.py
async def job_events(self, job_id: str) -> AsyncIterator[dict[str, Any]]:
    """Stream real-time SSE events for one job until the bridge stops.

    Registers a dedicated queue for *job_id* with the bridge and drains it,
    emitting a heartbeat event whenever 30 seconds pass without traffic so
    the SSE connection stays alive through idle periods.
    """
    queue = await self._register_job_queue(job_id)
    try:
        while self._running:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=30.0)
            except TimeoutError:
                # No event within the window: keep the connection warm.
                yield {
                    "event": "heartbeat",
                    "data": json.dumps({"timestamp": time.time()}),
                }
            else:
                queue.task_done()
                yield item
    finally:
        # Always deregister so the bridge stops fanning out to this consumer.
        await self._unregister_job_queue(job_id, queue)
all_events async
all_events(limit=50)

Yield real-time SSE events across all jobs.

Used by the global event timeline on the dashboard index page.

Source code in src/marianne/dashboard/services/event_bridge.py
async def all_events(self, limit: int = 50) -> AsyncIterator[dict[str, Any]]:
    """Stream real-time SSE events across every job until the bridge stops.

    Feeds the global event timeline on the dashboard index page. Emits a
    heartbeat event whenever 30 seconds pass without traffic so the SSE
    connection stays alive through idle periods.

    NOTE(review): ``limit`` is accepted for interface compatibility but is
    not consulted anywhere in this method — confirm whether it should cap
    the stream or be dropped.
    """
    queue = await self._register_global_queue()
    try:
        while self._running:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=30.0)
            except TimeoutError:
                # No event within the window: keep the connection warm.
                yield {
                    "event": "heartbeat",
                    "data": json.dumps({"timestamp": time.time()}),
                }
            else:
                queue.task_done()
                yield item
    finally:
        # Always deregister so the bridge stops fanning out to this consumer.
        await self._unregister_global_queue(queue)
observer_events async
observer_events(job_id, limit=20)

Get recent observer events for a job (one-shot, not streaming).

Falls back to IPC call for historical events.

Source code in src/marianne/dashboard/services/event_bridge.py
async def observer_events(self, job_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """Return recent observer events for *job_id* (one-shot, not streaming).

    Delegates to an IPC fetch of historical events. Any failure is logged
    at debug level and an empty list is returned, so callers degrade
    gracefully instead of surfacing transport errors.
    """
    try:
        return await self._fetch_observer_events(job_id, limit=limit)
    except Exception:
        # Best-effort fetch: log with traceback, hand back an empty list.
        _logger.debug(
            "observer_events_fetch_error",
            job_id=job_id,
            exc_info=True,
        )
        return []

Functions