Skip to content

collector

collector

Central profiler orchestrator for the Marianne daemon.

ProfilerCollector is the heart of the profiling subsystem. It runs a periodic collection loop that gathers system metrics, per-process data, GPU stats, and strace summaries into SystemSnapshot objects. These are persisted (SQLite + JSONL), fed to the AnomalyDetector, and published to the EventBus for downstream consumers.

Lifecycle::

collector = ProfilerCollector(config, monitor, pgroup, event_bus, manager)
await collector.start()
...
snapshot = await collector.collect_snapshot()
...
await collector.stop()

Classes

ProfilerCollector

ProfilerCollector(config, monitor, pgroup, event_bus, manager=None)

Central orchestrator for the daemon profiler subsystem.

Coordinates: - Periodic metric collection (system + per-process + GPU + strace) - SQLite + JSONL persistence via MonitorStorage - Heuristic anomaly detection via AnomalyDetector - EventBus integration for monitor.anomaly events - Process lifecycle tracking via sheet.started/completed/failed

Parameters

config: Profiler configuration (interval, storage paths, thresholds). monitor: The daemon's ResourceMonitor for system-level metrics. pgroup: The daemon's ProcessGroupManager for child process enumeration. event_bus: The daemon's EventBus for publishing anomaly events and subscribing to sheet lifecycle events. manager: Optional JobManager for mapping PIDs to job_id/sheet_num and reading running job / active sheet counts.

Source code in src/marianne/daemon/profiler/collector.py
def __init__(
    self,
    config: ProfilerConfig,
    monitor: ResourceMonitor,
    pgroup: ProcessGroupManager,
    event_bus: EventBus,
    manager: JobManager | None = None,
) -> None:
    self._config = config
    self._monitor = monitor
    self._pgroup = pgroup
    self._event_bus = event_bus
    self._manager = manager

    # Storage
    self._storage = MonitorStorage(
        db_path=config.storage_path.expanduser(),
        jsonl_path=config.jsonl_path.expanduser(),
        jsonl_max_bytes=config.jsonl_max_bytes,
    )

    # Strace management
    self._strace = StraceManager(enabled=config.strace_enabled)

    # Anomaly detection
    self._anomaly_detector = AnomalyDetector(config=config.anomaly)

    # State
    self._running = False
    self._loop_task: asyncio.Task[None] | None = None
    self._history: list[SystemSnapshot] = []
    self._latest_snapshot: SystemSnapshot | None = None
    self._known_pids: set[int] = set()
    self._sub_ids: list[str] = []
    self._recent_events: list[ProcessEvent] = []
    self._max_recent_events = 200
    # F-488: monotonic timestamp of the last retention cleanup.
    # Initialized to 0 so the first loop iteration runs cleanup
    # promptly on daemon startup (catches up on any backlog from
    # periods when the daemon wasn't running).
    self._last_cleanup_monotonic: float = 0.0
Functions
start async
start()

Initialize storage, subscribe to events, start collection loop.

Source code in src/marianne/daemon/profiler/collector.py
async def start(self) -> None:
    """Initialize storage, subscribe to events, start collection loop."""
    if self._running:
        return

    if not self._config.enabled:
        _logger.info("profiler.disabled")
        return

    await self._storage.initialize()

    # Subscribe to sheet lifecycle events
    self._sub_ids.append(
        self._event_bus.subscribe(
            self._on_sheet_started,
            event_filter=lambda e: e.get("event") == "sheet.started",
        )
    )
    self._sub_ids.append(
        self._event_bus.subscribe(
            self._on_sheet_completed,
            event_filter=lambda e: e.get("event") in (
                "sheet.completed", "sheet.failed"
            ),
        )
    )

    self._running = True
    self._loop_task = asyncio.create_task(
        self._collection_loop(), name="profiler-collection-loop"
    )
    _logger.info(
        "profiler.started",
        interval=self._config.interval_seconds,
        strace=self._strace.enabled,
        gpu=self._config.gpu_enabled,
    )
stop async
stop()

Stop collection loop, detach strace, unsubscribe from events.

Source code in src/marianne/daemon/profiler/collector.py
async def stop(self) -> None:
    """Stop collection loop, detach strace, unsubscribe from events."""
    self._running = False

    if self._loop_task is not None:
        self._loop_task.cancel()
        try:
            await self._loop_task
        except asyncio.CancelledError:
            pass
        self._loop_task = None

    # Detach all strace processes
    await self._strace.detach_all()

    # Unsubscribe from event bus
    for sub_id in self._sub_ids:
        self._event_bus.unsubscribe(sub_id)
    self._sub_ids.clear()

    # Close pooled storage connection
    await self._storage.close()

    _logger.info("profiler.stopped")
