Skip to content

Index

profiler

Profiler package — system resource collection, storage, and anomaly detection.

Collects per-process metrics, GPU stats, strace summaries, and stores time-series data in SQLite for consumption by mzt top and the anomaly/correlation analyzers.

Classes

AnomalyDetector

AnomalyDetector(config=None)

Detects resource anomalies by comparing snapshots against thresholds.

Runs on each new snapshot collected by ProfilerCollector. Stateless except for the configuration — all history is passed in via the detect method.

Source code in src/marianne/daemon/profiler/anomaly.py
def __init__(self, config: AnomalyConfig | None = None) -> None:
    """Create a detector bound to *config*.

    Args:
        config: Anomaly detection thresholds. A falsy value (including
            ``None``) is replaced by a freshly constructed
            ``AnomalyConfig()``.
    """
    if not config:
        config = AnomalyConfig()
    self.config = config
Functions
detect
detect(current, history)

Run all anomaly checks against current snapshot and history.

Parameters:

Name Type Description Default
current SystemSnapshot

The most recent system snapshot.

required
history list[SystemSnapshot]

Recent snapshots (oldest-first) for trend analysis. Should cover at least the configured spike window.

required

Returns:

Type Description
list[Anomaly]

List of detected Anomaly objects (may be empty).

Source code in src/marianne/daemon/profiler/anomaly.py
def detect(
    self,
    current: SystemSnapshot,
    history: list[SystemSnapshot],
) -> list[Anomaly]:
    """Evaluate every anomaly heuristic for *current* and merge the results.

    Args:
        current: The most recent system snapshot.
        history: Recent snapshots (oldest-first) for trend analysis.
            Should cover at least the configured spike window.

    Returns:
        All detected ``Anomaly`` objects, in check order (may be empty).
    """
    # Checks that receive both the current snapshot and the history.
    history_checks = (
        self._check_memory_spikes,
        self._check_runaway_processes,
    )
    # Checks that receive only the current snapshot.
    snapshot_checks = (
        self._check_zombies,
        self._check_fd_exhaustion,
        self._check_memory_pressure,
    )

    found: list[Anomaly] = []
    for check in history_checks:
        found += check(current, history)
    for check in snapshot_checks:
        found += check(current)
    return found

ProfilerCollector

ProfilerCollector(config, monitor, pgroup, event_bus, manager=None)

Central orchestrator for the daemon profiler subsystem.

Coordinates:

- Periodic metric collection (system + per-process + GPU + strace)
- SQLite + JSONL persistence via MonitorStorage
- Heuristic anomaly detection via AnomalyDetector
- EventBus integration for monitor.anomaly events
- Process lifecycle tracking via sheet.started/completed/failed

Parameters

- config: Profiler configuration (interval, storage paths, thresholds).
- monitor: The daemon's ResourceMonitor for system-level metrics.
- pgroup: The daemon's ProcessGroupManager for child process enumeration.
- event_bus: The daemon's EventBus for publishing anomaly events and subscribing to sheet lifecycle events.
- manager: Optional JobManager for mapping PIDs to job_id/sheet_num and reading running job / active sheet counts.

Source code in src/marianne/daemon/profiler/collector.py
def __init__(
    self,
    config: ProfilerConfig,
    monitor: ResourceMonitor,
    pgroup: ProcessGroupManager,
    event_bus: EventBus,
    manager: JobManager | None = None,
) -> None:
    """Wire up the profiler's collaborators and initial state.

    Nothing is started here: no storage is opened and no events are
    subscribed until ``start()`` is called.

    Args:
        config: Profiler configuration (interval, storage paths, thresholds).
        monitor: The daemon's ResourceMonitor for system-level metrics.
        pgroup: The daemon's ProcessGroupManager for child enumeration.
        event_bus: The daemon's EventBus for anomaly events and sheet
            lifecycle subscriptions.
        manager: Optional JobManager used for PID → job mapping and for
            running job / active sheet counts.
    """
    self._config = config
    self._monitor = monitor
    self._pgroup = pgroup
    self._event_bus = event_bus
    self._manager = manager

    # Storage
    # ``jsonl_path`` is treated as optional by ``get_jsonl_path`` and by
    # ``MonitorStorage`` itself, so only expand it when it is set instead
    # of crashing on ``None.expanduser()``.
    jsonl_path = config.jsonl_path.expanduser() if config.jsonl_path else None
    self._storage = MonitorStorage(
        db_path=config.storage_path.expanduser(),
        jsonl_path=jsonl_path,
        jsonl_max_bytes=config.jsonl_max_bytes,
    )

    # Strace management
    self._strace = StraceManager(enabled=config.strace_enabled)

    # Anomaly detection
    self._anomaly_detector = AnomalyDetector(config=config.anomaly)

    # State
    self._running = False
    self._loop_task: asyncio.Task[None] | None = None
    self._history: list[SystemSnapshot] = []
    self._latest_snapshot: SystemSnapshot | None = None
    self._known_pids: set[int] = set()
    self._sub_ids: list[str] = []
    self._recent_events: list[ProcessEvent] = []
    self._max_recent_events = 200
    # F-488: monotonic timestamp of the last retention cleanup.
    # Initialized to 0 so the first loop iteration runs cleanup
    # promptly on daemon startup (catches up on any backlog from
    # periods when the daemon wasn't running).
    self._last_cleanup_monotonic: float = 0.0
Functions
start async
start()

Initialize storage, subscribe to events, start collection loop.

Source code in src/marianne/daemon/profiler/collector.py
async def start(self) -> None:
    """Bring the profiler online.

    Initializes storage, subscribes to sheet lifecycle events, and
    launches the background collection loop. Returns immediately when
    the profiler is already running or is disabled in the configuration.
    """
    if self._running:
        return

    if not self._config.enabled:
        _logger.info("profiler.disabled")
        return

    await self._storage.initialize()

    # Sheet lifecycle subscriptions: one handler for starts, one shared
    # handler for both terminal outcomes.
    started_sub = self._event_bus.subscribe(
        self._on_sheet_started,
        event_filter=lambda evt: evt.get("event") == "sheet.started",
    )
    self._sub_ids.append(started_sub)

    finished_sub = self._event_bus.subscribe(
        self._on_sheet_completed,
        event_filter=lambda evt: evt.get("event") in (
            "sheet.completed", "sheet.failed"
        ),
    )
    self._sub_ids.append(finished_sub)

    self._running = True
    self._loop_task = asyncio.create_task(
        self._collection_loop(), name="profiler-collection-loop"
    )
    _logger.info(
        "profiler.started",
        interval=self._config.interval_seconds,
        strace=self._strace.enabled,
        gpu=self._config.gpu_enabled,
    )
stop async
stop()

Stop collection loop, detach strace, unsubscribe from events.

Source code in src/marianne/daemon/profiler/collector.py
async def stop(self) -> None:
    """Shut the profiler down.

    Order: cancel the collection loop, detach strace processes,
    unsubscribe from the event bus, then close the storage connection.
    """
    self._running = False

    task = self._loop_task
    if task is not None:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of cancelling the loop task just above.
            pass
        self._loop_task = None

    # Detach every strace process still attached.
    await self._strace.detach_all()

    # Drop event bus subscriptions registered by start().
    for subscription in self._sub_ids:
        self._event_bus.unsubscribe(subscription)
    self._sub_ids.clear()

    # Release the pooled storage connection.
    await self._storage.close()

    _logger.info("profiler.stopped")
collect_snapshot async
collect_snapshot()

Gather all metrics into a single SystemSnapshot.

Steps:

1. System memory via SystemProbe
2. Per-process metrics via psutil (with PID → job mapping)
3. GPU metrics via GpuProbe
4. Load average via os.getloadavg()
5. Strace summaries for attached PIDs
6. Pressure level from BackpressureController
7. Running jobs / active sheets from JobManager
8. Per-job progress from the manager's live job states
9. Conductor uptime from JobManager

