Textual TUI application for mzt top — real-time system monitor.
Provides MonitorApp, a Textual App subclass with a job-centric layout
matching the design document: header bar, jobs panel, event timeline, and
detail drill-down panel.
Usage
app = MonitorApp(reader=reader)
app.run()
Classes
SectionLabel
Bases: Static
A styled section divider label.
MonitorApp
MonitorApp(reader=None, refresh_interval=2.0, **kwargs)
Bases: App[None]
Real-time Marianne system monitor TUI.
Reads data from MonitorReader and renders a job-centric layout
with live-updating metrics, event timeline, and drill-down detail.
Source code in src/marianne/tui/app.py
def __init__(
    self,
    reader: MonitorReader | None = None,
    refresh_interval: float = 2.0,
    **kwargs: Any,
) -> None:
    """Initialise the monitor app with its data source and empty state.

    Args:
        reader: Source of snapshots and events. A default
            ``MonitorReader()`` is created only when this is ``None``.
        refresh_interval: Stored as ``_refresh_interval``; presumably the
            number of seconds between refreshes (its consumer is not
            visible in this block -- confirm).
        **kwargs: Forwarded unchanged to the base-class initialiser.
    """
    super().__init__(**kwargs)
    # Test against None rather than truthiness: a caller-supplied reader
    # that happens to evaluate falsy (e.g. defines __len__/__bool__) must
    # not be silently swapped for a default one.
    self._reader = reader if reader is not None else MonitorReader()
    self._refresh_interval = refresh_interval
    # Nothing has been fetched yet; refresh_data() fills these in.
    self._latest_snapshot: SystemSnapshot | None = None
    self._conductor_up: bool = False
    # Set to time.monotonic() once the app is mounted.
    self._mount_time: float = 0.0
    # Handle of the background event-stream task, created on mount.
    self._stream_task: asyncio.Task[None] | None = None
Functions
compose
Build the widget tree.
Source code in src/marianne/tui/app.py
def compose(self) -> ComposeResult:
    """Yield the app's widgets in top-to-bottom display order.

    Layout: header, header panel, then three (section label, panel)
    pairs -- jobs, event timeline, detail -- followed by the footer.
    """
    # (label text, label id, panel class, panel id) for each section.
    sections = (
        ("Active Jobs", "jobs-section-label", JobsPanel, "jobs-panel"),
        (
            "Event Timeline",
            "timeline-section-label",
            TimelinePanel,
            "timeline-panel",
        ),
        (
            "Detail (\u2191\u2193 select, Enter drill)",
            "detail-section-label",
            DetailPanel,
            "detail-panel",
        ),
    )

    yield Header()
    yield HeaderPanel(id="header-panel")
    for title, label_id, panel_cls, panel_id in sections:
        yield SectionLabel(title, id=label_id)
        yield panel_cls(id=panel_id)
    yield Footer()
on_mount
async
Start the event stream listener on mount.
Source code in src/marianne/tui/app.py
async def on_mount(self) -> None:
    """Load initial data and launch the background event-stream task.

    Records the monotonic mount timestamp, performs one full refresh,
    starts ``_run_event_stream`` as a named asyncio task, and finally
    puts the detail panel into its empty state.
    """
    self._mount_time = time.monotonic()

    # Populate every panel once before live events start arriving.
    await self.refresh_data()

    # Keep the task handle so on_unmount can cancel the listener.
    listener = self._run_event_stream()
    self._stream_task = asyncio.create_task(listener, name="tui-event-stream")

    # Nothing is selected at start-up, so the detail panel starts empty.
    self.query_one("#detail-panel", DetailPanel).show_empty()
on_unmount
async
Clean up the stream task.
Source code in src/marianne/tui/app.py
async def on_unmount(self) -> None:
    """Cancel the background event-stream task and wait for it to finish.

    Does nothing when no stream task was started. The task handle is
    cleared so a second call is a no-op. A stream task that had already
    failed is logged at debug level instead of raising out of the
    unmount handler.
    """
    task = self._stream_task
    if task is None:
        return
    # Drop the handle first so a repeated unmount does not re-await it.
    self._stream_task = None

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected result of the cancel() request above.
        pass
    except Exception:
        # The listener died on its own before cancellation; shutdown
        # should still complete cleanly.
        _logger.debug("event stream task failed before unmount", exc_info=True)