collect_snapshot async
collect_snapshot()

Gather all metrics into a single SystemSnapshot.

Steps: 1. System memory via SystemProbe 2. Per-process metrics via psutil (with PID → job mapping) 3. GPU metrics via GpuProbe 4. Load average via os.getloadavg() 5. Strace summaries for attached PIDs 6. Pressure level from BackpressureController 7. Running jobs / active sheets from JobManager

Source code in src/marianne/daemon/profiler/collector.py
async def collect_snapshot(self) -> SystemSnapshot:
    """Gather all metrics into a single SystemSnapshot.

    Steps:
    1. System memory via SystemProbe
    2. Per-process metrics via psutil (with PID → job mapping)
    3. GPU metrics via GpuProbe
    4. Load average via os.getloadavg()
    5. Strace summaries for attached PIDs
    6. Pressure level from BackpressureController
    7. Running jobs / active sheets from JobManager
    """
    now = time.time()
    daemon_pid = os.getpid()

    # 1. System memory
    system_total = 0.0
    system_available = 0.0
    system_used = 0.0
    daemon_rss = SystemProbe.get_memory_mb() or 0.0

    try:
        system_total, system_available, system_used = self._get_system_memory()
    except Exception:
        _logger.debug("profiler.system_memory_probe_failed", exc_info=True)

    # 2. Per-process metrics
    processes, zombie_count, zombie_pids = await self._collect_process_metrics()

    # 3. GPU metrics
    gpus_raw: list[Any] = []
    if self._config.gpu_enabled:
        try:
            gpus_raw = await GpuProbe.get_gpu_metrics_async()
        except Exception:
            _logger.debug("profiler.gpu_probe_failed", exc_info=True)

    # Convert gpu_probe.GpuMetric (dataclass) → profiler.models.GpuMetric (Pydantic)
    from marianne.daemon.profiler.models import GpuMetric as PydanticGpuMetric
    gpus = [
        PydanticGpuMetric(
            index=g.index,
            utilization_pct=g.utilization_pct,
            memory_used_mb=g.memory_used_mb,
            memory_total_mb=g.memory_total_mb,
            temperature_c=g.temperature_c,
        )
        for g in gpus_raw
    ]

    # 4. Load average
    load1 = load5 = load15 = 0.0
    try:
        load1, load5, load15 = os.getloadavg()
    except OSError:
        pass

    # 5. Strace summaries — merge into process metrics
    for proc in processes:
        if proc.pid in self._strace.attached_pids:
            # Don't detach — just note it's being traced.
            # The summary is collected on detach (process exit).
            pass

    # 6. Pressure level
    pressure_level = "none"
    if self._manager is not None:
        try:
            pressure_level = self._manager.backpressure.current_level().value
        except Exception:
            _logger.debug("profiler.pressure_level_failed", exc_info=True)

    # 7. Running jobs / active sheets
    running_jobs = 0
    active_sheets = 0
    if self._manager is not None:
        running_jobs = self._manager.running_count
        active_sheets = self._manager.active_job_count

    # 8. Per-job progress from live CheckpointState
    job_progress_list: list[JobProgress] = []
    if self._manager is not None:
        for jid, state in self._manager._live_states.items():
            job_progress_list.append(
                JobProgress(
                    job_id=jid,
                    total_sheets=getattr(state, "total_sheets", 1),
                    last_completed_sheet=getattr(state, "last_completed_sheet", 0),
                    current_sheet=getattr(state, "current_sheet", None),
                    status=getattr(state, "status", "unknown"),
                )
            )

    # 9. Conductor uptime
    conductor_uptime = 0.0
    if self._manager is not None:
        conductor_uptime = self._manager.uptime_seconds

    snapshot = SystemSnapshot(
        timestamp=now,
        daemon_pid=daemon_pid,
        system_memory_total_mb=system_total,
        system_memory_available_mb=system_available,
        system_memory_used_mb=system_used,
        daemon_rss_mb=daemon_rss,
        load_avg_1=load1,
        load_avg_5=load5,
        load_avg_15=load15,
        processes=processes,
        gpus=gpus,
        pressure_level=pressure_level,
        running_jobs=running_jobs,
        active_sheets=active_sheets,
        zombie_count=zombie_count,
        zombie_pids=zombie_pids,
        job_progress=job_progress_list,
        conductor_uptime_seconds=conductor_uptime,
    )

    self._latest_snapshot = snapshot
    return snapshot
get_resource_context_for_pid
get_resource_context_for_pid(pid)

Get current resource context for a specific PID.

