Skip to content

Index

tui

TUI package — Textual-based terminal UI for mzt top.

Classes

MonitorApp

MonitorApp(reader=None, refresh_interval=2.0, **kwargs)

Bases: App[None]

Real-time Marianne system monitor TUI.

Reads data from MonitorReader and renders a job-centric layout with live-updating metrics, event timeline, and drill-down detail.

Source code in src/marianne/tui/app.py
def __init__(
    self,
    reader: MonitorReader | None = None,
    refresh_interval: float = 2.0,
    **kwargs: Any,
) -> None:
    super().__init__(**kwargs)
    self._reader = reader or MonitorReader()
    self._refresh_interval = refresh_interval
    self._latest_snapshot: SystemSnapshot | None = None
    self._conductor_up: bool = False
    self._mount_time: float = 0.0
    self._stream_task: asyncio.Task[None] | None = None
Functions
compose
compose()

Build the widget tree.

Source code in src/marianne/tui/app.py
def compose(self) -> ComposeResult:
    """Build the widget tree."""
    yield Header()
    yield HeaderPanel(id="header-panel")
    yield SectionLabel("Active Jobs", id="jobs-section-label")
    yield JobsPanel(id="jobs-panel")
    yield SectionLabel("Event Timeline", id="timeline-section-label")
    yield TimelinePanel(id="timeline-panel")
    yield SectionLabel("Detail (\u2191\u2193 select, Enter drill)", id="detail-section-label")
    yield DetailPanel(id="detail-panel")
    yield Footer()
on_mount async
on_mount()

Start the event stream listener on mount.

Source code in src/marianne/tui/app.py
async def on_mount(self) -> None:
    """Start the event stream listener on mount."""
    self._mount_time = time.monotonic()
    # Initial data load
    await self.refresh_data()

    # Start background event stream listener
    self._stream_task = asyncio.create_task(
        self._run_event_stream(), name="tui-event-stream"
    )

    # Show empty detail on start
    detail = self.query_one("#detail-panel", DetailPanel)
    detail.show_empty()
on_unmount async
on_unmount()

Clean up the stream task.

Source code in src/marianne/tui/app.py
async def on_unmount(self) -> None:
    """Clean up the stream task."""
    if self._stream_task is not None:
        self._stream_task.cancel()
        try:
            await self._stream_task
        except asyncio.CancelledError:
            pass
refresh_data async
refresh_data()

Fetch latest data from the reader and update all panels.

Source code in src/marianne/tui/app.py
async def refresh_data(self) -> None:
    """Fetch latest data from the reader and update all panels."""
    try:
        snapshot = await self._reader.get_latest_snapshot()
        self._latest_snapshot = snapshot

        # Detect conductor status via IPC client
        if self._reader._ipc_client is not None:
            try:
                self._conductor_up = await self._reader._ipc_client.is_daemon_running()
            except Exception:
                self._conductor_up = False
        else:
            self._conductor_up = snapshot is not None

        uptime = 0.0
        if snapshot is not None:
            uptime = snapshot.conductor_uptime_seconds

        header = self.query_one("#header-panel", HeaderPanel)
        header.update_data(
            snapshot=snapshot,
            conductor_up=self._conductor_up,
            uptime_seconds=uptime,
        )

        # Update timeline with recent events (only on initial load/refresh)
        # Normal updates come through the event stream
        since = time.time() - 300.0
        events = await self._reader.get_events(since, limit=50)
        observer_events = await self._reader.get_observer_events(limit=50)

        observer_file_events = [
            e for e in observer_events
            if e.get("event", "").startswith("observer.file_")
        ]

        jobs = self.query_one("#jobs-panel", JobsPanel)
        jobs.update_data(snapshot, observer_file_events=observer_file_events)

        timeline = self.query_one("#timeline-panel", TimelinePanel)
        timeline.update_data(events=events, observer_events=observer_events)

    except Exception:
        _logger.debug("refresh_data failed", exc_info=True)
action_navigate_down
action_navigate_down()

Move selection down in the jobs panel.

