monitor

Resource monitoring for the Marianne daemon.

Runs periodic background checks on memory usage, child process count, and zombie processes. Emits structured log warnings as usage approaches the configured limits and applies backpressure (e.g. job cancellation) when hard limits are exceeded.

Delegates system probing to SystemProbe (system_probe.py), which consolidates the psutil / /proc fallback pattern in one place.

Classes

ResourceSnapshot dataclass

ResourceSnapshot(timestamp, memory_usage_mb, child_process_count, running_jobs, active_sheets, zombie_pids=list(), probe_failed=False)

Point-in-time resource usage reading.
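The constructor signature above can be approximated with a plain dataclass. This is a minimal sketch, not the library's definition: the field types are assumptions inferred from how check_now populates the snapshot, and `zombie_pids=list()` is read here as a `default_factory=list` default, which is an inference from the rendered signature.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ResourceSnapshot:
    """Stand-in mirroring the documented signature (types are assumed)."""

    timestamp: float
    memory_usage_mb: float
    child_process_count: int
    running_jobs: int
    active_sheets: int
    # default_factory gives every snapshot its own list instead of a shared one.
    zombie_pids: list[int] = field(default_factory=list)
    probe_failed: bool = False


snap = ResourceSnapshot(
    timestamp=0.0,
    memory_usage_mb=512.0,
    child_process_count=4,
    running_jobs=2,
    active_sheets=3,
)
other = ResourceSnapshot(0.0, 0.0, 0, 0, 0)
```

With a factory default, appending to `snap.zombie_pids` does not affect `other.zombie_pids`.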

ResourceMonitor

ResourceMonitor(config, manager=None, pgroup=None)

Periodic resource monitoring for the daemon.

Checks memory usage, child process count, and zombie processes on a configurable interval. Emits structured log warnings when approaching limits and can trigger hard actions (job cancellation) when hard limits are exceeded.

Source code in src/marianne/daemon/monitor.py
def __init__(
    self,
    config: ResourceLimitConfig,
    manager: JobManager | None = None,
    pgroup: ProcessGroupManager | None = None,
) -> None:
    self._config = config
    self._manager = manager
    self._pgroup = pgroup
    self._task: asyncio.Task[None] | None = None
    self._running = False
    self._consecutive_failures = 0
    self._degraded = False
    self._prune_consecutive_failures = 0
    self._prune_disabled = False
    self._last_successful_check: float = 0.0
Attributes
seconds_since_last_check property
seconds_since_last_check

Seconds since the last successful monitoring check.

Returns float('inf') if no check has succeeded yet. Consumers can use this to detect stale resource data.
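A minimal sketch of how such a staleness value might be computed and consumed. The class name `StalenessTracker`, the `mark_success` method, and the `is_stale` helper are hypothetical; treating `0.0` as the "never succeeded" sentinel is an assumption based on the initial value of `_last_successful_check` in the constructor.

```python
import time


class StalenessTracker:
    """Hypothetical stand-in for the monitor's staleness bookkeeping."""

    def __init__(self) -> None:
        # Assumption: 0.0 means "no check has succeeded yet".
        self._last_successful_check: float = 0.0

    def mark_success(self) -> None:
        # Monotonic time is immune to wall-clock adjustments.
        self._last_successful_check = time.monotonic()

    @property
    def seconds_since_last_check(self) -> float:
        if self._last_successful_check == 0.0:
            return float("inf")
        return time.monotonic() - self._last_successful_check


def is_stale(tracker: StalenessTracker, max_age_seconds: float) -> bool:
    """Return True when the last successful check is older than max_age_seconds."""
    return tracker.seconds_since_last_check > max_age_seconds


tracker = StalenessTracker()
stale_before = is_stale(tracker, 60.0)  # never checked, so always stale
tracker.mark_success()
stale_after = is_stale(tracker, 60.0)
```

Because the "never checked" case is infinity rather than `None`, a consumer can compare against a maximum age without a separate null check.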

max_memory_mb property
max_memory_mb

Configured maximum memory in MB.

is_degraded property
is_degraded

Whether the monitor has entered degraded mode due to repeated failures.

Functions
start async
start(interval_seconds=15.0)

Start the periodic monitoring loop.

Source code in src/marianne/daemon/monitor.py
async def start(self, interval_seconds: float = 15.0) -> None:
    """Start the periodic monitoring loop."""
    if self._task is not None:
        return
    self._running = True
    self._task = asyncio.create_task(self._loop(interval_seconds))
    self._task.add_done_callback(self._on_loop_done)
    _logger.info("monitor.started", interval=interval_seconds)
stop async
stop()

Stop the monitoring loop.

Source code in src/marianne/daemon/monitor.py
async def stop(self) -> None:
    """Stop the monitoring loop."""
    self._running = False
    if self._task is not None:
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None
    _logger.info("monitor.stopped")
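The start/stop pair follows a common asyncio pattern: start is idempotent, and stop cancels the task and then awaits it so the loop has fully unwound before returning. The sketch below reproduces that pattern in isolation. `PeriodicTask`, its `ticks` counter, and the loop body are hypothetical; the real monitor's `_loop` and `_on_loop_done` are not shown in this section.

```python
import asyncio


class PeriodicTask:
    """Hypothetical stand-in showing the same start/stop lifecycle."""

    def __init__(self) -> None:
        self._task: asyncio.Task[None] | None = None
        self._running = False
        self.ticks = 0

    async def _loop(self, interval: float) -> None:
        while self._running:
            self.ticks += 1  # placeholder for the real periodic check
            await asyncio.sleep(interval)

    async def start(self, interval_seconds: float = 15.0) -> None:
        if self._task is not None:
            return  # already started; a second call is a no-op
        self._running = True
        self._task = asyncio.create_task(self._loop(interval_seconds))

    async def stop(self) -> None:
        self._running = False
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task  # wait until cancellation has been delivered
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo() -> tuple[bool, int, object]:
    task = PeriodicTask()
    await task.start(0.01)
    first = task._task
    await task.start(0.01)  # does not create a second task
    same = task._task is first
    await asyncio.sleep(0.05)
    await task.stop()
    return same, task.ticks, task._task


same_task, ticks, task_after_stop = asyncio.run(demo())
```

Resetting `_task` to `None` in stop is what allows a later start call to create a fresh loop.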
check_now async
check_now()

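Run an immediate resource check and return a snapshot. If the memory or process-count probe fails, the affected field is reported as 0 and probe_failed is set to True.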

Source code in src/marianne/daemon/monitor.py
async def check_now(self) -> ResourceSnapshot:
    """Run an immediate resource check and return snapshot."""
    running_jobs = 0
    active_sheets = 0
    if self._manager is not None:
        running_jobs = self._manager.running_count
        active_sheets = self._manager.active_job_count

    mem = self._get_memory_usage_mb()
    procs = self._get_child_process_count()
    zombie_pids = self._check_for_zombies()
    probe_failed = mem is None or procs is None

    snapshot = ResourceSnapshot(
        timestamp=time.monotonic(),
        memory_usage_mb=mem if mem is not None else 0.0,
        child_process_count=procs if procs is not None else 0,
        running_jobs=running_jobs,
        active_sheets=active_sheets,
        zombie_pids=zombie_pids,
        probe_failed=probe_failed,
    )
    return snapshot
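Since a failed probe is reported as zero usage, callers may want to check `probe_failed` before trusting the numbers. The following is a sketch of one way to do that; `Snapshot` is a hypothetical stand-in carrying only the fields this example reads, and `summarize` is not part of the module.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Snapshot:
    """Hypothetical stand-in with the fields this example reads."""

    memory_usage_mb: float
    child_process_count: int
    zombie_pids: list[int] = field(default_factory=list)
    probe_failed: bool = False


def summarize(snapshot: Snapshot) -> str:
    """Render a snapshot, refusing to report zeros from a failed probe."""
    if snapshot.probe_failed:
        return "resource data unavailable"
    summary = (
        f"{snapshot.memory_usage_mb:.0f} MB, "
        f"{snapshot.child_process_count} children"
    )
    if snapshot.zombie_pids:
        summary += f", {len(snapshot.zombie_pids)} zombie(s)"
    return summary


failed = summarize(Snapshot(0.0, 0, probe_failed=True))
healthy = summarize(Snapshot(512.4, 3))
with_zombies = summarize(Snapshot(512.4, 3, [101, 102]))
```

Note also that the snapshot's timestamp comes from `time.monotonic()`, so it is suitable for measuring elapsed time but not for display as a wall-clock time.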
is_accepting_work
is_accepting_work()

Check if resource usage is below warning thresholds.

Returns True when both memory and process counts are below WARN_THRESHOLD percent of their configured limits. Used by HealthChecker.readiness() to signal backpressure.

Fail-closed: returns False when probes fail or monitor is degraded.

Source code in src/marianne/daemon/monitor.py
def is_accepting_work(self) -> bool:
    """Check if resource usage is below warning thresholds.

    Returns True when both memory and process counts are below
    ``WARN_THRESHOLD`` percent of their configured limits.  Used by
    ``HealthChecker.readiness()`` to signal backpressure.

    Fail-closed: returns False when probes fail or monitor is degraded.
    """
    if self._degraded:
        return False
    mem = self._get_memory_usage_mb()
    procs = self._get_child_process_count()
    if mem is None or procs is None:
        return False
    mem_pct = _compute_percent(mem, self._config.max_memory_mb)
    proc_pct = _compute_percent(procs, self._config.max_processes)
    return mem_pct < self.WARN_THRESHOLD and proc_pct < self.WARN_THRESHOLD
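The fail-closed decision can be sketched as a pure function. Everything named here is an assumption for illustration: the value `80.0` for `WARN_THRESHOLD` is not stated in this section, and `compute_percent` (including its handling of a non-positive limit) is a guess at what the private `_compute_percent` helper does.

```python
from __future__ import annotations

WARN_THRESHOLD = 80.0  # hypothetical value; the real constant is not shown here


def compute_percent(current: float, limit: float) -> float:
    """Usage as a percentage of the limit (assumed behaviour of the helper)."""
    if limit <= 0:
        return 0.0  # assumption: a non-positive limit means "no limit"
    return current / limit * 100.0


def is_accepting_work(
    mem_mb: float | None,
    procs: int | None,
    max_memory_mb: float,
    max_processes: int,
    degraded: bool = False,
) -> bool:
    """Fail-closed readiness: any doubt about the data means 'not ready'."""
    if degraded:
        return False
    if mem_mb is None or procs is None:
        return False  # a failed probe is treated as overload, not as zero usage
    mem_pct = compute_percent(mem_mb, max_memory_mb)
    proc_pct = compute_percent(procs, max_processes)
    return mem_pct < WARN_THRESHOLD and proc_pct < WARN_THRESHOLD


ok = is_accepting_work(400.0, 10, 1000, 50)          # 40% memory, 20% processes
too_much_memory = is_accepting_work(900.0, 10, 1000, 50)  # 90% memory
probe_failure = is_accepting_work(None, 10, 1000, 50)
degraded = is_accepting_work(400.0, 10, 1000, 50, degraded=True)
```

The design choice worth noting is that unknown usage rejects work: a readiness check that reported healthy on probe failure would admit jobs exactly when the daemon can no longer see its own resource usage.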
current_memory_mb
current_memory_mb()

Current RSS memory in MB, or None if probes fail.

Source code in src/marianne/daemon/monitor.py
def current_memory_mb(self) -> float | None:
    """Current RSS memory in MB, or None if probes fail."""
    return self._get_memory_usage_mb()
update_limits
update_limits(new_limits)

Hot-apply new resource limits from a SIGHUP config reload.

Replaces the internal _config reference. Safe because the monitor only reads _config during periodic checks, and asyncio's single-threaded event loop prevents concurrent access.

Source code in src/marianne/daemon/monitor.py
def update_limits(self, new_limits: ResourceLimitConfig) -> None:
    """Hot-apply new resource limits from a SIGHUP config reload.

    Replaces the internal ``_config`` reference.  Safe because the
    monitor only reads ``_config`` during periodic checks, and
    asyncio's single-threaded event loop prevents concurrent access.
    """
    old = self._config
    if (
        old.max_memory_mb != new_limits.max_memory_mb
        or old.max_processes != new_limits.max_processes
    ):
        _logger.info(
            "monitor.limits_updated",
            old_memory_mb=old.max_memory_mb,
            new_memory_mb=new_limits.max_memory_mb,
            old_max_processes=old.max_processes,
            new_max_processes=new_limits.max_processes,
        )
    self._config = new_limits
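The hot-reload step amounts to comparing the two limit objects, recording a change if one occurred, and swapping the reference. A minimal sketch under stated assumptions: `Limits` and `LimitHolder` are hypothetical stand-ins for `ResourceLimitConfig` and the monitor, and a list is used in place of the structured logger.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Limits:
    """Hypothetical stand-in for ResourceLimitConfig."""

    max_memory_mb: int
    max_processes: int


class LimitHolder:
    """Hypothetical stand-in for the part of the monitor that holds limits."""

    def __init__(self, config: Limits) -> None:
        self._config = config
        self.changes: list[tuple[Limits, Limits]] = []  # stands in for log output

    def update_limits(self, new_limits: Limits) -> None:
        old = self._config
        if (
            old.max_memory_mb != new_limits.max_memory_mb
            or old.max_processes != new_limits.max_processes
        ):
            self.changes.append((old, new_limits))
        # The reference is replaced even when the values are unchanged.
        self._config = new_limits


holder = LimitHolder(Limits(1024, 50))
same_values = Limits(1024, 50)
holder.update_limits(same_values)    # no change recorded
holder.update_limits(Limits(2048, 50))  # change recorded
```

Swapping a single reference, rather than mutating fields in place, means a reader never observes a half-updated set of limits.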
set_manager
set_manager(manager)

Wire up the job manager reference after construction.

Called by DaemonProcess after both the monitor and manager are created, avoiding the circular dependency of needing both at init.

Source code in src/marianne/daemon/monitor.py
def set_manager(self, manager: JobManager) -> None:
    """Wire up the job manager reference after construction.

    Called by DaemonProcess after both the monitor and manager are
    created, avoiding the circular dependency of needing both at init.
    """
    self._manager = manager

Functions