Source code in src/marianne/daemon/profiler/collector.py
async def collect_snapshot(self) -> SystemSnapshot:
    """Gather all metrics into a single SystemSnapshot.

    Steps:
    1. System memory via SystemProbe
    2. Per-process metrics via psutil (with PID → job mapping)
    3. GPU metrics via GpuProbe (only when enabled in the config)
    4. Load average via os.getloadavg()
    5. Strace: nothing is merged here; summaries are collected on
       detach (process exit)
    6. Pressure level from BackpressureController
    7. Running jobs / active sheets from JobManager
    8. Per-job progress from the manager's live job states
    9. Conductor uptime from JobManager

    Probe failures in steps 1, 3, 4 and 6 are tolerated and leave the
    corresponding fields at their defaults. The result is also cached
    as the latest snapshot.

    Returns:
        The freshly assembled ``SystemSnapshot``.
    """
    now = time.time()
    daemon_pid = os.getpid()

    # 1. System memory
    system_total = 0.0
    system_available = 0.0
    system_used = 0.0
    daemon_rss = SystemProbe.get_memory_mb() or 0.0

    try:
        system_total, system_available, system_used = self._get_system_memory()
    except Exception:
        _logger.debug("profiler.system_memory_probe_failed", exc_info=True)

    # 2. Per-process metrics
    processes, zombie_count, zombie_pids = await self._collect_process_metrics()

    # 3. GPU metrics
    gpus_raw: list[Any] = []
    if self._config.gpu_enabled:
        try:
            gpus_raw = await GpuProbe.get_gpu_metrics_async()
        except Exception:
            _logger.debug("profiler.gpu_probe_failed", exc_info=True)

    # Convert gpu_probe.GpuMetric (dataclass) → profiler.models.GpuMetric (Pydantic)
    from marianne.daemon.profiler.models import GpuMetric as PydanticGpuMetric
    gpus = [
        PydanticGpuMetric(
            index=g.index,
            utilization_pct=g.utilization_pct,
            memory_used_mb=g.memory_used_mb,
            memory_total_mb=g.memory_total_mb,
            temperature_c=g.temperature_c,
        )
        for g in gpus_raw
    ]

    # 4. Load average
    load1 = load5 = load15 = 0.0
    try:
        load1, load5, load15 = os.getloadavg()
    except OSError:
        # Load average is unavailable here; keep the zero defaults.
        pass

    # 5. Strace summaries: intentionally nothing to do here. Traced
    # processes are not detached during collection; their summary is
    # collected on detach (process exit).

    # 6. Pressure level
    pressure_level = "none"
    manager = self._manager
    if manager is not None:
        try:
            pressure_level = manager.backpressure.current_level().value
        except Exception:
            _logger.debug("profiler.pressure_level_failed", exc_info=True)

    running_jobs = 0
    active_sheets = 0
    job_progress_list: list[JobProgress] = []
    conductor_uptime = 0.0
    if manager is not None:
        # 7. Running jobs / active sheets
        running_jobs = manager.running_count
        active_sheets = manager.active_job_count

        # 8. Per-job progress from live CheckpointState. getattr
        # defaults keep this tolerant of states missing a field.
        job_progress_list = [
            JobProgress(
                job_id=jid,
                total_sheets=getattr(state, "total_sheets", 1),
                last_completed_sheet=getattr(state, "last_completed_sheet", 0),
                current_sheet=getattr(state, "current_sheet", None),
                status=getattr(state, "status", "unknown"),
            )
            for jid, state in manager._live_states.items()
        ]

        # 9. Conductor uptime
        conductor_uptime = manager.uptime_seconds

    snapshot = SystemSnapshot(
        timestamp=now,
        daemon_pid=daemon_pid,
        system_memory_total_mb=system_total,
        system_memory_available_mb=system_available,
        system_memory_used_mb=system_used,
        daemon_rss_mb=daemon_rss,
        load_avg_1=load1,
        load_avg_5=load5,
        load_avg_15=load15,
        processes=processes,
        gpus=gpus,
        pressure_level=pressure_level,
        running_jobs=running_jobs,
        active_sheets=active_sheets,
        zombie_count=zombie_count,
        zombie_pids=zombie_pids,
        job_progress=job_progress_list,
        conductor_uptime_seconds=conductor_uptime,
    )

    self._latest_snapshot = snapshot
    return snapshot
get_resource_context_for_pid
get_resource_context_for_pid(pid)

Get current resource context for a specific PID.

Returns a dict suitable for embedding in sheet event data: rss_mb, cpu_pct, syscall_hotspot, anomalies_active.

If the PID is not found in the latest snapshot, returns a dict with all values set to None/empty.

Source code in src/marianne/daemon/profiler/collector.py
def get_resource_context_for_pid(self, pid: int) -> dict[str, Any]:
    """Build the resource context dict for one process.

    The result is meant for embedding in sheet event data and always
    carries ``rss_mb``, ``cpu_pct``, ``syscall_hotspot`` and
    ``anomalies_active``. Once a snapshot exists, ``daemon_rss_mb`` is
    added as well.

    When no snapshot has been collected yet, or *pid* is absent from the
    latest one, the per-process values stay ``None``.
    """
    context: dict[str, Any] = dict(
        rss_mb=None,
        cpu_pct=None,
        syscall_hotspot=None,
        anomalies_active=[],
    )

    snapshot = self._latest_snapshot
    if snapshot is None:
        return context

    # Daemon-level RSS is reported even when the PID is unknown.
    context["daemon_rss_mb"] = snapshot.daemon_rss_mb

    # First process in the latest snapshot with a matching PID, if any.
    matched = next(
        (candidate for candidate in snapshot.processes if candidate.pid == pid),
        None,
    )

    if matched is not None:
        context["rss_mb"] = round(matched.rss_mb, 1)
        context["cpu_pct"] = round(matched.cpu_percent, 1)

        # Hotspot = the syscall with the largest time share.
        # NOTE(review): ``:.0%`` multiplies by 100, so this presumably
        # expects fractions (0..1) in syscall_time_pct — confirm.
        time_shares = matched.syscall_time_pct
        if time_shares:
            name, share = max(time_shares.items(), key=lambda pair: pair[1])
            context["syscall_hotspot"] = f"{name} {share:.0%}"

    # Re-run detection on the latest snapshot and history to list the
    # anomaly types that are currently active.
    try:
        detected = self._anomaly_detector.detect(snapshot, self._history)
        context["anomalies_active"] = [
            item.anomaly_type.value for item in detected
        ]
    except Exception:
        _logger.debug("profiler.anomaly_check_for_context_failed", exc_info=True)

    return context
get_resource_context
get_resource_context()

Get general resource context (not PID-specific).

Useful when no specific PID is available for the event.

Source code in src/marianne/daemon/profiler/collector.py
def get_resource_context(self) -> dict[str, Any]:
    """Build the daemon-wide resource context (no PID involved).

    Intended for events that have no specific process to report on.
    The per-process keys are present but stay ``None``. Daemon-level
    fields and active anomaly types are filled in once a snapshot
    exists.
    """
    context: dict[str, Any] = dict(
        rss_mb=None,
        cpu_pct=None,
        syscall_hotspot=None,
        anomalies_active=[],
    )

    snapshot = self._latest_snapshot
    if snapshot is None:
        return context

    context.update(
        daemon_rss_mb=snapshot.daemon_rss_mb,
        pressure_level=snapshot.pressure_level,
        running_jobs=snapshot.running_jobs,
        active_sheets=snapshot.active_sheets,
    )

    # Re-run detection on the latest snapshot and history; a failure
    # here only leaves the anomaly list empty.
    try:
        detected = self._anomaly_detector.detect(snapshot, self._history)
        context["anomalies_active"] = [
            item.anomaly_type.value for item in detected
        ]
    except Exception:
        _logger.debug("profiler.anomaly_check_for_context_failed", exc_info=True)

    return context
get_latest_snapshot
get_latest_snapshot()

Return the latest snapshot as a JSON-serializable dict.

Used by the daemon.top IPC method.

Source code in src/marianne/daemon/profiler/collector.py
def get_latest_snapshot(self) -> dict[str, Any] | None:
    """Dump the most recent snapshot to a JSON-serializable dict.

    Serves the ``daemon.top`` IPC method. Returns ``None`` until the
    first snapshot has been collected.
    """
    latest = self._latest_snapshot
    return None if latest is None else latest.model_dump(mode="json")
get_jsonl_path
get_jsonl_path()

Return the JSONL streaming log path.

Used by the daemon.top.stream IPC method.