Source code in src/marianne/tui/app.py
def action_navigate_down(self) -> None:
    """Move selection down in the jobs panel."""
    jobs = self.query_one("#jobs-panel", JobsPanel)
    jobs.select_next()
    self._update_detail()
action_navigate_up
action_navigate_up()

Move selection up in the jobs panel.

Source code in src/marianne/tui/app.py
def action_navigate_up(self) -> None:
    """Move selection up in the jobs panel."""
    jobs = self.query_one("#jobs-panel", JobsPanel)
    jobs.select_prev()
    self._update_detail()
action_drill_down
action_drill_down()

Show detail for the selected item.

Source code in src/marianne/tui/app.py
def action_drill_down(self) -> None:
    """Show detail for the selected item."""
    self._update_detail()
action_cycle_sort
action_cycle_sort()

Cycle sort order: ID -> CPU -> MEM.

Source code in src/marianne/tui/app.py
def action_cycle_sort(self) -> None:
    """Cycle sort order: ID -> CPU -> MEM."""
    jobs = self.query_one("#jobs-panel", JobsPanel)
    current = jobs.sort_key
    if current == "job_id":
        new_key = "cpu"
    elif current == "cpu":
        new_key = "mem"
    else:
        new_key = "job_id"

    jobs.sort_key = new_key
    self.notify(f"Sorting by: {new_key.upper()}")
action_filter_job
action_filter_job()

Toggle filter input.

Source code in src/marianne/tui/app.py
def action_filter_job(self) -> None:
    """Toggle filter input."""
    # Simple implementation for now - clear filter if present, else notify
    jobs = self.query_one("#jobs-panel", JobsPanel)
    if jobs.filter_query:
        jobs.filter_query = ""
        self.notify("Filter cleared")
    else:
        self.notify("Job filtering enabled (via command line for now)")
action_cancel_job
action_cancel_job()

Cancel the selected job.

Source code in src/marianne/tui/app.py
def action_cancel_job(self) -> None:
    """Cancel the selected job."""
    jobs = self.query_one("#jobs-panel", JobsPanel)
    selected = jobs.selected_item
    if selected and selected.get("type") == "job":
        job_id = selected["job_id"]
        self.notify(f"Cancelling job: {job_id}...")
        # Use background task to avoid blocking TUI
        if self._reader._ipc_client is not None:
            asyncio.create_task(self._reader._ipc_client.cancel_job(job_id, ""))
        else:
            self.notify("Not connected to conductor", severity="error")
    else:
        self.notify("Select a job root node to cancel", severity="warning")

MonitorReader

MonitorReader(storage=None, jsonl_path=None, ipc_client=None)

Reads profiler data for display in the TUI.

Supports three data sources, tried in priority order:

  1. IPC — queries the running daemon via DaemonClient
  2. SQLite — reads directly from MonitorStorage
  3. JSONL — tails the NDJSON streaming log
Parameters

storage: Optional MonitorStorage instance for SQLite reads. jsonl_path: Path to the JSONL streaming file. ipc_client: Optional DaemonClient for live daemon queries.

Source code in src/marianne/tui/reader.py
def __init__(
    self,
    storage: MonitorStorage | None = None,
    jsonl_path: Path | None = None,
    ipc_client: Any | None = None,
) -> None:
    self._storage = storage
    self._jsonl_path = jsonl_path.expanduser() if jsonl_path else None
    self._ipc_client = ipc_client
    self._source: str = "none"
Attributes
source property
source

The active data source name: 'ipc', 'sqlite', 'jsonl', or 'none'.

Functions
get_latest_snapshot async
get_latest_snapshot()

Get the most recent system snapshot.

Returns None if no data is available from any source.

Source code in src/marianne/tui/reader.py
async def get_latest_snapshot(self) -> SystemSnapshot | None:
    """Get the most recent system snapshot.

    Returns None if no data is available from any source.
    """
    await self._ensure_source()

    if self._source == "ipc":
        return await self._get_latest_ipc()
    elif self._source == "sqlite":
        return await self._get_latest_sqlite()
    elif self._source == "jsonl":
        return self._get_latest_jsonl()
    return None