refresh_data
async
Fetch latest data from the reader and update all panels.
Source code in src/marianne/tui/app.py
async def refresh_data(self) -> None:
    """Pull a fresh snapshot and recent events, then repaint all panels.

    Best-effort: any exception raised along the way is logged at debug
    level and swallowed, so panels updated before the failure keep their
    new data and the remaining ones keep their old contents.
    """
    try:
        latest = await self._reader.get_latest_snapshot()
        self._latest_snapshot = latest

        # Conductor status: ask the IPC client when there is one,
        # otherwise infer it from whether a snapshot was available.
        ipc = self._reader._ipc_client
        if ipc is None:
            self._conductor_up = latest is not None
        else:
            try:
                self._conductor_up = await ipc.is_daemon_running()
            except Exception:
                self._conductor_up = False

        uptime = latest.conductor_uptime_seconds if latest is not None else 0.0

        header_panel = self.query_one("#header-panel", HeaderPanel)
        header_panel.update_data(
            snapshot=latest,
            conductor_up=self._conductor_up,
            uptime_seconds=uptime,
        )

        # The timeline is seeded here only on an explicit load/refresh;
        # routine updates arrive through the event stream.
        cutoff = time.time() - 300.0
        recent_events = await self._reader.get_events(cutoff, limit=50)
        observer_feed = await self._reader.get_observer_events(limit=50)
        file_activity = [
            entry
            for entry in observer_feed
            if entry.get("event", "").startswith("observer.file_")
        ]

        jobs_panel = self.query_one("#jobs-panel", JobsPanel)
        jobs_panel.update_data(latest, observer_file_events=file_activity)

        timeline_panel = self.query_one("#timeline-panel", TimelinePanel)
        timeline_panel.update_data(
            events=recent_events,
            observer_events=observer_feed,
        )
    except Exception:
        _logger.debug("refresh_data failed", exc_info=True)
action_navigate_down
Move selection down in the jobs panel.
Source code in src/marianne/tui/app.py
def action_navigate_down(self) -> None:
    """Select the next entry in the jobs panel and refresh the detail view."""
    self.query_one("#jobs-panel", JobsPanel).select_next()
    # Keep the detail panel in sync with the new selection.
    self._update_detail()
action_navigate_up
Move selection up in the jobs panel.
Source code in src/marianne/tui/app.py
def action_navigate_up(self) -> None:
    """Select the previous entry in the jobs panel and refresh the detail view."""
    self.query_one("#jobs-panel", JobsPanel).select_prev()
    # Keep the detail panel in sync with the new selection.
    self._update_detail()
action_drill_down
Show detail for the selected item.
Source code in src/marianne/tui/app.py
def action_drill_down(self) -> None:
    """Refresh the detail panel for whatever is currently selected.

    Delegates entirely to ``_update_detail``; the selection itself is
    left untouched.
    """
    self._update_detail()
action_cycle_sort
Cycle sort order: ID -> CPU -> MEM.
Source code in src/marianne/tui/app.py
def action_cycle_sort(self) -> None:
    """Advance the jobs panel sort key: job_id -> cpu -> mem -> job_id.

    Any sort key other than ``"job_id"`` or ``"cpu"`` (including
    ``"mem"``) wraps back to ``"job_id"``. The user is notified of the
    new key in upper case.
    """
    successor = {"job_id": "cpu", "cpu": "mem"}

    panel = self.query_one("#jobs-panel", JobsPanel)
    new_key = successor.get(panel.sort_key, "job_id")
    panel.sort_key = new_key
    self.notify(f"Sorting by: {new_key.upper()}")
action_filter_job
Toggle filter input.
Source code in src/marianne/tui/app.py
def action_filter_job(self) -> None:
    """Clear the active job filter, or explain how filtering is enabled.

    Placeholder behaviour: there is no in-app filter input yet, so with
    no filter set this only shows an informational notification.
    """
    panel = self.query_one("#jobs-panel", JobsPanel)
    if not panel.filter_query:
        self.notify("Job filtering enabled (via command line for now)")
        return

    panel.filter_query = ""
    self.notify("Filter cleared")
action_cancel_job
Cancel the selected job.
Source code in src/marianne/tui/app.py
def action_cancel_job(self) -> None:
    """Request cancellation of the job selected in the jobs panel.

    Only items whose ``"type"`` is ``"job"`` can be cancelled; any other
    selection (or none) yields a warning notification. The request is
    sent through the reader's IPC client as a background task so the UI
    is not blocked; with no IPC client an error notification is shown.
    """
    jobs = self.query_one("#jobs-panel", JobsPanel)
    selected = jobs.selected_item
    if not selected or selected.get("type") != "job":
        self.notify("Select a job root node to cancel", severity="warning")
        return

    job_id = selected["job_id"]
    self.notify(f"Cancelling job: {job_id}...")

    ipc_client = self._reader._ipc_client
    if ipc_client is None:
        self.notify("Not connected to conductor", severity="error")
        return

    # The event loop keeps only weak references to tasks, so a
    # fire-and-forget task can be garbage-collected before it finishes.
    # Hold a strong reference until the request completes.
    pending = getattr(self, "_cancel_request_tasks", None)
    if pending is None:
        pending = self._cancel_request_tasks = set()
    # NOTE(review): the meaning of the empty second argument to
    # cancel_job is not visible here -- confirm against the IPC client.
    request = asyncio.create_task(ipc_client.cancel_job(job_id, ""))
    pending.add(request)
    request.add_done_callback(pending.discard)
Functions