Skip to content

events

events

Event stream API endpoints.

Provides SSE and JSON endpoints backed by DaemonEventBridge. Router registration happens in app.py (stage 6).

Classes

Functions

get_event_bridge

get_event_bridge()

Return the module-level event bridge instance.

Raises RuntimeError if not yet configured (bridge is set when the router is registered in app.py — stage 6).

Source code in src/marianne/dashboard/routes/events.py
def get_event_bridge() -> DaemonEventBridge:
    """Return the module-level event bridge instance.

    Raises ``RuntimeError`` if not yet configured (bridge is set when
    the router is registered in ``app.py`` — stage 6).
    """
    if _event_bridge is None:
        raise RuntimeError(
            "DaemonEventBridge not configured. "
            "Set event_bridge via set_event_bridge() before serving requests."
        )
    return _event_bridge

set_event_bridge

set_event_bridge(bridge)

Configure the module-level event bridge (called from app.py).

Source code in src/marianne/dashboard/routes/events.py
def set_event_bridge(bridge: DaemonEventBridge) -> None:
    """Configure the module-level event bridge (called from app.py)."""
    global _event_bridge
    _event_bridge = bridge

stream_all_events async

stream_all_events(limit=Query(default=50, ge=1, le=200))

SSE stream of events across all active jobs.

Used by the global event timeline on the dashboard index page.

Source code in src/marianne/dashboard/routes/events.py
@router.get("/api/events/stream")
async def stream_all_events(
    limit: int = Query(default=50, ge=1, le=200),
) -> StreamingResponse:
    """SSE stream of events across all active jobs.

    Used by the global event timeline on the dashboard index page.
    """
    bridge = get_event_bridge()
    return StreamingResponse(
        _sse_generator(bridge, limit),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

get_observer_events async

get_observer_events(job_id, limit=Query(default=20, ge=1, le=200))

JSON endpoint returning recent observer events for a job.

Source code in src/marianne/dashboard/routes/events.py
@router.get("/api/jobs/{job_id}/observer")
async def get_observer_events(
    job_id: str,
    limit: int = Query(default=20, ge=1, le=200),
) -> list[dict[str, Any]]:
    """JSON endpoint returning recent observer events for a job."""
    bridge = get_event_bridge()
    events = await bridge.observer_events(job_id, limit=limit)
    return events