get_snapshots async
get_snapshots(since, limit=100)

Get snapshots since the given unix timestamp.

Source code in src/marianne/tui/reader.py
async def get_snapshots(
    self, since: float, limit: int = 100
) -> list[SystemSnapshot]:
    """Get snapshots since the given unix timestamp."""
    await self._ensure_source()

    if self._source == "sqlite" and self._storage is not None:
        return await self._storage.read_snapshots(since, limit)
    elif self._source == "ipc":
        # IPC doesn't support historical queries — return latest only
        latest = await self._get_latest_ipc()
        if latest and latest.timestamp >= since:
            return [latest]
    elif self._source == "jsonl":
        return self._read_jsonl_since(since, limit)
    return []
get_events async
get_events(since, limit=50)

Get process lifecycle events since the given unix timestamp.

Source code in src/marianne/tui/reader.py
async def get_events(
    self, since: float, limit: int = 50
) -> list[ProcessEvent]:
    """Get process lifecycle events since the given unix timestamp."""
    await self._ensure_source()

    if self._source == "sqlite" and self._storage is not None:
        return await self._storage.read_events(since, limit)
    elif self._source == "ipc" and self._ipc_client is not None:
        return await self._get_events_ipc(since, limit)
    return []
get_observer_events async
get_observer_events(job_id=None, limit=50)

Get recent observer events via IPC.

Returns observer events (file and process activity) from the ObserverRecorder's in-memory ring buffer.

Parameters:

Name Type Description Default
job_id str | None

Specific job ID, or None for all jobs.

None
limit int

Maximum number of events to return.

50

Returns:

Type Description
list[dict[str, Any]]

List of observer event dicts, newest first. Empty if no

list[dict[str, Any]]

IPC client or on error.

Source code in src/marianne/tui/reader.py
async def get_observer_events(
    self, job_id: str | None = None, limit: int = 50,
) -> list[dict[str, Any]]:
    """Get recent observer events via IPC.

    Returns observer events (file and process activity) from the
    ObserverRecorder's in-memory ring buffer.

    Args:
        job_id: Specific job ID, or None for all jobs.
        limit: Maximum number of events to return.

    Returns:
        List of observer event dicts, newest first. Empty if no
        IPC client or on error.
    """
    await self._ensure_source()
    if self._source == "ipc" and self._ipc_client is not None:
        try:
            result = await self._ipc_client.call(
                "daemon.observer_events",
                {"job_id": job_id, "limit": limit},
            )
            if result and isinstance(result, dict):
                events: list[dict[str, Any]] = result.get("events", [])
                return events
        except Exception:
            _logger.debug("reader.observer_events_failed", exc_info=True)
    return []
stream_events async
stream_events()

Yield EventBus events as they arrive from the daemon via IPC.

Source code in src/marianne/tui/reader.py
async def stream_events(self) -> AsyncIterator[dict[str, Any]]:
    """Yield EventBus events as they arrive from the daemon via IPC."""
    await self._ensure_source()
    if self._source == "ipc" and self._ipc_client is not None:
        async for event in self._ipc_client.stream("daemon.monitor.stream"):
            yield event
stream_snapshots async
stream_snapshots()

Yield snapshots as they become available.

For JSONL: tails the file, parsing new lines as they appear. For IPC: polls the daemon at a regular interval. For SQLite: polls the database at a regular interval.

Source code in src/marianne/tui/reader.py
async def stream_snapshots(self) -> AsyncIterator[SystemSnapshot]:
    """Yield snapshots as they become available.

    For JSONL: tails the file, parsing new lines as they appear.
    For IPC: polls the daemon at a regular interval.
    For SQLite: polls the database at a regular interval.
    """
    await self._ensure_source()

    if self._source == "jsonl":
        async for snapshot in self._tail_jsonl():
            yield snapshot
    elif self._source == "ipc":
        async for snapshot in self._poll_ipc():
            yield snapshot
    elif self._source == "sqlite":
        async for snapshot in self._poll_sqlite():
            yield snapshot