Source code in src/marianne/daemon/profiler/collector.py
def get_jsonl_path(self) -> str | None:
    """Give the location of the JSONL streaming log as a string.

    Serves the ``daemon.top.stream`` IPC method. The configured path is
    user-expanded; ``None`` is returned when no path is configured.
    """
    configured = self._config.jsonl_path
    if not configured:
        return None
    return str(configured.expanduser())
get_recent_events
get_recent_events(limit=50)

Return recent process events as JSON-serializable dicts.

Used by the daemon.events IPC method.

Source code in src/marianne/daemon/profiler/collector.py
def get_recent_events(self, limit: int = 50) -> list[dict[str, Any]]:
    """Return recent process events as JSON-serializable dicts.

    Used by the ``daemon.events`` IPC method.

    Args:
        limit: Maximum number of events to return, taken from the
            newest end of the buffer. Zero or a negative value yields
            an empty list.

    Returns:
        Up to *limit* events, oldest first.
    """
    # A plain ``[-limit:]`` slice would return the whole buffer for
    # limit == 0 (``[-0:]`` is ``[0:]``) and slice from the front for
    # negative limits, so handle the non-positive case explicitly.
    if limit <= 0:
        return []
    events = self._recent_events[-limit:]
    return [e.model_dump(mode="json") for e in events]

CorrelationAnalyzer

CorrelationAnalyzer(storage, learning_hub, config=None)

Periodic statistical analysis of resource usage vs. job outcomes.

Cross-references profiler snapshots (peak memory, CPU, syscall distributions, anomalies) with job success/failure outcomes from the learning store to identify predictive patterns.

Lifecycle::

analyzer = CorrelationAnalyzer(storage, learning_hub, config)
await analyzer.start(event_bus)
# ... periodic analysis runs automatically ...
await analyzer.stop()
Source code in src/marianne/daemon/profiler/correlation.py
def __init__(
    self,
    storage: MonitorStorage,
    learning_hub: LearningHub,
    config: CorrelationConfig | None = None,
) -> None:
    """Bind the analyzer to its data sources; the loop is not started.

    Args:
        storage: Profiler storage the analyzer reads from.
        learning_hub: Learning hub used during analysis.
        config: Analyzer settings. A falsy value (including ``None``)
            is replaced by a freshly constructed ``CorrelationConfig()``.
    """
    if not config:
        config = CorrelationConfig()

    self._storage = storage
    self._learning_hub = learning_hub
    self._config = config

    # Loop state, managed by start() and stop().
    self._running = False
    self._loop_task: asyncio.Task[None] | None = None
Functions
start async
start(event_bus)

Start the periodic analysis loop.

The event_bus parameter is accepted for interface consistency with other daemon components but is not currently used by the correlation analyzer (it reads from storage, not events).

Source code in src/marianne/daemon/profiler/correlation.py
async def start(self, event_bus: EventBus) -> None:
    """Launch the background analysis loop (no-op if already running).

    Args:
        event_bus: Accepted for interface consistency with other
            daemon components; this method does not use it.
    """
    if self._running:
        return

    self._running = True
    loop_coro = self._analysis_loop()
    self._loop_task = asyncio.create_task(
        loop_coro, name="correlation-analysis-loop"
    )

    settings = self._config
    _logger.info(
        "correlation_analyzer.started",
        interval_minutes=settings.interval_minutes,
        min_sample_size=settings.min_sample_size,
    )
stop async
stop()

Stop the periodic analysis loop.

Source code in src/marianne/daemon/profiler/correlation.py
async def stop(self) -> None:
    """Cancel the background analysis loop and wait for it to finish."""
    self._running = False

    task = self._loop_task
    if task is not None:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of cancelling the loop task just above.
            pass
        self._loop_task = None

    _logger.info("correlation_analyzer.stopped")
analyze async
analyze()

Run correlation analysis on completed jobs.

Steps:

1. Query completed jobs from storage (last 7 days)
2. For each job: get peak memory, total CPU, syscall distribution
3. Cross-reference with job outcomes from learning store
4. Statistical analysis:
   - Memory vs failure rate (binned histogram)
   - Syscall hotspots vs failure rate
   - Anomaly presence vs failure rate
   - Execution duration vs failure rate
5. Generate RESOURCE_CORRELATION patterns for confidence > 0.6
6. Store in LearningHub

Returns:

Type Description
list[dict[str, Any]]

List of generated correlation dicts (for testing/logging).

Source code in src/marianne/daemon/profiler/correlation.py
async def analyze(self) -> list[dict[str, Any]]:
    """Run one correlation pass over recently completed jobs.

    Steps:
    1. Bail out if the learning hub is not running.
    2. Load job profiles from storage for the lookback window
       (``_LOOKBACK_SECONDS``); bail out when there are fewer than the
       configured minimum sample size.
    3. Enrich the profiles with job outcomes.
    4. Run the memory, syscall, and duration vs. failure analyses.
    5. Store every correlation whose confidence is at least
       ``_MIN_CONFIDENCE``.

    Returns:
        Every correlation dict the analyses produced (including those
        below the storage threshold), or an empty list when the pass
        bailed out early.
    """
    if not self._learning_hub.is_running:
        _logger.debug("correlation_analyzer.learning_hub_not_running")
        return []

    # Job profiles inside the lookback window.
    window_start = time.time() - _LOOKBACK_SECONDS
    profiles = await self._get_job_profiles(window_start)

    sample_count = len(profiles)
    if sample_count < self._config.min_sample_size:
        _logger.debug(
            "correlation_analyzer.insufficient_samples",
            sample_count=sample_count,
            min_required=self._config.min_sample_size,
        )
        return []

    # Attach job outcomes to the profiles.
    enriched = self._enrich_with_outcomes(profiles)
    if not enriched:
        _logger.debug("correlation_analyzer.no_enriched_profiles")
        return []

    # Run each analysis in turn and pool the findings.
    analyses = (
        self._analyze_memory_vs_failure,
        self._analyze_syscall_vs_failure,
        self._analyze_duration_vs_failure,
    )
    correlations: list[dict[str, Any]] = []
    for run_analysis in analyses:
        correlations += run_analysis(enriched)

    # Persist only the correlations that meet the confidence threshold.
    stored = 0
    for finding in correlations:
        if finding["confidence"] >= _MIN_CONFIDENCE:
            self._store_correlation(finding)
            stored += 1

    if stored:
        _logger.info(
            "correlation_analyzer.patterns_generated",
            total_analyzed=len(enriched),
            correlations_found=len(correlations),
            patterns_stored=stored,
        )

    return correlations

GpuMetric dataclass

GpuMetric(index, utilization_pct, memory_used_mb, memory_total_mb, temperature_c)

Snapshot of a single GPU's current state.

GpuProbe

GPU resource probes following the SystemProbe pattern.

Each method tries pynvml first, then falls back to nvidia-smi. Returns empty list when no GPU is available — callers treat that as "no GPU present" (not an error).

Functions
get_gpu_metrics staticmethod
get_gpu_metrics()

Get current metrics for all GPUs.

Priority
  1. pynvml (fast, in-process)
  2. nvidia-smi subprocess fallback
  3. Empty list (no GPU / no drivers)

Returns:

Type Description
list[GpuMetric]

List of GpuMetric, one per GPU. Empty if no GPU available.

Source code in src/marianne/daemon/profiler/gpu_probe.py
@staticmethod
def get_gpu_metrics() -> list[GpuMetric]:
    """Probe all GPUs and report their current metrics.

    Probe order:
        1. pynvml, when it is available
        2. nvidia-smi (synchronous) as the fallback
        3. Empty list when both probes fail

    Returns:
        One GpuMetric per GPU; an empty list if no probe succeeded.
    """
    if _pynvml_available:
        try:
            via_pynvml = GpuProbe._probe_pynvml()
        except Exception:
            _logger.debug("pynvml_probe_failed", exc_info=True)
        else:
            return via_pynvml

    # pynvml is unavailable or failed: try nvidia-smi.
    try:
        via_smi = GpuProbe._probe_nvidia_smi_sync()
    except Exception:
        _logger.debug("nvidia_smi_probe_failed", exc_info=True)
    else:
        return via_smi

    return []
get_gpu_metrics_async async staticmethod
get_gpu_metrics_async()

