Skip to content

app

app

Textual TUI application for mzt top — real-time system monitor.

Provides MonitorApp, a Textual App subclass with a job-centric layout matching the design document: header bar, jobs panel, event timeline, and detail drill-down panel.

Usage

app = MonitorApp(reader=reader) app.run()

Classes

SectionLabel

Bases: Static

A styled section divider label.

MonitorApp

MonitorApp(reader=None, refresh_interval=2.0, **kwargs)

Bases: App[None]

Real-time Marianne system monitor TUI.

Reads data from MonitorReader and renders a job-centric layout with live-updating metrics, event timeline, and drill-down detail.

Source code in src/marianne/tui/app.py
def __init__(
    self,
    reader: MonitorReader | None = None,
    refresh_interval: float = 2.0,
    **kwargs: Any,
) -> None:
    super().__init__(**kwargs)
    self._reader = reader or MonitorReader()
    self._refresh_interval = refresh_interval
    self._latest_snapshot: SystemSnapshot | None = None
    self._conductor_up: bool = False
    self._mount_time: float = 0.0
    self._stream_task: asyncio.Task[None] | None = None
Functions
compose
compose()

Build the widget tree.

Source code in src/marianne/tui/app.py
def compose(self) -> ComposeResult:
    """Build the widget tree."""
    yield Header()
    yield HeaderPanel(id="header-panel")
    yield SectionLabel("Active Jobs", id="jobs-section-label")
    yield JobsPanel(id="jobs-panel")
    yield SectionLabel("Event Timeline", id="timeline-section-label")
    yield TimelinePanel(id="timeline-panel")
    yield SectionLabel("Detail (\u2191\u2193 select, Enter drill)", id="detail-section-label")
    yield DetailPanel(id="detail-panel")
    yield Footer()
on_mount async
on_mount()

Start the event stream listener on mount.

Source code in src/marianne/tui/app.py
async def on_mount(self) -> None:
    """Start the event stream listener on mount."""
    self._mount_time = time.monotonic()
    # Initial data load
    await self.refresh_data()

    # Start background event stream listener
    self._stream_task = asyncio.create_task(
        self._run_event_stream(), name="tui-event-stream"
    )

    # Show empty detail on start
    detail = self.query_one("#detail-panel", DetailPanel)
    detail.show_empty()
on_unmount async
on_unmount()

Clean up the stream task.

Source code in src/marianne/tui/app.py
async def on_unmount(self) -> None:
    """Clean up the stream task."""
    if self._stream_task is not None:
        self._stream_task.cancel()
        try:
            await self._stream_task
        except asyncio.CancelledError:
            pass
refresh_data async
refresh_data()

Fetch latest data from the reader and update all panels.

Source code in src/marianne/tui/app.py
async def refresh_data(self) -> None:
    """Fetch latest data from the reader and update all panels."""
    try:
        snapshot = await self._reader.get_latest_snapshot()
        self._latest_snapshot = snapshot

        # Detect conductor status via IPC client
        if self._reader._ipc_client is not None:
            try:
                self._conductor_up = await self._reader._ipc_client.is_daemon_running()
            except Exception:
                self._conductor_up = False
        else:
            self._conductor_up = snapshot is not None

        uptime = 0.0
        if snapshot is not None:
            uptime = snapshot.conductor_uptime_seconds

        header = self.query_one("#header-panel", HeaderPanel)
        header.update_data(
            snapshot=snapshot,
            conductor_up=self._conductor_up,
            uptime_seconds=uptime,
        )

        # Update timeline with recent events (only on initial load/refresh)
        # Normal updates come through the event stream
        since = time.time() - 300.0
        events = await self._reader.get_events(since, limit=50)
        observer_events = await self._reader.get_observer_events(limit=50)

        observer_file_events = [
            e for e in observer_events
            if e.get("event", "").startswith("observer.file_")
        ]

        jobs = self.query_one("#jobs-panel", JobsPanel)
        jobs.update_data(snapshot, observer_file_events=observer_file_events)

        timeline = self.query_one("#timeline-panel", TimelinePanel)
        timeline.update_data(events=events, observer_events=observer_events)

    except Exception:
        _logger.debug("refresh_data failed", exc_info=True)
action_navigate_down
action_navigate_down()

Move selection down in the jobs panel.

Source code in src/marianne/tui/app.py
def action_navigate_down(self) -> None:
    """Move selection down in the jobs panel."""
    jobs = self.query_one("#jobs-panel", JobsPanel)
    jobs.select_next()
    self._update_detail()
action_navigate_up
action_navigate_up()

Move selection up in the jobs panel.

Source code in src/marianne/tui/app.py
def action_navigate_up(self) -> None:
    """Move selection up in the jobs panel."""
    jobs = self.query_one("#jobs-panel", JobsPanel)
    jobs.select_prev()
    self._update_detail()
action_drill_down
action_drill_down()

Show detail for the selected item.

Source code in src/marianne/tui/app.py
def action_drill_down(self) -> None:
    """Show detail for the selected item."""
    self._update_detail()
action_cycle_sort
action_cycle_sort()

Cycle sort order: ID -> CPU -> MEM.

Source code in src/marianne/tui/app.py
def action_cycle_sort(self) -> None:
    """Cycle sort order: ID -> CPU -> MEM."""
    jobs = self.query_one("#jobs-panel", JobsPanel)
    current = jobs.sort_key
    if current == "job_id":
        new_key = "cpu"
    elif current == "cpu":
        new_key = "mem"
    else:
        new_key = "job_id"

    jobs.sort_key = new_key
    self.notify(f"Sorting by: {new_key.upper()}")
action_filter_job
action_filter_job()

Toggle filter input.

Source code in src/marianne/tui/app.py
def action_filter_job(self) -> None:
    """Toggle filter input."""
    # Simple implementation for now - clear filter if present, else notify
    jobs = self.query_one("#jobs-panel", JobsPanel)
    if jobs.filter_query:
        jobs.filter_query = ""
        self.notify("Filter cleared")
    else:
        self.notify("Job filtering enabled (via command line for now)")
action_cancel_job
action_cancel_job()

Cancel the selected job.

Source code in src/marianne/tui/app.py
def action_cancel_job(self) -> None:
    """Cancel the selected job."""
    jobs = self.query_one("#jobs-panel", JobsPanel)
    selected = jobs.selected_item
    if selected and selected.get("type") == "job":
        job_id = selected["job_id"]
        self.notify(f"Cancelling job: {job_id}...")
        # Use background task to avoid blocking TUI
        if self._reader._ipc_client is not None:
            asyncio.create_task(self._reader._ipc_client.cancel_job(job_id, ""))
        else:
            self.notify("Not connected to conductor", severity="error")
    else:
        self.notify("Select a job root node to cancel", severity="warning")

Functions