Returns a dict suitable for embedding in sheet event data: rss_mb, cpu_pct, syscall_hotspot, anomalies_active.

If the PID is not found in the latest snapshot, returns a dict with all values set to None/empty.

Source code in src/marianne/daemon/profiler/collector.py
def get_resource_context_for_pid(self, pid: int) -> dict[str, Any]:
    """Get current resource context for a specific PID.

    Returns a dict suitable for embedding in sheet event data:
    ``rss_mb``, ``cpu_pct``, ``syscall_hotspot``, ``anomalies_active``.

    If the PID is not found in the latest snapshot, returns a dict
    with all values set to None/empty.
    """
    context: dict[str, Any] = {
        "rss_mb": None,
        "cpu_pct": None,
        "syscall_hotspot": None,
        "anomalies_active": [],
    }

    snapshot = self._latest_snapshot
    if snapshot is None:
        return context

    # Add daemon-level RSS as fallback context
    context["daemon_rss_mb"] = snapshot.daemon_rss_mb

    # Find the process in the latest snapshot
    proc_match: ProcessMetric | None = None
    for proc in snapshot.processes:
        if proc.pid == pid:
            proc_match = proc
            break

    if proc_match is not None:
        context["rss_mb"] = round(proc_match.rss_mb, 1)
        context["cpu_pct"] = round(proc_match.cpu_percent, 1)

        # Build syscall hotspot summary from time percentages
        if proc_match.syscall_time_pct:
            top_syscall = max(
                proc_match.syscall_time_pct.items(),
                key=lambda x: x[1],
            )
            context["syscall_hotspot"] = (
                f"{top_syscall[0]} {top_syscall[1]:.0%}"
            )

    # Collect active anomalies from the most recent detection cycle.
    # We check the anomaly detector against the latest snapshot and history.
    try:
        anomalies = self._anomaly_detector.detect(snapshot, self._history)
        context["anomalies_active"] = [
            a.anomaly_type.value for a in anomalies
        ]
    except Exception:
        _logger.debug("profiler.anomaly_check_for_context_failed", exc_info=True)

    return context
get_resource_context
get_resource_context()

Get general resource context (not PID-specific).

Useful when no specific PID is available for the event.

Source code in src/marianne/daemon/profiler/collector.py
def get_resource_context(self) -> dict[str, Any]:
    """Get general resource context (not PID-specific).

    Useful when no specific PID is available for the event.
    """
    context: dict[str, Any] = {
        "rss_mb": None,
        "cpu_pct": None,
        "syscall_hotspot": None,
        "anomalies_active": [],
    }

    snapshot = self._latest_snapshot
    if snapshot is None:
        return context

    context["daemon_rss_mb"] = snapshot.daemon_rss_mb
    context["pressure_level"] = snapshot.pressure_level
    context["running_jobs"] = snapshot.running_jobs
    context["active_sheets"] = snapshot.active_sheets

    try:
        anomalies = self._anomaly_detector.detect(snapshot, self._history)
        context["anomalies_active"] = [
            a.anomaly_type.value for a in anomalies
        ]
    except Exception:
        _logger.debug("profiler.anomaly_check_for_context_failed", exc_info=True)

    return context
get_latest_snapshot
get_latest_snapshot()

Return the latest snapshot as a JSON-serializable dict.

Used by the daemon.top IPC method.

Source code in src/marianne/daemon/profiler/collector.py
def get_latest_snapshot(self) -> dict[str, Any] | None:
    """Return the latest snapshot as a JSON-serializable dict.

    Used by the ``daemon.top`` IPC method.
    """
    if self._latest_snapshot is None:
        return None
    return self._latest_snapshot.model_dump(mode="json")
get_jsonl_path
get_jsonl_path()

Return the JSONL streaming log path.

Used by the daemon.top.stream IPC method.

Source code in src/marianne/daemon/profiler/collector.py
def get_jsonl_path(self) -> str | None:
    """Return the JSONL streaming log path.

    Used by the ``daemon.top.stream`` IPC method.
    """
    if self._config.jsonl_path:
        return str(self._config.jsonl_path.expanduser())
    return None
get_recent_events
get_recent_events(limit=50)

Return recent process events as JSON-serializable dicts.

Used by the daemon.events IPC method.

Source code in src/marianne/daemon/profiler/collector.py
def get_recent_events(self, limit: int = 50) -> list[dict[str, Any]]:
    """Return recent process events as JSON-serializable dicts.

    Used by the ``daemon.events`` IPC method.
    """
    events = self._recent_events[-limit:]
    return [e.model_dump(mode="json") for e in events]

Functions