Async variant of get_gpu_metrics.

Uses asyncio.create_subprocess_exec for the nvidia-smi fallback so it doesn't block the event loop.

Returns:

Type Description
list[GpuMetric]

List of GpuMetric, one per GPU. Empty if no GPU available.

Source code in src/marianne/daemon/profiler/gpu_probe.py
@staticmethod
async def get_gpu_metrics_async() -> list[GpuMetric]:
    """Coroutine counterpart of ``get_gpu_metrics``.

    The pynvml probe is called directly; only the nvidia-smi fallback
    is awaited, via the async nvidia-smi probe.

    Returns:
        One GpuMetric per GPU; an empty list if no probe succeeded.
    """
    if _pynvml_available:
        try:
            via_pynvml = GpuProbe._probe_pynvml()
        except Exception:
            _logger.debug("pynvml_probe_failed", exc_info=True)
        else:
            return via_pynvml

    # pynvml is unavailable or failed: try nvidia-smi.
    try:
        via_smi = await GpuProbe._probe_nvidia_smi_async()
    except Exception:
        _logger.debug("nvidia_smi_async_probe_failed", exc_info=True)
    else:
        return via_smi

    return []
is_available staticmethod
is_available()

Check whether any GPU probing method is available.

Returns True if pynvml is importable OR nvidia-smi is on PATH.

Source code in src/marianne/daemon/profiler/gpu_probe.py
@staticmethod
def is_available() -> bool:
    """Report whether at least one GPU probing method can be used.

    True when the pynvml availability flag is set, or when an
    ``nvidia-smi`` executable is found on PATH.
    """
    if not _pynvml_available:
        # No pynvml: availability depends on the CLI tool being on PATH.
        return shutil.which("nvidia-smi") is not None
    return True

Anomaly

Bases: BaseModel

A detected resource anomaly.

Produced by AnomalyDetector when heuristic thresholds are exceeded. Published to EventBus as monitor.anomaly events and stored as RESOURCE_ANOMALY patterns in the learning system.

AnomalyConfig

Bases: BaseModel

Thresholds for anomaly detection.

AnomalySeverity

Bases: str, Enum

Severity level for detected anomalies.

AnomalyType

Bases: str, Enum

Types of resource anomalies the detector can identify.

Attributes
MEMORY_SPIKE class-attribute instance-attribute
MEMORY_SPIKE = 'memory_spike'

RSS increased >threshold in recent window.

RUNAWAY_PROCESS class-attribute instance-attribute
RUNAWAY_PROCESS = 'runaway_process'

Child process consuming >threshold CPU for extended duration.

ZOMBIE class-attribute instance-attribute
ZOMBIE = 'zombie'

One or more zombie child processes found.

FD_EXHAUSTION class-attribute instance-attribute
FD_EXHAUSTION = 'fd_exhaustion'

Process approaching file descriptor limits.

CorrelationConfig

Bases: BaseModel

Configuration for the periodic correlation analyzer.

EventType

Bases: str, Enum

Process lifecycle event types.

ProcessEvent

Bases: BaseModel

Lifecycle event for a child process (spawn, exit, signal, kill, oom).

ProcessMetric

Bases: BaseModel

Resource metrics for a single process in a snapshot.

ProfilerConfig

Bases: BaseModel

Top-level profiler configuration for DaemonConfig.profiler.

Controls data collection, storage, anomaly detection thresholds, and correlation analysis frequency.

ResourceEstimate

Bases: BaseModel

Scheduling hint based on learned resource correlations.

Returned by BackpressureController.estimate_job_resource_needs() to inform job admission and scheduling decisions.

RetentionConfig

Bases: BaseModel

Data retention policy for profiler storage.

SystemSnapshot

Bases: BaseModel

Point-in-time system resource snapshot.

Collected periodically by ProfilerCollector, stored in SQLite + JSONL, and consumed by AnomalyDetector and CorrelationAnalyzer.

MonitorStorage

MonitorStorage(db_path, jsonl_path=None, jsonl_max_bytes=52428800)

Async SQLite + JSONL storage for profiler time-series data.

Uses aiosqlite for non-blocking database access and WAL mode for safe concurrent reads (mzt top) while the daemon writes.

Parameters

- db_path: Path to the SQLite database file. Parent directories are created automatically.
- jsonl_path: Optional path for the NDJSON streaming log. When provided, each snapshot is also appended as a single JSON line.
- jsonl_max_bytes: Maximum JSONL file size before rotation (default 50 MB).

Source code in src/marianne/daemon/profiler/storage.py
def __init__(
    self,
    db_path: Path,
    jsonl_path: Path | None = None,
    jsonl_max_bytes: int = 52_428_800,
) -> None:
    """Record storage locations and limits; no file or database is opened.

    Args:
        db_path: Location of the SQLite database file (user-expanded).
        jsonl_path: Optional location of the JSONL log (user-expanded).
            A falsy value disables it.
        jsonl_max_bytes: Size limit for the JSONL file, in bytes.
    """
    # Locations
    self._db_path = Path(db_path).expanduser()
    if jsonl_path:
        self._jsonl_path = Path(jsonl_path).expanduser()
    else:
        self._jsonl_path = None
    self._jsonl_max_bytes = jsonl_max_bytes

    # Lazy-initialization state; the lock guards initialize().
    self._initialized = False
    self._init_lock = asyncio.Lock()

    # Pooled connection, not opened here.
    self._pool: aiosqlite.Connection | None = None
Functions
close async
close()

Close the pooled connection. Call on shutdown.

Source code in src/marianne/daemon/profiler/storage.py
async def close(self) -> None:
    """Release the pooled connection; intended to be called at shutdown.

    Thin public wrapper around ``_close_pool``.
    """
    await self._close_pool()
initialize async
initialize()

Create tables and indexes if they don't exist.

Source code in src/marianne/daemon/profiler/storage.py
async def initialize(self) -> None:
    """Set up the database schema once.

    Creates the database's parent directory, runs the schema script,
    and marks the storage as initialized. Later calls return
    immediately.
    """
    if self._initialized:
        return

    async with self._init_lock:
        # Re-check under the lock: another caller may have finished
        # initialization while this one was waiting to acquire it.
        if not self._initialized:
            self._db_path.parent.mkdir(parents=True, exist_ok=True)

            async with self._connect() as conn:
                await conn.executescript(_SCHEMA_SQL)
                await conn.commit()

            self._initialized = True
            jsonl_display = str(self._jsonl_path) if self._jsonl_path else None
            _logger.info(
                "storage.initialized",
                db_path=str(self._db_path),
                jsonl_path=jsonl_display,
            )
write_snapshot async
write_snapshot(snapshot)

Insert a snapshot and its process metrics into the database.

Returns the snapshot row ID for cross-referencing.

