monitor

SSE endpoint for real-time system monitor snapshots.

Streams SystemSnapshot NDJSON events from the profiler's JSONL file or SQLite storage to connected dashboard clients. Each event is a complete SystemSnapshot serialized as JSON — the same format produced by mzt top --json.

Functions

get_monitor_storage async

get_monitor_storage(db_path=None)

Get or create a cached MonitorStorage instance.

Returns an initialized MonitorStorage that can be reused across requests instead of creating a new instance per call.

Source code in src/marianne/dashboard/routes/monitor.py
async def get_monitor_storage(db_path: Path | None = None) -> Any:
    """Return a cached, initialized ``MonitorStorage`` for *db_path*.

    The first call for a given path constructs and initializes the
    storage; later calls for the same path reuse that instance instead
    of building a new one per request.
    """
    from marianne.daemon.profiler.storage import MonitorStorage

    target = db_path or _DEFAULT_DB_PATH
    cache_key = str(target)

    instance = _monitor_storage_cache.get(cache_key)
    if instance is None:
        # Cache miss: build, initialize, and remember the storage.
        instance = MonitorStorage(db_path=target)
        await instance.initialize()
        _monitor_storage_cache[cache_key] = instance
    return instance

stream_monitor async

stream_monitor(source=Query(default='auto', description="Data source: 'daemon', 'jsonl', 'sqlite', or 'auto' (tries daemon, then jsonl, then sqlite)"), poll_interval=Query(default=1.0, ge=0.5, le=30.0, description='Poll interval in seconds'))

Stream real-time system monitor snapshots via Server-Sent Events.

Each event contains a complete SystemSnapshot as JSON — the same format used by mzt top --json.

Source selection:

  • auto (default): Uses daemon IPC if configured, then JSONL, then SQLite.
  • daemon: Polls DaemonSystemView via IPC.
  • jsonl: Tails ~/.marianne/monitor.jsonl directly.
  • sqlite: Polls ~/.marianne/monitor.db for recent snapshots.

Returns:

Type Description
StreamingResponse

SSE stream of snapshot events, with periodic heartbeat events to keep the connection alive.

Source code in src/marianne/dashboard/routes/monitor.py
@router.get("/stream")
async def stream_monitor(
    source: str = Query(
        default="auto",
        description=(
            "Data source: 'daemon', 'jsonl', 'sqlite', "
            "or 'auto' (tries daemon, then jsonl, then sqlite)"
        ),
    ),
    poll_interval: float = Query(
        default=1.0,
        ge=0.5,
        le=30.0,
        description="Poll interval in seconds",
    ),
) -> StreamingResponse:
    """Push live system monitor snapshots to the client over SSE.

    Every event carries one complete ``SystemSnapshot`` serialized as
    JSON, matching the output of ``mzt top --json``.

    **Source selection:**

    - ``auto`` (default): Uses daemon IPC if configured, then JSONL, then SQLite.
    - ``daemon``: Polls ``DaemonSystemView`` via IPC.
    - ``jsonl``: Tails ``~/.marianne/monitor.jsonl`` directly.
    - ``sqlite``: Polls ``~/.marianne/monitor.db`` for recent snapshots.

    Returns:
        SSE stream of ``snapshot`` events, with periodic ``heartbeat``
        events to keep the connection alive.
    """
    # Defensive re-check; Query(ge=, le=) already enforces this for
    # HTTP callers, but direct callers bypass FastAPI validation.
    if poll_interval < 0.5 or poll_interval > 30.0:
        raise HTTPException(
            status_code=400,
            detail="poll_interval must be between 0.5 and 30.0 seconds",
        )

    jsonl_path = _DEFAULT_JSONL_PATH
    db_path = _DEFAULT_DB_PATH

    # "auto" resolves to the first backend that is actually available:
    # daemon IPC, then the JSONL file, then the SQLite database.
    if source == "auto":
        if _daemon_available():
            source = "daemon"
        elif jsonl_path.exists():
            source = "jsonl"
        elif db_path.exists():
            source = "sqlite"
        else:
            raise HTTPException(
                status_code=503,
                detail="No monitor data available. Is the conductor running with profiler enabled?",
            )

    # Pick the async generator that feeds the SSE response.
    if source == "daemon":
        event_iter = _stream_from_daemon(poll_interval=poll_interval)
    elif source == "jsonl":
        event_iter = _tail_jsonl(jsonl_path, poll_interval=poll_interval)
    elif source == "sqlite":
        event_iter = _stream_from_sqlite(db_path, poll_interval=poll_interval)
    else:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid source: {source}. Must be 'auto', 'daemon', 'jsonl', or 'sqlite'.",
        )

    # Standard SSE headers: disable caching and proxy buffering so
    # events reach the client as soon as they are produced.
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_iter,
        media_type="text/event-stream",
        headers=sse_headers,
    )

get_latest_snapshot async

get_latest_snapshot()

Get the most recent system snapshot (one-shot, not streaming).

When DaemonSystemView is configured, fetches a live snapshot via IPC. Falls back to reading monitor.db when the daemon service layer isn't wired.

Source code in src/marianne/dashboard/routes/monitor.py
@router.get("/snapshot")
async def get_latest_snapshot() -> dict[str, object]:
    """Return the latest system snapshot as a one-shot JSON payload.

    Prefers a live snapshot fetched from ``DaemonSystemView`` over IPC;
    when the daemon service layer is not wired, falls back to the most
    recent row stored in ``monitor.db``.
    """
    # Preferred path: live snapshot through the daemon's SystemView
    # (registered by app.py when the daemon is available).
    try:
        from marianne.dashboard.routes.system import get_system_view
        view = get_system_view()
        live = await view.get_snapshot()
        if live is not None:
            return live
    except RuntimeError:
        # SystemView not configured — continue to the file-based path.
        pass
    except Exception:
        # Best-effort: any other IPC failure degrades to the fallback.
        _logger.debug("monitor_snapshot.daemon_error", exc_info=True)

    # Fallback path: read the newest data from monitor.db.
    db_path = _DEFAULT_DB_PATH
    if not db_path.exists():
        raise HTTPException(
            status_code=503,
            detail="No monitor data available. Is the conductor running with profiler enabled?",
        )

    try:
        storage = await get_monitor_storage(db_path)
    except ImportError:
        raise HTTPException(
            status_code=503,
            detail="aiosqlite not available",
        ) from None

    # Restrict the query to the last minute; limit=1 yields at most one row.
    window_start = time.time() - 60
    recent = await storage.read_snapshots(since=window_start, limit=1)
    if not recent:
        raise HTTPException(
            status_code=404,
            detail="No recent snapshots found",
        )

    payload: dict[str, object] = recent[-1].model_dump(mode="json")
    return payload