Skip to content

reader

reader

Data reader for the Marianne Monitor TUI.

Provides MonitorReader which reads system snapshots and process events from multiple data sources (IPC, SQLite, JSONL) for display in mzt top.

The reader auto-detects the best available data source: 1. IPC (DaemonClient) — live data from the running conductor 2. SQLite (MonitorStorage) — historical data from the profiler database 3. JSONL tail — streaming fallback when neither IPC nor SQLite available

Classes

MonitorReader

MonitorReader(storage=None, jsonl_path=None, ipc_client=None)

Reads profiler data for display in the TUI.

Supports three data sources, tried in priority order:

  1. IPC — queries the running daemon via DaemonClient
  2. SQLite — reads directly from MonitorStorage
  3. JSONL — tails the NDJSON streaming log
Parameters

storage: Optional MonitorStorage instance for SQLite reads. jsonl_path: Path to the JSONL streaming file. ipc_client: Optional DaemonClient for live daemon queries.

Source code in src/marianne/tui/reader.py
def __init__(
    self,
    storage: MonitorStorage | None = None,
    jsonl_path: Path | None = None,
    ipc_client: Any | None = None,
) -> None:
    self._storage = storage
    self._jsonl_path = jsonl_path.expanduser() if jsonl_path else None
    self._ipc_client = ipc_client
    self._source: str = "none"
Attributes
source property
source

The active data source name: 'ipc', 'sqlite', 'jsonl', or 'none'.

Functions
get_latest_snapshot async
get_latest_snapshot()

Get the most recent system snapshot.

Returns None if no data is available from any source.

Source code in src/marianne/tui/reader.py
async def get_latest_snapshot(self) -> SystemSnapshot | None:
    """Get the most recent system snapshot.

    Returns None if no data is available from any source.
    """
    await self._ensure_source()

    if self._source == "ipc":
        return await self._get_latest_ipc()
    elif self._source == "sqlite":
        return await self._get_latest_sqlite()
    elif self._source == "jsonl":
        return self._get_latest_jsonl()
    return None
get_snapshots async
get_snapshots(since, limit=100)

Get snapshots since the given unix timestamp.

Source code in src/marianne/tui/reader.py
async def get_snapshots(
    self, since: float, limit: int = 100
) -> list[SystemSnapshot]:
    """Get snapshots since the given unix timestamp."""
    await self._ensure_source()

    if self._source == "sqlite" and self._storage is not None:
        return await self._storage.read_snapshots(since, limit)
    elif self._source == "ipc":
        # IPC doesn't support historical queries — return latest only
        latest = await self._get_latest_ipc()
        if latest and latest.timestamp >= since:
            return [latest]
    elif self._source == "jsonl":
        return self._read_jsonl_since(since, limit)
    return []
get_events async
get_events(since, limit=50)

Get process lifecycle events since the given unix timestamp.

Source code in src/marianne/tui/reader.py
async def get_events(
    self, since: float, limit: int = 50
) -> list[ProcessEvent]:
    """Get process lifecycle events since the given unix timestamp."""
    await self._ensure_source()

    if self._source == "sqlite" and self._storage is not None:
        return await self._storage.read_events(since, limit)
    elif self._source == "ipc" and self._ipc_client is not None:
        return await self._get_events_ipc(since, limit)
    return []
get_observer_events async
get_observer_events(job_id=None, limit=50)

Get recent observer events via IPC.

Returns observer events (file and process activity) from the ObserverRecorder's in-memory ring buffer.

Parameters:

Name Type Description Default
job_id str | None

Specific job ID, or None for all jobs.

None
limit int

Maximum number of events to return.

50

Returns:

Type Description
list[dict[str, Any]]

List of observer event dicts, newest first. Empty if no

list[dict[str, Any]]

IPC client or on error.

Source code in src/marianne/tui/reader.py
async def get_observer_events(
    self, job_id: str | None = None, limit: int = 50,
) -> list[dict[str, Any]]:
    """Get recent observer events via IPC.

    Returns observer events (file and process activity) from the
    ObserverRecorder's in-memory ring buffer.

    Args:
        job_id: Specific job ID, or None for all jobs.
        limit: Maximum number of events to return.

    Returns:
        List of observer event dicts, newest first. Empty if no
        IPC client or on error.
    """
    await self._ensure_source()
    if self._source == "ipc" and self._ipc_client is not None:
        try:
            result = await self._ipc_client.call(
                "daemon.observer_events",
                {"job_id": job_id, "limit": limit},
            )
            if result and isinstance(result, dict):
                events: list[dict[str, Any]] = result.get("events", [])
                return events
        except Exception:
            _logger.debug("reader.observer_events_failed", exc_info=True)
    return []
stream_events async
stream_events()

Yield EventBus events as they arrive from the daemon via IPC.

Source code in src/marianne/tui/reader.py
async def stream_events(self) -> AsyncIterator[dict[str, Any]]:
    """Yield EventBus events as they arrive from the daemon via IPC."""
    await self._ensure_source()
    if self._source == "ipc" and self._ipc_client is not None:
        async for event in self._ipc_client.stream("daemon.monitor.stream"):
            yield event
stream_snapshots async
stream_snapshots()

Yield snapshots as they become available.

For JSONL: tails the file, parsing new lines as they appear. For IPC: polls the daemon at a regular interval. For SQLite: polls the database at a regular interval.

Source code in src/marianne/tui/reader.py
async def stream_snapshots(self) -> AsyncIterator[SystemSnapshot]:
    """Yield snapshots as they become available.

    For JSONL: tails the file, parsing new lines as they appear.
    For IPC: polls the daemon at a regular interval.
    For SQLite: polls the database at a regular interval.
    """
    await self._ensure_source()

    if self._source == "jsonl":
        async for snapshot in self._tail_jsonl():
            yield snapshot
    elif self._source == "ipc":
        async for snapshot in self._poll_ipc():
            yield snapshot
    elif self._source == "sqlite":
        async for snapshot in self._poll_sqlite():
            yield snapshot

Functions