Source code in src/marianne/daemon/profiler/storage.py
async def write_snapshot(self, snapshot: SystemSnapshot) -> int:
    """Insert a snapshot and its process metrics into the database.

    The snapshot row and all of its per-process rows are written on one
    connection and committed together at the end.

    Args:
        snapshot: The snapshot to persist.

    Returns:
        The snapshot row ID for cross-referencing.

    Raises:
        RuntimeError: If the insert did not report a row ID.
    """
    await self._ensure_initialized()

    # Serialize GPU lists as JSON arrays (one element per GPU).
    gpu_util = json.dumps([g.utilization_pct for g in snapshot.gpus])
    gpu_mem = json.dumps([g.memory_used_mb for g in snapshot.gpus])
    gpu_temp = json.dumps([g.temperature_c for g in snapshot.gpus])

    async with self._connect() as db:
        cursor = await db.execute(
            """
            INSERT INTO snapshots (
                timestamp, daemon_pid,
                system_memory_total_mb, system_memory_available_mb,
                system_memory_used_mb, daemon_rss_mb,
                child_count, zombie_count,
                load_avg_1, load_avg_5, load_avg_15,
                gpu_count, gpu_utilization_pct, gpu_memory_used_mb,
                gpu_temperature, pressure_level,
                running_jobs, active_sheets
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                snapshot.timestamp,
                snapshot.daemon_pid,
                snapshot.system_memory_total_mb,
                snapshot.system_memory_available_mb,
                snapshot.system_memory_used_mb,
                snapshot.daemon_rss_mb,
                len(snapshot.processes),
                snapshot.zombie_count,
                snapshot.load_avg_1,
                snapshot.load_avg_5,
                snapshot.load_avg_15,
                len(snapshot.gpus),
                gpu_util,
                gpu_mem,
                gpu_temp,
                snapshot.pressure_level,
                snapshot.running_jobs,
                snapshot.active_sheets,
            ),
        )
        snapshot_id = cursor.lastrowid
        # Explicit check rather than ``assert``: asserts are stripped
        # under ``python -O``, which would let ``None`` escape as the
        # return value.
        if snapshot_id is None:
            raise RuntimeError("snapshot insert did not produce a row ID")

        # Insert per-process metrics in one batch instead of one
        # execute() call per process.
        process_rows = [
            (
                snapshot_id,
                proc.pid,
                proc.ppid,
                proc.command,
                proc.state,
                proc.cpu_percent,
                proc.rss_mb,
                proc.vms_mb,
                proc.threads,
                proc.open_fds,
                proc.age_seconds,
                proc.job_id,
                proc.sheet_num,
                json.dumps(proc.syscall_counts) if proc.syscall_counts else "{}",
                json.dumps(proc.syscall_time_pct) if proc.syscall_time_pct else "{}",
            )
            for proc in snapshot.processes
        ]
        if process_rows:
            await db.executemany(
                """
                INSERT INTO process_metrics (
                    snapshot_id, pid, ppid, command, state,
                    cpu_percent, rss_mb, vms_mb, threads, open_fds,
                    age_seconds, job_id, sheet_num,
                    syscall_counts, syscall_time_pct
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                process_rows,
            )

        await db.commit()

    return snapshot_id
write_event async
write_event(event)

Insert a process lifecycle event.

Source code in src/marianne/daemon/profiler/storage.py
async def write_event(self, event: ProcessEvent) -> None:
    """Persist one process lifecycle event as a ``process_events`` row.

    Args:
        event: The event to store. Its ``event_type`` may be an
            ``EventType`` member (stored as its ``.value``) or any
            other value (stored as-is).
    """
    await self._ensure_initialized()

    # Normalize the event type to the value written to the database.
    kind = event.event_type
    if isinstance(kind, EventType):
        kind = kind.value

    row = (
        event.timestamp,
        event.pid,
        kind,
        event.exit_code,
        event.signal_num,
        event.job_id,
        event.sheet_num,
        event.details,
    )

    async with self._connect() as db:
        await db.execute(
            """
            INSERT INTO process_events (
                timestamp, pid, event_type,
                exit_code, signal_num,
                job_id, sheet_num, details
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            row,
        )
        await db.commit()
read_snapshots async
read_snapshots(since, limit=100)

Read snapshots since the given unix timestamp.

Returns snapshots in chronological order, most recent last. Process metrics are reconstructed for each snapshot.

Source code in src/marianne/daemon/profiler/storage.py
async def read_snapshots(
    self, since: float, limit: int = 100
) -> list[SystemSnapshot]:
    """Load stored snapshots whose timestamp is at or after *since*.

    Rows are ordered by ascending timestamp and capped at *limit*, so
    the newest returned snapshot is last.  For every snapshot the
    per-GPU JSON arrays are expanded into ``GpuMetric`` objects and the
    matching ``process_metrics`` rows are loaded as ``ProcessMetric``
    objects.

    Args:
        since: Lower bound (inclusive) on the snapshot timestamp.
        limit: Maximum number of snapshot rows to fetch.

    Returns:
        The reconstructed snapshots, oldest first.
    """
    await self._ensure_initialized()

    def _decode(raw, empty):
        # NULL / empty JSON columns fall back to an empty container.
        return json.loads(raw) if raw else empty

    def _pick(values, idx):
        # Arrays shorter than gpu_count are padded with 0.0.
        return values[idx] if idx < len(values) else 0.0

    result: list[SystemSnapshot] = []

    async with self._connect() as db:
        cursor = await db.execute(
            """
            SELECT id, timestamp, daemon_pid,
                   system_memory_total_mb, system_memory_available_mb,
                   system_memory_used_mb, daemon_rss_mb,
                   child_count, zombie_count,
                   load_avg_1, load_avg_5, load_avg_15,
                   gpu_count, gpu_utilization_pct, gpu_memory_used_mb,
                   gpu_temperature, pressure_level,
                   running_jobs, active_sheets
            FROM snapshots
            WHERE timestamp >= ?
            ORDER BY timestamp ASC
            LIMIT ?
            """,
            (since, limit),
        )

        for row in await cursor.fetchall():
            # Column positions follow the SELECT list above; child_count
            # is selected but not passed on to SystemSnapshot.
            (
                snapshot_id, timestamp, daemon_pid,
                mem_total, mem_available, mem_used, daemon_rss,
                _child_count, zombies,
                load_1, load_5, load_15,
                n_gpus, util_json, mem_json, temp_json,
                pressure, running_jobs, active_sheets,
            ) = row

            # Per-GPU values are stored as parallel JSON arrays.
            n_gpus = n_gpus or 0
            utilization = _decode(util_json, [])
            memory_used = _decode(mem_json, [])
            temperature = _decode(temp_json, [])

            gpus = []
            for idx in range(n_gpus):
                gpus.append(
                    GpuMetric(
                        index=idx,
                        utilization_pct=_pick(utilization, idx),
                        memory_used_mb=_pick(memory_used, idx),
                        temperature_c=_pick(temperature, idx),
                    )
                )

            # One extra query per snapshot for its process rows.
            proc_cursor = await db.execute(
                """
                SELECT pid, ppid, command, state,
                       cpu_percent, rss_mb, vms_mb, threads, open_fds,
                       age_seconds, job_id, sheet_num,
                       syscall_counts, syscall_time_pct
                FROM process_metrics
                WHERE snapshot_id = ?
                """,
                (snapshot_id,),
            )

            processes = []
            for (
                pid, ppid, command, state,
                cpu, rss, vms, threads, fds,
                age, job_id, sheet_num,
                counts_json, time_pct_json,
            ) in await proc_cursor.fetchall():
                processes.append(
                    ProcessMetric(
                        pid=pid,
                        ppid=ppid or 0,
                        command=command or "",
                        state=state or "S",
                        cpu_percent=cpu or 0.0,
                        rss_mb=rss or 0.0,
                        vms_mb=vms or 0.0,
                        threads=threads or 1,
                        open_fds=fds or 0,
                        age_seconds=age or 0.0,
                        job_id=job_id,
                        sheet_num=sheet_num,
                        syscall_counts=_decode(counts_json, {}),
                        syscall_time_pct=_decode(time_pct_json, {}),
                    )
                )

            snapshot = SystemSnapshot(
                timestamp=timestamp,
                daemon_pid=daemon_pid or 0,
                system_memory_total_mb=mem_total or 0.0,
                system_memory_available_mb=mem_available or 0.0,
                system_memory_used_mb=mem_used or 0.0,
                daemon_rss_mb=daemon_rss or 0.0,
                load_avg_1=load_1 or 0.0,
                load_avg_5=load_5 or 0.0,
                load_avg_15=load_15 or 0.0,
                processes=processes,
                gpus=gpus,
                pressure_level=pressure or "none",
                running_jobs=running_jobs or 0,
                active_sheets=active_sheets or 0,
                zombie_count=zombies or 0,
            )
            result.append(snapshot)

    return result
read_events async
read_events(since, limit=100)

Read process events since the given unix timestamp.

Source code in src/marianne/daemon/profiler/storage.py
async def read_events(
    self, since: float, limit: int = 100
) -> list[ProcessEvent]:
    """Load process events whose timestamp is at or after *since*.

    Args:
        since: Lower bound (inclusive) on the event timestamp.
        limit: Maximum number of rows to fetch; rows are taken in
            ascending timestamp order.

    Returns:
        ``ProcessEvent`` objects, oldest first.  The stored
        ``event_type`` is converted back with ``EventType(...)`` and a
        NULL/empty ``details`` column becomes ``""``.
    """
    await self._ensure_initialized()

    async with self._connect() as db:
        cursor = await db.execute(
            """
            SELECT timestamp, pid, event_type,
                   exit_code, signal_num,
                   job_id, sheet_num, details
            FROM process_events
            WHERE timestamp >= ?
            ORDER BY timestamp ASC
            LIMIT ?
            """,
            (since, limit),
        )
        rows = await cursor.fetchall()

    # Rebuild the model objects once the connection has been released.
    events: list[ProcessEvent] = []
    for ts, pid, kind, exit_code, signal_num, job_id, sheet_num, details in rows:
        events.append(
            ProcessEvent(
                timestamp=ts,
                pid=pid,
                event_type=EventType(kind),
                exit_code=exit_code,
                signal_num=signal_num,
                job_id=job_id,
                sheet_num=sheet_num,
                details=details or "",
            )
        )
    return events
read_process_history async
read_process_history(pid, since)

Read historical metrics for a specific process.

Source code in src/marianne/daemon/profiler/storage.py
async def read_process_history(
    self, pid: int, since: float
) -> list[ProcessMetric]:
    """Load every stored metric sample for one PID.

    Joins ``process_metrics`` with ``snapshots`` and keeps the rows for
    *pid* whose snapshot timestamp is at or after *since*.

    Args:
        pid: Process ID to look up.
        since: Lower bound (inclusive) on the snapshot timestamp.

    Returns:
        ``ProcessMetric`` objects ordered by ascending snapshot
        timestamp.  NULL columns are replaced by neutral defaults.
    """
    await self._ensure_initialized()

    async with self._connect() as db:
        cursor = await db.execute(
            """
            SELECT pm.pid, pm.ppid, pm.command, pm.state,
                   pm.cpu_percent, pm.rss_mb, pm.vms_mb,
                   pm.threads, pm.open_fds, pm.age_seconds,
                   pm.job_id, pm.sheet_num,
                   pm.syscall_counts, pm.syscall_time_pct
            FROM process_metrics pm
            JOIN snapshots s ON s.id = pm.snapshot_id
            WHERE pm.pid = ? AND s.timestamp >= ?
            ORDER BY s.timestamp ASC
            """,
            (pid, since),
        )
        rows = await cursor.fetchall()

    history: list[ProcessMetric] = []
    for (
        row_pid, ppid, command, state,
        cpu, rss, vms, threads, fds,
        age, job_id, sheet_num,
        counts_json, time_pct_json,
    ) in rows:
        history.append(
            ProcessMetric(
                pid=row_pid,
                ppid=ppid or 0,
                command=command or "",
                state=state or "S",
                cpu_percent=cpu or 0.0,
                rss_mb=rss or 0.0,
                vms_mb=vms or 0.0,
                threads=threads or 1,
                open_fds=fds or 0,
                age_seconds=age or 0.0,
                job_id=job_id,
                sheet_num=sheet_num,
                # Syscall data is stored as JSON text; NULL/empty -> {}.
                syscall_counts=json.loads(counts_json) if counts_json else {},
                syscall_time_pct=json.loads(time_pct_json) if time_pct_json else {},
            )
        )
    return history
read_job_resource_profile async
read_job_resource_profile(job_id)

Aggregate resource profile for a specific job.

Returns a dict with peak memory, total CPU-time, process spawn count, and syscall hotspots — useful for scheduling hints and mzt diagnose --resources.

Source code in src/marianne/daemon/profiler/storage.py
async def read_job_resource_profile(self, job_id: str) -> dict[str, Any]:
    """Aggregate resource profile for a specific job.

    Returns a dict with peak memory, total CPU-time, process spawn
    count, and syscall hotspots — useful for scheduling hints and
    ``mzt diagnose --resources``.

    Args:
        job_id: Job whose ``process_metrics`` / ``process_events`` rows
            are aggregated.

    Returns:
        Dict with the keys ``job_id``, ``peak_rss_mb``,
        ``total_cpu_seconds``, ``process_spawn_count``,
        ``syscall_hotspots`` (syscall name -> summed time percentage),
        ``sheet_metrics`` (``str(sheet_num)`` -> ``peak_rss_mb`` /
        ``max_cpu_pct``) and ``unique_pid_count``.

        ``total_cpu_seconds`` is an estimate: for each PID, every sample
        after the first contributes ``cpu_percent / 100`` times the
        seconds elapsed since that PID's previous sample.
    """
    await self._ensure_initialized()

    profile: dict[str, Any] = {
        "job_id": job_id,
        "peak_rss_mb": 0.0,
        "total_cpu_seconds": 0.0,
        "process_spawn_count": 0,
        "syscall_hotspots": {},
        "sheet_metrics": {},
    }
    hotspots: dict[str, float] = profile["syscall_hotspots"]
    sheets: dict[str, dict[str, float]] = profile["sheet_metrics"]
    seen_pids: set[int] = set()
    # pid -> timestamp of that PID's previous sample (for CPU integration)
    previous_ts: dict[int, float] = {}

    async with self._connect() as db:
        # Per-process metrics aggregated by job
        cursor = await db.execute(
            """
            SELECT pm.pid, pm.sheet_num, pm.rss_mb, pm.cpu_percent,
                   pm.syscall_time_pct, s.timestamp
            FROM process_metrics pm
            JOIN snapshots s ON s.id = pm.snapshot_id
            WHERE pm.job_id = ?
            ORDER BY s.timestamp ASC
            """,
            (job_id,),
        )
        rows = await cursor.fetchall()

        for row in rows:
            pid = row[0]
            sheet_num = row[1]
            rss_mb = row[2] or 0.0
            cpu_pct = row[3] or 0.0
            syscall_pct_json = row[4]
            timestamp = row[5]

            seen_pids.add(pid)
            if rss_mb > profile["peak_rss_mb"]:
                profile["peak_rss_mb"] = rss_mb

            # Integrate CPU usage over the interval since this PID was last
            # sampled.  NOTE(review): assumes cpu_percent describes the
            # interval ending at this snapshot, and that a PID is not
            # reused by an unrelated process within the job — confirm
            # against the collector.
            if timestamp is not None:
                earlier = previous_ts.get(pid)
                if earlier is not None and timestamp > earlier:
                    profile["total_cpu_seconds"] += (
                        (cpu_pct / 100.0) * (timestamp - earlier)
                    )
                previous_ts[pid] = timestamp

            # Accumulate syscall time across all observations
            if syscall_pct_json:
                for sc, pct in json.loads(syscall_pct_json).items():
                    hotspots[sc] = hotspots.get(sc, 0.0) + pct

            # Track per-sheet peak memory and CPU
            if sheet_num is not None:
                sm = sheets.setdefault(
                    str(sheet_num),
                    {"peak_rss_mb": 0.0, "max_cpu_pct": 0.0},
                )
                if rss_mb > sm["peak_rss_mb"]:
                    sm["peak_rss_mb"] = rss_mb
                if cpu_pct > sm["max_cpu_pct"]:
                    sm["max_cpu_pct"] = cpu_pct

        # Count spawn events for this job
        event_cursor = await db.execute(
            """
            SELECT COUNT(*) FROM process_events
            WHERE job_id = ? AND event_type = ?
            """,
            (job_id, EventType.SPAWN.value),
        )
        spawn_row = await event_cursor.fetchone()
        profile["process_spawn_count"] = spawn_row[0] if spawn_row else 0

    # Only the count is exposed so the result stays JSON-serializable.
    profile["unique_pid_count"] = len(seen_pids)

    return profile
cleanup async
cleanup(retention)

Apply retention policy by deleting old data.

  • Snapshots + process_metrics older than full_resolution_hours
  • Process events older than events_days
Source code in src/marianne/daemon/profiler/storage.py
async def cleanup(self, retention: RetentionConfig) -> None:
    """Delete stored data that has aged out of the retention window.

    - Snapshots and their process_metrics rows older than
      ``retention.full_resolution_hours`` hours are removed.
    - Process events older than ``retention.events_days`` days are
      removed.

    All deletions are committed together, then a
    ``storage.cleanup_complete`` log entry is written.

    Args:
        retention: Retention settings providing
            ``full_resolution_hours`` and ``events_days``.
    """
    await self._ensure_initialized()

    hours = retention.full_resolution_hours
    days = retention.events_days

    reference = time.time()
    snapshot_cutoff = reference - (hours * 3600)
    event_cutoff = reference - (days * 86400)

    async with self._connect() as db:
        # Child rows go first so no process_metrics row is left pointing
        # at a snapshot that is about to be removed.
        await db.execute(
            """
            DELETE FROM process_metrics
            WHERE snapshot_id IN (
                SELECT id FROM snapshots WHERE timestamp < ?
            )
            """,
            (snapshot_cutoff,),
        )

        # Then the expired snapshots themselves, followed by old events.
        for statement, cutoff in (
            ("DELETE FROM snapshots WHERE timestamp < ?", snapshot_cutoff),
            ("DELETE FROM process_events WHERE timestamp < ?", event_cutoff),
        ):
            await db.execute(statement, (cutoff,))

        await db.commit()

    _logger.info(
        "storage.cleanup_complete",
        snapshot_cutoff_hours=hours,
        event_cutoff_days=days,
    )
append_jsonl
append_jsonl(snapshot)

Append one NDJSON line for the given snapshot.

Synchronous I/O — callers should wrap in run_in_executor if strict non-blocking is required. In practice the writes are small and fast enough for the daemon's collection loop.

Performs size-based rotation: when the file exceeds jsonl_max_bytes, renames it with a .1 suffix (keeping at most 2 rotated files) and starts a new file.

Source code in src/marianne/daemon/profiler/storage.py
def append_jsonl(self, snapshot: SystemSnapshot) -> None:
    """Append one NDJSON line for the given snapshot.

    Synchronous I/O — callers should wrap in ``run_in_executor``
    if strict non-blocking is required.  In practice the writes are
    small and fast enough for the daemon's collection loop.

    Performs size-based rotation: when the file exceeds
    ``jsonl_max_bytes``, renames it with a ``.1`` suffix (keeping
    at most 2 rotated files) and starts a new file.

    The write is best-effort: an ``OSError`` while creating the parent
    directory or appending the line is logged as
    ``storage.jsonl_write_failed`` and swallowed.  Does nothing when no
    JSONL path is configured.

    Args:
        snapshot: Snapshot to serialize via ``model_dump()``.
    """
    if self._jsonl_path is None:
        return

    # Directory creation gets the same best-effort treatment as the write
    # itself, so an unwritable location cannot crash the caller.
    try:
        self._jsonl_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError:
        _logger.warning("storage.jsonl_write_failed", exc_info=True)
        return

    # Rotate if needed
    if self._jsonl_path.exists():
        try:
            size = self._jsonl_path.stat().st_size
        except OSError:
            # File vanished or is unreadable: treat as empty, skip rotation.
            size = 0
        if size >= self._jsonl_max_bytes:
            self._rotate_jsonl()

    # Serialize using Pydantic's model_dump for reliable output
    line = json.dumps(snapshot.model_dump(), separators=(",", ":"))
    try:
        # json.dumps escapes non-ASCII by default; the explicit encoding
        # just keeps the file independent of the process locale.
        with self._jsonl_path.open("a", encoding="utf-8") as f:
            f.write(line + "\n")
    except OSError:
        _logger.warning("storage.jsonl_write_failed", exc_info=True)

StraceManager

StraceManager(enabled=True)

Manages strace attachment to child processes.

Typical lifecycle::

mgr = StraceManager(enabled=True)
await mgr.attach(pid)        # spawns ``strace -c -p PID``
...                           # time passes, child does work
summary = await mgr.detach(pid)  # SIGINT -> parse summary
await mgr.detach_all()        # cleanup on shutdown
Source code in src/marianne/daemon/profiler/strace_manager.py
def __init__(self, enabled: bool = True) -> None:
    """Create a manager with no strace processes registered.

    Args:
        enabled: Stored as ``_enabled``; ``attach`` and
            ``attach_full_trace`` return False without spawning
            anything when it is falsy.
    """
    # Both registries are keyed by the *target* PID and hold the strace
    # subprocess handle: summary (-c) tracers and full tracers.
    self._attached: dict[int, asyncio.subprocess.Process] = dict()
    self._full_traces: dict[int, asyncio.subprocess.Process] = dict()
    self._enabled = enabled
Attributes
attached_pids property
attached_pids

PIDs currently being traced.

Functions
is_available staticmethod
is_available()

Check whether strace is available on this system.

Source code in src/marianne/daemon/profiler/strace_manager.py
@staticmethod
def is_available() -> bool:
    """Check whether strace is available on this system."""
    return _strace_path is not None
attach async
attach(pid)

Attach strace -c -p <pid> for syscall summary collection.

Parameters:

Name Type Description Default
pid int

Target process PID to trace.

required

Returns:

Type Description
bool

True if strace was successfully spawned, False otherwise.

Source code in src/marianne/daemon/profiler/strace_manager.py
async def attach(self, pid: int) -> bool:
    """Spawn ``strace -c -p <pid>`` to collect a syscall summary.

    The strace subprocess is registered under *pid* so that ``detach``
    can later stop it and read its output.

    Args:
        pid: PID of the process to trace.

    Returns:
        True if strace was spawned, or one is already registered for
        *pid*.  False when tracing is disabled, strace is unavailable,
        or spawning raised an ``OSError``.
    """
    if not self._enabled:
        _logger.debug("strace_disabled", pid=pid)
        return False

    if _strace_path is None:
        _logger.warning("strace_not_available")
        return False

    if pid in self._attached:
        _logger.debug("strace_already_attached", pid=pid)
        return True

    argv = [_strace_path, "-c", "-p", str(pid), "-e", "trace=all"]

    try:
        tracer = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        self._attached[pid] = tracer
        _logger.info("strace_attached", pid=pid, strace_pid=tracer.pid)
        return True
    except OSError as exc:
        # The specific subclasses get their own log event; everything
        # else falls through to the generic failure message.
        if isinstance(exc, PermissionError):
            _logger.warning("strace_permission_denied", pid=pid)
        elif isinstance(exc, FileNotFoundError):
            _logger.warning("strace_not_found", pid=pid)
        elif isinstance(exc, ProcessLookupError):
            _logger.debug("strace_target_already_exited", pid=pid)
        else:
            _logger.warning("strace_attach_failed", pid=pid, error=str(exc))
        return False
detach async
detach(pid)

Detach strace from a process and parse the summary output.

Sends SIGINT to the strace process (which causes it to print its -c summary table to stderr), then parses the output.

Parameters:

Name Type Description Default
pid int

Target process PID to stop tracing.

required

Returns:

Type Description
dict[str, Any] | None

Dict with syscall_counts and syscall_time_pct mappings, or None if the pid was not being traced.

Source code in src/marianne/daemon/profiler/strace_manager.py
async def detach(self, pid: int) -> dict[str, Any] | None:
    """Detach strace from a process and parse the summary output.

    Sends SIGINT to the strace process (which causes it to print its
    ``-c`` summary table to stderr), then parses the output.

    Args:
        pid: Target process PID to stop tracing.

    Returns:
        Dict with ``syscall_counts`` and ``syscall_time_pct`` mappings,
        or None if the pid was not being traced.
    """
    proc = self._attached.pop(pid, None)
    if proc is None:
        return None

    # Send SIGINT to strace so it prints the summary
    try:
        if proc.returncode is None:
            proc.send_signal(signal.SIGINT)
    except (ProcessLookupError, OSError):
        # strace already exited
        pass

    try:
        _, stderr = await asyncio.wait_for(proc.communicate(), timeout=5)
    except TimeoutError:
        _logger.warning("strace_detach_timeout", pid=pid)
        try:
            proc.kill()
            await proc.wait()
        except (ProcessLookupError, OSError):
            pass
        return None

    if not stderr:
        return None

    output = stderr.decode(errors="replace")
    _logger.debug("strace_output_received", pid=pid, output_len=len(output))
    return self._parse_strace_summary(output)
attach_full_trace async
attach_full_trace(pid, output_file)

Attach a full strace (strace -f -t -p PID -o file).

This is the on-demand deep-trace triggered by mzt top --trace PID.

Parameters:

Name Type Description Default
pid int

Target process PID.

required
output_file Path

Path to write the full trace output.

required

Returns:

Type Description
bool

True if strace was successfully spawned, False otherwise.

Source code in src/marianne/daemon/profiler/strace_manager.py
async def attach_full_trace(self, pid: int, output_file: Path) -> bool:
    """Spawn a full trace: ``strace -f -t -p PID -o file``.

    This is the on-demand deep-trace triggered by ``mzt top --trace PID``.
    The parent directory of *output_file* is created if needed and the
    strace subprocess is registered under *pid*.

    Args:
        pid: PID of the process to trace.
        output_file: File strace writes the trace to.

    Returns:
        True if strace was spawned, or a full trace is already
        registered for *pid*.  False when tracing is disabled, strace
        is unavailable, or spawning raised an ``OSError``.
    """
    if not self._enabled:
        return False

    if _strace_path is None:
        _logger.warning("strace_not_available_full")
        return False

    if pid in self._full_traces:
        _logger.debug("full_trace_already_attached", pid=pid)
        return True

    output_file.parent.mkdir(parents=True, exist_ok=True)

    argv = [_strace_path, "-f", "-t", "-p", str(pid), "-o", str(output_file)]

    try:
        tracer = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        self._full_traces[pid] = tracer
        _logger.info(
            "full_trace_attached",
            pid=pid,
            strace_pid=tracer.pid,
            output_file=str(output_file),
        )
        return True
    except OSError as exc:
        # Dedicated log events for the common failure modes, generic
        # message for any other OS-level error.
        if isinstance(exc, PermissionError):
            _logger.warning("full_trace_permission_denied", pid=pid)
        elif isinstance(exc, FileNotFoundError):
            _logger.warning("full_trace_strace_not_found", pid=pid)
        elif isinstance(exc, ProcessLookupError):
            _logger.debug("full_trace_target_exited", pid=pid)
        else:
            _logger.warning("full_trace_attach_failed", pid=pid, error=str(exc))
        return False
detach_all async
detach_all()

Detach and terminate all strace processes.

Called during daemon shutdown for cleanup.

Source code in src/marianne/daemon/profiler/strace_manager.py
async def detach_all(self) -> None:
    """Stop every strace subprocess this manager knows about.

    Called during daemon shutdown for cleanup.  Both registries are
    emptied, each still-running strace gets SIGINT, and any that has
    not exited within 2 seconds of being waited on is killed.  The
    parsed summaries are not collected here.
    """
    tracers = [*self._attached.values(), *self._full_traces.values()]

    self._attached.clear()
    self._full_traces.clear()

    # Phase 1: politely interrupt everything that is still running.
    for tracer in tracers:
        try:
            if tracer.returncode is None:
                tracer.send_signal(signal.SIGINT)
        except OSError:
            # Already gone; nothing to signal.
            pass

    # Phase 2: wait briefly for each one, escalating to kill on timeout.
    for tracer in tracers:
        try:
            await asyncio.wait_for(tracer.wait(), timeout=2)
        except TimeoutError:
            try:
                tracer.kill()
                await tracer.wait()
            except OSError:
                pass
        except OSError:
            pass

    if tracers:
        _logger.info("strace_detach_all", count=len(tracers))
get_strace_pids
get_strace_pids()

Return PIDs of all running strace subprocesses.

Useful for registering with ProcessGroupManager so they get cleaned up on daemon shutdown.

Source code in src/marianne/daemon/profiler/strace_manager.py
def get_strace_pids(self) -> list[int]:
    """List the PIDs of the strace subprocesses that are still running.

    Useful for registering with ProcessGroupManager so they
    get cleaned up on daemon shutdown.

    Returns:
        PIDs of the strace processes themselves (not their targets),
        summary tracers first, then full tracers.  Entries without a
        PID or with a ``returncode`` already set are left out.
    """
    tracers = [*self._attached.values(), *self._full_traces.values()]
    return [
        tracer.pid
        for tracer in tracers
        if tracer.pid is not None and tracer.returncode is None
    ]

Functions

generate_resource_report async

generate_resource_report(job_id, storage)

Generate comprehensive resource report for a job.

Produces a text report designed for AI consumption (mzt diagnose --resources). Aggregates peak memory per sheet, total CPU-time, process spawn count, signal/kill events, zombie/OOM events, syscall hotspots, and anomaly history.

Parameters:

Name Type Description Default
job_id str

The job ID to generate the report for.

required
storage MonitorStorage

An initialized MonitorStorage instance.

required

Returns:

Type Description
str

Multi-line text report. Returns a short "no data" message if the job has no profiler data.

Source code in src/marianne/daemon/profiler/storage.py
async def generate_resource_report(job_id: str, storage: MonitorStorage) -> str:
    """Build a plain-text resource report for one job.

    The report is meant for AI consumption (``mzt diagnose
    --resources``).  It is assembled from
    ``storage.read_job_resource_profile`` (peak memory, spawn count,
    unique PIDs, per-sheet peaks, top 10 syscall hotspots) and from the
    job's process events of the last 7 days (signals/kills, OOM events,
    spawn/exit counts).

    Args:
        job_id: The job ID to generate the report for.
        storage: An initialized ``MonitorStorage`` instance.

    Returns:
        Multi-line text report.  Returns a short "no data" message if the
        job has no profiler data.
    """
    profile = await storage.read_job_resource_profile(job_id)

    if not profile or profile.get("unique_pid_count", 0) == 0:
        return f"Resource Profile for job '{job_id}':\n  No profiler data available.\n"

    # -- Summary metrics --
    peak_rss = profile.get("peak_rss_mb", 0.0)
    spawn_count = profile.get("process_spawn_count", 0)
    unique_pids = profile.get("unique_pid_count", 0)
    lines: list[str] = [
        f"Resource Profile for job '{job_id}':",
        f"  Peak memory: {peak_rss:.1f} MB",
        f"  Process spawns: {spawn_count}",
        f"  Unique PIDs observed: {unique_pids}",
    ]

    # -- Per-sheet breakdown --
    sheet_metrics = profile.get("sheet_metrics", {})
    if sheet_metrics:
        lines.extend(["", "  Per-sheet resource peaks:"])
        # Keys are stringified sheet numbers; sort them numerically.
        for sheet_num in sorted(sheet_metrics, key=int):
            sm = sheet_metrics[sheet_num]
            peak = sm.get("peak_rss_mb", 0.0)
            cpu = sm.get("max_cpu_pct", 0.0)
            lines.append(f"    Sheet {sheet_num}: peak RSS {peak:.1f} MB, max CPU {cpu:.0f}%")

    # -- Syscall hotspots --
    hotspots = profile.get("syscall_hotspots", {})
    if hotspots:
        ranked = sorted(hotspots.items(), key=lambda x: x[1], reverse=True)
        lines.extend(["", "  Syscall hotspots (cumulative time %):"])
        lines.extend(f"    {name}: {pct:.1f}%" for name, pct in ranked[:10])

    # -- Process events (signals, kills, OOM) --
    # Events of the last 7 days, narrowed down to this job afterwards.
    week_ago = time.time() - 86400 * 7
    events = await storage.read_events(since=week_ago, limit=10000)
    job_events = [e for e in events if e.job_id == job_id]

    if job_events:
        # Bucket the job's events by type in one pass (order preserved).
        signal_events = []
        oom_events = []
        total_exits = 0
        total_spawns = 0
        for e in job_events:
            if e.event_type in (EventType.SIGNAL, EventType.KILL):
                signal_events.append(e)
            elif e.event_type == EventType.OOM:
                oom_events.append(e)
            elif e.event_type == EventType.EXIT:
                total_exits += 1
            elif e.event_type == EventType.SPAWN:
                total_spawns += 1

        if signal_events:
            lines.extend(["", f"  Signals sent: {len(signal_events)}"])
            for evt in signal_events[:10]:
                sig = f"signal={evt.signal_num}" if evt.signal_num else ""
                lines.append(f"    PID {evt.pid} {sig} {evt.details}")

        if oom_events:
            lines.extend(["", f"  OOM events: {len(oom_events)}"])
            for evt in oom_events[:5]:
                lines.append(f"    PID {evt.pid}: {evt.details}")

        # Spawn vs. exit counts (more spawns than exits hints at retries)
        if total_spawns > 0:
            lines.extend(
                ["", f"  Process lifecycle: {total_spawns} spawns, {total_exits} exits"]
            )

    # Trailing empty entry makes the report end with a newline.
    lines.append("")
    return "\n".join(lines)