daemon

Marianne daemon service — long-running orchestration conductor.

Classes

BackpressureController

BackpressureController(monitor, rate_coordinator, learning_hub=None)

Manages system load through adaptive backpressure.

Assesses memory usage (as a fraction of the configured limit) and active rate limits to determine a PressureLevel. The scheduler calls can_start_sheet() before dispatching each sheet and gets back an (allowed, delay) tuple.

No internal lock is needed because:

- ResourceMonitor methods are thread/coroutine-safe.
- RateLimitCoordinator.active_limits is a property that reads a dict snapshot.
- PressureLevel assessment is a pure function of those reads.

Source code in src/marianne/daemon/backpressure.py
def __init__(
    self,
    monitor: ResourceMonitor,
    rate_coordinator: RateLimitCoordinator,
    learning_hub: LearningHub | None = None,
) -> None:
    self._monitor = monitor
    self._rate_coordinator = rate_coordinator
    self._learning_hub = learning_hub
Functions
current_level
current_level()

Assess current pressure level from resource metrics.

Thresholds (memory as % of ResourceLimitConfig.max_memory_mb):

- probe failure or monitor degraded → CRITICAL (fail-closed)
- >95% or monitor not accepting work → CRITICAL
- >85% or any active rate limit → HIGH
- >70% → MEDIUM
- >50% → LOW
- otherwise → NONE

Note: is_accepting_work() also checks child process count against ResourceLimitConfig.max_processes. When process count exceeds 80% of the configured limit, is_accepting_work() returns False, which triggers the CRITICAL path here. This means high process counts indirectly escalate backpressure to CRITICAL even when memory usage is low.

TOCTOU fix (P007): memory is read once and reused for all threshold checks. is_accepting_work() is still called for its process-count check, but the memory percentage used in threshold comparisons comes from the single snapshot above.

Source code in src/marianne/daemon/backpressure.py
def current_level(self) -> PressureLevel:
    """Assess current pressure level from resource metrics.

    Thresholds (memory as % of ``ResourceLimitConfig.max_memory_mb``):
      - probe failure or monitor degraded  → CRITICAL (fail-closed)
      - >95% or monitor not accepting work → CRITICAL
      - >85% or any active rate limit      → HIGH
      - >70%                               → MEDIUM
      - >50%                               → LOW
      - otherwise                          → NONE

    Note: ``is_accepting_work()`` also checks child process count
    against ``ResourceLimitConfig.max_processes``.  When process
    count exceeds 80% of the configured limit, ``is_accepting_work()``
    returns ``False``, which triggers the CRITICAL path here.  This
    means high process counts indirectly escalate backpressure to
    CRITICAL even when memory usage is low.

    TOCTOU fix (P007): memory is read once and reused for all
    threshold checks.  ``is_accepting_work()`` is still called for
    its process-count check, but the memory percentage used in
    threshold comparisons comes from the single snapshot above.
    """
    # Single snapshot of memory — avoids TOCTOU where memory changes
    # between our percentage check and is_accepting_work()'s check.
    current_mem = self._monitor.current_memory_mb()
    if current_mem is None or self._monitor.is_degraded:
        return PressureLevel.CRITICAL

    max_mem = max(self._monitor.max_memory_mb, 1)
    memory_pct = current_mem / max_mem

    # is_accepting_work() re-reads memory internally, but we use our
    # snapshot for threshold decisions; it's only called for its
    # process-count gate (returns False when procs > 80% of limit).
    accepting_work = self._monitor.is_accepting_work()

    if memory_pct > 0.95 or not accepting_work:
        return PressureLevel.CRITICAL
    # Rate limit escalation: when any backend is actively rate-limited,
    # escalate to HIGH even if memory is below the 85% threshold.
    # Data flows via JobManager._on_rate_limit → RateLimitCoordinator.
    if memory_pct > 0.85 or self._rate_coordinator.active_limits:
        return PressureLevel.HIGH
    if memory_pct > 0.70:
        return PressureLevel.MEDIUM
    if memory_pct > 0.50:
        return PressureLevel.LOW
    return PressureLevel.NONE
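
The threshold ladder can be exercised in isolation. The following is a minimal sketch, not the library's code: `classify` and its keyword arguments are hypothetical names, and the enum member values are assumed. It takes the already-sampled inputs as plain arguments so the ordering of the checks (fail-closed first, then CRITICAL, HIGH, MEDIUM, LOW) is easy to test.

```python
from __future__ import annotations

from enum import Enum


class PressureLevel(Enum):
    # Stand-in for the real enum; the member values are assumptions.
    NONE = "none"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


def classify(
    memory_pct: float | None,
    *,
    degraded: bool = False,
    accepting_work: bool = True,
    rate_limited: bool = False,
) -> PressureLevel:
    """Map one snapshot of inputs to a pressure level (hypothetical helper)."""
    # Fail closed: a failed probe or degraded monitor is treated as CRITICAL.
    if memory_pct is None or degraded:
        return PressureLevel.CRITICAL
    if memory_pct > 0.95 or not accepting_work:
        return PressureLevel.CRITICAL
    # Any active rate limit escalates to HIGH regardless of memory.
    if memory_pct > 0.85 or rate_limited:
        return PressureLevel.HIGH
    if memory_pct > 0.70:
        return PressureLevel.MEDIUM
    if memory_pct > 0.50:
        return PressureLevel.LOW
    return PressureLevel.NONE
```

Because every comparison is strict, a reading of exactly 85% stays at MEDIUM and exactly 50% stays at NONE.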
can_start_sheet async
can_start_sheet()

Whether the scheduler may dispatch a sheet, and any delay.

Satisfies the BackpressureChecker protocol used by GlobalSheetScheduler.next_sheet().

Returns:

Type Description
tuple[bool, float]

(allowed, delay_seconds). At CRITICAL level the sheet is rejected (allowed=False). At lower levels a positive delay is returned to slow dispatch.

Source code in src/marianne/daemon/backpressure.py
async def can_start_sheet(self) -> tuple[bool, float]:
    """Whether the scheduler may dispatch a sheet, and any delay.

    Satisfies the ``BackpressureChecker`` protocol used by
    ``GlobalSheetScheduler.next_sheet()``.

    Returns:
        ``(allowed, delay_seconds)``.  At CRITICAL level the sheet
        is rejected (``allowed=False``).  At lower levels a positive
        delay is returned to slow dispatch.
    """
    level = self.current_level()
    delay = _LEVEL_DELAYS[level]

    if level == PressureLevel.CRITICAL:
        _logger.warning(
            "backpressure.sheet_rejected",
            level=level.value,
        )
        return False, delay

    if delay > 0:
        _logger.info(
            "backpressure.sheet_delayed",
            level=level.value,
            delay_seconds=delay,
        )

    return True, delay
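
How a caller might consume the (allowed, delay) tuple is sketched below. This is illustrative only: `StubChecker`, `dispatch_when_allowed`, and the `retry_after` fallback are hypothetical, and the real scheduler may handle rejection differently (for example by skipping the sheet rather than retrying).

```python
from __future__ import annotations

import asyncio


class StubChecker:
    """Hypothetical stand-in that replays canned (allowed, delay) answers."""

    def __init__(self, answers: list[tuple[bool, float]]) -> None:
        self._answers = list(answers)

    async def can_start_sheet(self) -> tuple[bool, float]:
        return self._answers.pop(0)


async def dispatch_when_allowed(checker, retry_after: float = 0.01) -> float:
    """Wait until dispatch is allowed; return the total seconds slept."""
    slept = 0.0
    while True:
        allowed, delay = await checker.can_start_sheet()
        if allowed:
            # Allowed, possibly with a delay that slows dispatch down.
            if delay > 0:
                await asyncio.sleep(delay)
                slept += delay
            return slept
        # Rejected: back off before asking again (assumed policy).
        wait = delay if delay > 0 else retry_after
        await asyncio.sleep(wait)
        slept += wait
```

With answers `[(False, 0.0), (True, 0.02)]` the caller sleeps once for the fallback interval, then once for the returned delay, and then dispatches.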
should_accept_job
should_accept_job()

Whether to accept new job submissions.

Returns False only under high resource pressure (memory or process count). Rate limits do NOT cause job rejection — they are per-instrument and handled at the sheet dispatch level by the baton and scheduler (F-149).

This prevents a rate limit on instrument A from blocking jobs that target instrument B.

Source code in src/marianne/daemon/backpressure.py
def should_accept_job(self) -> bool:
    """Whether to accept new job submissions.

    Returns ``False`` only under high **resource** pressure (memory
    or process count).  Rate limits do NOT cause job rejection —
    they are per-instrument and handled at the sheet dispatch level
    by the baton and scheduler (F-149).

    This prevents a rate limit on instrument A from blocking jobs
    that target instrument B.
    """
    current_mem = self._monitor.current_memory_mb()
    if current_mem is None or self._monitor.is_degraded:
        _logger.info(
            "backpressure.job_rejected",
            level="critical",
            reason="resource",
        )
        return False

    max_mem = max(self._monitor.max_memory_mb, 1)
    memory_pct = current_mem / max_mem
    accepting_work = self._monitor.is_accepting_work()

    if memory_pct > 0.85 or not accepting_work:
        _logger.info(
            "backpressure.job_rejected",
            level="high",
            reason="resource",
            memory_pct=round(memory_pct, 2),
        )
        return False

    return True
rejection_reason
rejection_reason()

Why would a job be rejected right now?

Returns:

Type Description
str | None

None if the system would accept the job; "resource" if memory or process pressure is the cause.

Rate limits alone no longer cause rejection (F-149). They are per-instrument concerns handled at the sheet dispatch level. This prevents a rate limit on one instrument from blocking jobs targeting different instruments.

Source code in src/marianne/daemon/backpressure.py
def rejection_reason(self) -> str | None:
    """Why would a job be rejected right now?

    Returns:
        ``None`` if the system would accept the job.
        ``"resource"`` if memory or process pressure is the cause.

    Rate limits alone no longer cause rejection (F-149).  They are
    per-instrument concerns handled at the sheet dispatch level.
    This prevents a rate limit on one instrument from blocking jobs
    targeting different instruments.
    """
    current_mem = self._monitor.current_memory_mb()
    if current_mem is None or self._monitor.is_degraded:
        return "resource"

    max_mem = max(self._monitor.max_memory_mb, 1)
    memory_pct = current_mem / max_mem
    accepting_work = self._monitor.is_accepting_work()

    # Critical resource pressure — always resource
    if memory_pct > 0.95 or not accepting_work:
        return "resource"

    # High memory (>85%) — resource pressure regardless of rate limits
    if memory_pct > 0.85:
        return "resource"

    # No pressure — rate limits handled at sheet dispatch level
    return None
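
Read side by side, rejection_reason() and should_accept_job() apply the same predicate: the >95% branch is subsumed by the >85% branch, and both also reject when the monitor is not accepting work. A minimal sketch of that collapsed logic follows; the function names and keyword arguments are hypothetical, and rate limits are deliberately absent from the inputs.

```python
from __future__ import annotations


def rejection_reason(
    memory_pct: float | None,
    *,
    degraded: bool = False,
    accepting_work: bool = True,
) -> str | None:
    """Return "resource" when a job would be rejected, else None."""
    # Fail closed on a failed probe or degraded monitor.
    if memory_pct is None or degraded:
        return "resource"
    # Covers both the critical (>95%) and high (>85%) memory cases.
    if memory_pct > 0.85 or not accepting_work:
        return "resource"
    return None


def should_accept_job(memory_pct: float | None, **kwargs: bool) -> bool:
    """Accept exactly when there is no rejection reason."""
    return rejection_reason(memory_pct, **kwargs) is None
```

Since no rate-limit input exists, a limit on one instrument cannot change the answer for a job targeting another.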
estimate_job_resource_needs async
estimate_job_resource_needs(job_config_hash)

Query learned resource patterns for similar job types.

Looks up RESOURCE_CORRELATION patterns from the learning store that match the given job config hash. Returns a ResourceEstimate if sufficient historical data exists, otherwise None.

This is advisory — the caller uses it to adjust thresholds, not to block jobs outright.

Parameters:

Name Type Description Default
job_config_hash str

Stable hash identifying the job type/config.

required

Returns:

Type Description
ResourceEstimate | None

ResourceEstimate with predicted peak memory, CPU-time, and confidence, or None if no data is available.

Source code in src/marianne/daemon/backpressure.py
async def estimate_job_resource_needs(
    self, job_config_hash: str
) -> ResourceEstimate | None:
    """Query learned resource patterns for similar job types.

    Looks up ``RESOURCE_CORRELATION`` patterns from the learning store
    that match the given job config hash.  Returns a ``ResourceEstimate``
    if sufficient historical data exists, otherwise ``None``.

    This is advisory — the caller uses it to adjust thresholds, not
    to block jobs outright.

    Args:
        job_config_hash: Stable hash identifying the job type/config.

    Returns:
        ``ResourceEstimate`` with predicted peak memory, CPU-time,
        and confidence, or ``None`` if no data is available.
    """
    if self._learning_hub is None or not self._learning_hub.is_running:
        return None

    from marianne.daemon.profiler.models import ResourceEstimate as RE
    from marianne.learning.patterns import PatternType

    try:
        store = self._learning_hub.store
        patterns = store.get_patterns(
            pattern_type=PatternType.RESOURCE_CORRELATION.value,
            limit=10,
            min_priority=0.0,
        )

        if not patterns:
            return None

        # Aggregate resource estimates from matching patterns
        peak_memory_values: list[float] = []
        cpu_values: list[float] = []
        confidence_sum = 0.0

        for pattern in patterns:
            desc = pattern.description or ""
            # Extract memory hints from description
            if "RSS" in desc or "memory" in desc.lower():
                # Use effectiveness_score as a rough confidence
                confidence_sum += pattern.effectiveness_score

            # Look for memory_bin context tags for peak memory hints
            tags = pattern.context_tags or ""
            if isinstance(tags, str):
                tag_str = tags
            elif isinstance(tags, list):
                tag_str = ",".join(tags)
            else:
                tag_str = str(tags)

            # Derive a peak-memory hint (MB) from memory_bin tags
            if "memory_bin:" in tag_str:
                # Presence of memory correlation patterns gives us
                # a rough estimate based on bin boundaries
                if "<256MB" in tag_str:
                    peak_memory_values.append(256.0)
                elif "256-512MB" in tag_str:
                    peak_memory_values.append(512.0)
                elif "512MB-1GB" in tag_str:
                    peak_memory_values.append(1024.0)
                elif "1-2GB" in tag_str:
                    peak_memory_values.append(2048.0)
                elif ">2GB" in tag_str:
                    peak_memory_values.append(4096.0)

        if not peak_memory_values and confidence_sum == 0:
            return None

        avg_memory = (
            sum(peak_memory_values) / len(peak_memory_values)
            if peak_memory_values
            else 0.0
        )
        avg_cpu = (
            sum(cpu_values) / len(cpu_values)
            if cpu_values
            else 0.0
        )
        confidence = min(1.0, confidence_sum / max(len(patterns), 1))

        return RE(
            estimated_peak_memory_mb=avg_memory,
            estimated_cpu_seconds=avg_cpu,
            confidence=confidence,
        )

    except Exception:
        _logger.debug(
            "backpressure.resource_estimate_failed",
            job_config_hash=job_config_hash,
            exc_info=True,
        )
        return None
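
The bin-to-megabyte mapping in the source above can be read as a lookup table. The sketch below restates it that way under stated assumptions: `estimate_peak_memory_mb` is a hypothetical helper, and the exact tag spelling (`memory_bin:<label>`) is assumed, since the source only checks for substrings. Each bin maps to its upper bound, and the open-ended `>2GB` bin uses the same nominal 4096 MB as the source.

```python
from __future__ import annotations

# Bin label -> peak memory hint in MB (values taken from the source above).
_BIN_UPPER_BOUND_MB: dict[str, float] = {
    "<256MB": 256.0,
    "256-512MB": 512.0,
    "512MB-1GB": 1024.0,
    "1-2GB": 2048.0,
    ">2GB": 4096.0,
}


def estimate_peak_memory_mb(tag_strings: list[str]) -> float:
    """Average the memory hints found in a list of tag strings."""
    values: list[float] = []
    for tag_str in tag_strings:
        if "memory_bin:" not in tag_str:
            continue  # No memory correlation recorded for this pattern.
        for label, upper_mb in _BIN_UPPER_BOUND_MB.items():
            if label in tag_str:
                values.append(upper_mb)
                break  # First matching bin wins, like the elif chain.
    return sum(values) / len(values) if values else 0.0
```

Averaging upper bounds biases the estimate high, which suits an advisory signal used to adjust thresholds rather than to block jobs.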

PressureLevel

Bases: Enum

Graduated pressure levels for adaptive load management.

DaemonConfig

Bases: BaseModel

Top-level configuration for the Marianne conductor.

Controls socket binding, PID file location, concurrency limits, resource constraints, and state backend selection. Follows the same Field() conventions as marianne.core.config.

HealthChecker

HealthChecker(manager, monitor, *, start_time=None, learning_store=None)

Daemon health and readiness probes.

Evolution v25: Entropy Response Activation - added periodic entropy monitoring and automatic diversity injection when collapse is detected.

Parameters

- manager: The JobManager instance for job count queries.
- monitor: The ResourceMonitor instance for resource threshold checks.
- learning_store: Optional learning store for entropy monitoring. If None, entropy checks are disabled.

Source code in src/marianne/daemon/health.py
def __init__(
    self,
    manager: JobManager,
    monitor: ResourceMonitor,
    *,
    start_time: float | None = None,
    learning_store: GlobalLearningStore | None = None,
) -> None:
    self._manager = manager
    self._monitor = monitor
    self._start_time = start_time or time.monotonic()
    self._learning_store = learning_store
    self._last_entropy_check = 0.0
    self._entropy_check_task: asyncio.Task[None] | None = None
    self._completed_jobs_since_check = 0
Functions
liveness async
liveness()

Is the daemon process alive and responsive?

This is the cheapest possible check — if the daemon can execute this handler and return a response, it's alive. No resource checks or I/O are performed.

Source code in src/marianne/daemon/health.py
async def liveness(self) -> dict[str, Any]:
    """Is the daemon process alive and responsive?

    This is the cheapest possible check — if the daemon can execute
    this handler and return a response, it's alive.  No resource
    checks or I/O are performed.
    """
    return {
        "status": "ok",
        "pid": os.getpid(),
        "uptime_seconds": round(time.monotonic() - self._start_time, 1),
        "shutting_down": self._manager.shutting_down,
    }
readiness async
readiness()

Is the daemon ready to accept new jobs?

Checks resource thresholds via the monitor, job failure rate via the manager, and notification health. Returns "ready" when resources are within limits, the failure rate is not elevated, and notifications are functional; "not_ready" otherwise.

Source code in src/marianne/daemon/health.py
async def readiness(self) -> dict[str, Any]:
    """Is the daemon ready to accept new jobs?

    Checks resource thresholds via the monitor, job failure rate
    via the manager, and notification health.  Returns ``"ready"``
    when resources are within limits, the failure rate is not
    elevated, and notifications are functional; ``"not_ready"``
    otherwise.
    """
    snapshot = await self._monitor.check_now()
    accepting = self._monitor.is_accepting_work()
    failure_elevated = self._manager.failure_rate_elevated
    notif_degraded = self._manager.notifications_degraded

    shutting_down = self._manager.shutting_down
    is_ready = (
        accepting
        and not shutting_down
        and not failure_elevated
        and not notif_degraded
    )
    return {
        "status": "ready" if is_ready else "not_ready",
        "running_jobs": self._manager.running_count,
        "memory_mb": round(snapshot.memory_usage_mb, 1),
        "child_processes": snapshot.child_process_count,
        "accepting_work": is_ready,
        "shutting_down": shutting_down,
        "failure_rate_elevated": failure_elevated,
        "notifications_degraded": notif_degraded,
        "uptime_seconds": round(time.monotonic() - self._start_time, 1),
    }
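
The readiness decision reduces to a conjunction of four flags. A minimal sketch, with `readiness_status` as a hypothetical helper name:

```python
def readiness_status(
    *,
    accepting: bool,
    shutting_down: bool,
    failure_elevated: bool,
    notif_degraded: bool,
) -> str:
    """Return "ready" only when every readiness condition holds."""
    is_ready = (
        accepting
        and not shutting_down
        and not failure_elevated
        and not notif_degraded
    )
    return "ready" if is_ready else "not_ready"
```

Note that in the source above the payload's `accepting_work` field carries the combined `is_ready` value, not the monitor's raw `is_accepting_work()` result.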
on_job_completed
on_job_completed()

Called by JobManager when a job completes.

Evolution v25: Entropy Response Activation - tracks completed jobs to trigger entropy checks every 10 completions.

Source code in src/marianne/daemon/health.py
def on_job_completed(self) -> None:
    """Called by JobManager when a job completes.

    Evolution v25: Entropy Response Activation - tracks completed jobs
    to trigger entropy checks every 10 completions.
    """
    self._completed_jobs_since_check += 1

    # Check entropy every 10 completed jobs (in addition to time-based checks)
    if self._completed_jobs_since_check >= 10:
        self._completed_jobs_since_check = 0
        # Schedule async check without blocking
        if self._learning_store is not None:
            asyncio.create_task(self._check_entropy_and_respond())
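
The count-based trigger can be sketched on its own. `CompletionTrigger` and `drain` are hypothetical names. One difference from the source is deliberate: the sketch keeps a reference to each scheduled task, because the asyncio documentation recommends holding a strong reference to the result of `create_task()` so the task cannot be garbage-collected before it finishes.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


class CompletionTrigger:
    """Fire an async action once per `every` completed jobs."""

    def __init__(self, every: int, action: Callable[[], Awaitable[None]]) -> None:
        self._every = every
        self._action = action
        self._count = 0
        self._tasks: set[asyncio.Task] = set()

    def on_job_completed(self) -> None:
        # Must be called from inside a running event loop.
        self._count += 1
        if self._count >= self._every:
            self._count = 0
            task = asyncio.create_task(self._action())
            # Hold a reference until the task is done.
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def drain(self) -> None:
        """Wait for any checks that are still in flight."""
        if self._tasks:
            await asyncio.gather(*self._tasks)
```

With `every=10`, 25 completions schedule the action twice, after the 10th and the 20th.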
start_periodic_checks async
start_periodic_checks()

Start background task for periodic entropy checks.

Evolution v25: Entropy Response Activation - runs entropy checks at configured intervals (default 1 hour).

Source code in src/marianne/daemon/health.py
async def start_periodic_checks(self) -> None:
    """Start background task for periodic entropy checks.

    Evolution v25: Entropy Response Activation - runs entropy checks
    at configured intervals (default 1 hour).
    """
    if self._learning_store is None:
        return

    async def _check_loop() -> None:
        config = self._manager._config.learning
        interval = config.entropy_check_interval_seconds

        while not self._manager.shutting_down:
            await asyncio.sleep(interval)
            await self._check_entropy_and_respond()

    self._entropy_check_task = asyncio.create_task(_check_loop())
stop_periodic_checks async
stop_periodic_checks()

Stop the periodic entropy check task.

Evolution v25: Entropy Response Activation - cleanly shuts down the entropy monitoring loop.

Source code in src/marianne/daemon/health.py
async def stop_periodic_checks(self) -> None:
    """Stop the periodic entropy check task.

    Evolution v25: Entropy Response Activation - cleanly shuts down
    the entropy monitoring loop.
    """
    if self._entropy_check_task is not None:
        self._entropy_check_task.cancel()
        try:
            await self._entropy_check_task
        except asyncio.CancelledError:
            pass
        self._entropy_check_task = None
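
The start/stop pair follows a common asyncio pattern: create a looping task, then cancel it and await the cancellation. A self-contained sketch under assumptions follows; `PeriodicCheck` is a hypothetical class, and it omits the `shutting_down` check and the config lookup that the source performs.

```python
from __future__ import annotations

import asyncio
import contextlib
from typing import Awaitable, Callable


class PeriodicCheck:
    """Run an async check every `interval` seconds until stopped."""

    def __init__(self, interval: float, check: Callable[[], Awaitable[None]]) -> None:
        self._interval = interval
        self._check = check
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        async def _loop() -> None:
            while True:
                # Sleep first, so the first check runs one interval after start.
                await asyncio.sleep(self._interval)
                await self._check()

        self._task = asyncio.create_task(_loop())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            # Awaiting a cancelled task raises CancelledError; swallow it.
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None
```

Awaiting the task after `cancel()` ensures the loop has actually unwound before shutdown continues.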

JobService

JobService(*, output=None, global_learning_store=None, rate_limit_callback=None, event_callback=None, state_publish_callback=None, registry=None, token_warning_threshold=None, token_error_threshold=None, pgroup_manager=None)

Core job execution service.

Encapsulates the logic from CLI run/resume/pause commands into a reusable service that can be called from CLI, daemon, dashboard, or MCP server.

All user-facing output goes through the OutputProtocol abstraction, allowing different frontends (Rich console, structlog, SSE, null) to receive execution events without code changes.

Source code in src/marianne/daemon/job_service.py
def __init__(
    self,
    *,
    output: OutputProtocol | None = None,
    global_learning_store: GlobalLearningStore | None = None,
    rate_limit_callback: RateLimitCallback | None = None,
    event_callback: EventCallback | None = None,
    state_publish_callback: StatePublishCallback | None = None,
    registry: JobRegistry | None = None,
    token_warning_threshold: int | None = None,
    token_error_threshold: int | None = None,
    pgroup_manager: ProcessGroupManager | None = None,
) -> None:
    self._output = output or NullOutput()
    self._learning_store = global_learning_store
    self._rate_limit_callback = rate_limit_callback
    self._event_callback = event_callback
    self._state_publish_callback = state_publish_callback
    self._registry = registry
    self._token_warning_threshold = token_warning_threshold
    self._token_error_threshold = token_error_threshold
    self._pgroup_manager = pgroup_manager
    self._notification_consecutive_failures = 0
    self._notifications_degraded = False
Attributes
notifications_degraded property
notifications_degraded

Whether notification delivery is degraded.

Returns True after _NOTIFICATION_DEGRADED_THRESHOLD consecutive notification failures. Readable by HealthChecker.readiness() to signal degraded notification capability to operators.
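
A consecutive-failure counter of this kind might look like the sketch below. `NotificationHealth` and its default threshold of 3 are hypothetical; the value of `_NOTIFICATION_DEGRADED_THRESHOLD` is not shown in this section. That a successful delivery clears the degraded flag is also an assumption.

```python
class NotificationHealth:
    """Track consecutive notification failures (illustrative only)."""

    def __init__(self, degraded_threshold: int = 3) -> None:
        self._threshold = degraded_threshold  # assumed value
        self._consecutive_failures = 0
        self._degraded = False

    def record(self, success: bool) -> None:
        if success:
            # Assumption: one success resets the streak and the flag.
            self._consecutive_failures = 0
            self._degraded = False
            return
        self._consecutive_failures += 1
        if self._consecutive_failures >= self._threshold:
            self._degraded = True

    @property
    def degraded(self) -> bool:
        return self._degraded
```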

Functions
pause_job async
pause_job(job_id, workspace)

Pause a running job via signal file.

Mirrors the logic in cli/commands/pause.py::_pause_job(): creates a pause signal file that the runner polls for at sheet boundaries.

Parameters:

Name Type Description Default
job_id str

Job identifier to pause.

required
workspace Path

Workspace directory containing job state.

required

Returns:

Type Description
bool

True if pause signal was created successfully.

Raises:

Type Description
JobSubmissionError

If job not found or not in a pausable state.

Source code in src/marianne/daemon/job_service.py
async def pause_job(self, job_id: str, workspace: Path) -> bool:
    """Pause a running job via signal file.

    Mirrors the logic in cli/commands/pause.py::_pause_job():
    Creates a pause signal file that the runner polls at sheet boundaries.

    Args:
        job_id: Job identifier to pause.
        workspace: Workspace directory containing job state.

    Returns:
        True if pause signal was created successfully.

    Raises:
        JobSubmissionError: If job not found or not in a pausable state.
    """
    found_state, found_backend = await self._find_job_state(job_id, workspace)
    await found_backend.close()

    if found_state.status != JobStatus.RUNNING:
        raise JobSubmissionError(
            f"Job '{job_id}' is {found_state.status.value}, not running. "
            "Only running jobs can be paused."
        )

    # Create pause signal file
    signal_file = workspace / f".marianne-pause-{job_id}"
    signal_file.touch()

    self._output.job_event(
        job_id,
        "pause_signal_sent",
        {
            "signal_file": str(signal_file),
        },
    )

    return True
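
The other half of the signal-file handshake, the runner's check at a sheet boundary, is not shown in this section. The sketch below is one way it could work: the file name pattern comes from the source above, while `pause_signal_path`, `consume_pause_signal`, and the choice to delete the file once it has been seen are assumptions.

```python
from pathlib import Path


def pause_signal_path(workspace: Path, job_id: str) -> Path:
    """Location of the pause signal file for a job."""
    return workspace / f".marianne-pause-{job_id}"


def consume_pause_signal(workspace: Path, job_id: str) -> bool:
    """Return True if a pause was requested, removing the signal file.

    Hypothetical runner-side helper; deleting the file here is an
    assumption, so that a later resume does not pause again at once.
    """
    try:
        pause_signal_path(workspace, job_id).unlink()
    except FileNotFoundError:
        return False
    return True
```

Unlinking inside a `try` rather than testing `exists()` first avoids a race between the check and the delete.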
get_status async
get_status(job_id, workspace, backend_type='sqlite')

Get job status from state backend.

Parameters:

Name Type Description Default
job_id str

Job identifier.

required
workspace Path

Workspace directory containing job state.

required
backend_type str

State backend type (default "sqlite" for daemon).

'sqlite'

Returns:

Type Description
CheckpointState | None

CheckpointState if found, None if job doesn't exist.

Source code in src/marianne/daemon/job_service.py
async def get_status(
    self,
    job_id: str,
    workspace: Path,
    backend_type: str = "sqlite",
) -> CheckpointState | None:
    """Get job status from state backend.

    Args:
        job_id: Job identifier.
        workspace: Workspace directory containing job state.
        backend_type: State backend type (default "sqlite" for daemon).

    Returns:
        CheckpointState if found, None if job doesn't exist.
    """
    state_backend = self._create_state_backend(workspace, backend_type)
    try:
        return await state_backend.load(job_id)
    finally:
        await state_backend.close()

JobManager

JobManager(config, *, start_time=None, monitor=None, pgroup=None)

Manages concurrent job execution within the daemon.

Wraps JobService with task tracking and concurrency control. Each submitted job becomes an asyncio.Task that the manager tracks from start to completion/cancellation.

Source code in src/marianne/daemon/manager.py
def __init__(
    self,
    config: DaemonConfig,
    *,
    start_time: float | None = None,
    monitor: ResourceMonitor | None = None,
    pgroup: ProcessGroupManager | None = None,
) -> None:
    self._config = config
    self._start_time = start_time or time.monotonic()
    self._pgroup = pgroup

    # Phase 3: Centralized learning hub.
    # Single GlobalLearningStore shared across all jobs — pattern
    # discoveries in Job A are instantly available to Job B.
    self._learning_hub = LearningHub()

    # Deferred to start() where the learning hub's store is available.
    self._service: JobService | None = None
    self._jobs: dict[str, asyncio.Task[Any]] = {}
    self._job_meta: dict[str, JobMeta] = {}
    # Live CheckpointState per running job — populated by
    # _PublishingBackend on every state_backend.save() so the
    # conductor can serve status from memory, not disk.
    # Keyed by conductor job_id (which may be deduplicated, e.g.
    # "issue-solver-2"), not the config's name field.
    self._live_states: dict[str, CheckpointState] = {}
    # In-process pause events per job — set by pause_job(), checked by
    # the runner at sheet boundaries.  Keyed by conductor job_id.
    self._pause_events: dict[str, asyncio.Event] = {}
    # Explicit config.name → conductor_id mapping.  Populated in
    # _run_job_task when the config is parsed (config.name becomes
    # known).  Used by _on_state_published as a fallback when
    # state.job_id doesn't match any _job_meta key — O(1) lookup
    # instead of the fragile linear scan it replaces.
    self._config_name_to_conductor_id: dict[str, str] = {}
    # Jobs queued as PENDING during rate limit backpressure.
    # Keyed by conductor job_id.  Auto-started when limits clear.
    self._pending_jobs: dict[str, JobRequest] = {}
    self._concurrency_semaphore = asyncio.Semaphore(
        config.max_concurrent_jobs,
    )
    self._id_gen_lock = asyncio.Lock()
    self._shutting_down = False
    self._shutdown_event = asyncio.Event()
    self._recent_failures: deque[float] = deque()
    # v25: Optional entropy check callback set by process.py after health checker init
    self._entropy_check_callback: Callable[[], None] | None = None

    # Phase 3: Global sheet scheduler — lazily initialized via property.
    # Infrastructure is built and tested but not yet wired into the
    # execution path.  Currently, jobs run monolithically via
    # JobService.start_job().  Lazy init avoids allocating resources
    # until Phase 3 is actually wired.
    self._scheduler_instance: GlobalSheetScheduler | None = None

    # Phase 3: Cross-job rate limit coordination.
    # Built and tested; wired into the scheduler so next_sheet()
    # skips rate-limited backends.  Not yet active because the
    # scheduler itself is not yet driving execution.
    self._rate_coordinator = RateLimitCoordinator()

    # Fleet management — tracks running fleets for fleet-level operations
    self._fleet_records: dict[str, Any] = {}

    # Phase 3: Backpressure controller.
    # Uses a single ResourceMonitor instance shared with DaemonProcess
    # for both periodic monitoring and point-in-time backpressure checks.
    # When no monitor is injected (e.g. unit tests), a standalone one
    # is created that only does point-in-time reads.
    self._monitor = monitor or ResourceMonitor(config.resource_limits, manager=self)
    self._backpressure = BackpressureController(
        self._monitor, self._rate_coordinator,
    )

    # Persistent job registry — survives daemon restarts.
    db_path = config.state_db_path.expanduser()
    self._registry = JobRegistry(db_path)

    # Event bus for routing runner and observer events to consumers.
    self._event_bus = EventBus(
        max_queue_size=config.observer.max_queue_size,
    )

    # Semantic analyzer — LLM-based analysis of sheet completions.
    # Initialized in start() after the event bus is ready.
    self._semantic_analyzer: SemanticAnalyzer | None = None

    # Phase 4: Completion snapshots — captures workspace artifacts
    # at job completion with TTL-based cleanup.
    self._snapshot_manager = SnapshotManager()

    # Observer event recorder — persists per-job observer events to JSONL.
    # Initialized eagerly, started in start() after event bus.
    self._observer_recorder: ObserverRecorder | None = None

    # Baton adapter — the execution engine for all jobs.
    # Initialized in start(). Import deferred to avoid circular import.
    from marianne.daemon.baton.adapter import BatonAdapter
    self._baton_adapter: BatonAdapter | None = None
    self._baton_loop_task: asyncio.Task[Any] | None = None
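
The combination of a task registry and a concurrency semaphore can be illustrated in miniature. `MiniManager` is a hypothetical class, not the real manager, and how the real `_concurrency_semaphore` is acquired is not shown in this section. The sketch tracks one `asyncio.Task` per job and caps how many run at once.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class MiniManager:
    """Track one task per job and cap concurrent execution."""

    def __init__(self, max_concurrent_jobs: int) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent_jobs)
        self._jobs: dict[str, asyncio.Task[Any]] = {}
        self._running = 0
        self.peak_running = 0  # Highest concurrency observed.

    def submit(self, job_id: str, work: Callable[[], Awaitable[Any]]) -> asyncio.Task[Any]:
        task = asyncio.create_task(self._run(work))
        self._jobs[job_id] = task
        # Drop the registry entry when the task finishes or is cancelled.
        task.add_done_callback(lambda _t: self._jobs.pop(job_id, None))
        return task

    async def _run(self, work: Callable[[], Awaitable[Any]]) -> Any:
        async with self._semaphore:
            self._running += 1
            self.peak_running = max(self.peak_running, self._running)
            try:
                return await work()
            finally:
                self._running -= 1
```

Submitting five jobs with a limit of two creates five tasks immediately, but only two ever hold the semaphore at the same time.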
Attributes
uptime_seconds property
uptime_seconds

Seconds since the daemon started (monotonic clock).

shutting_down property
shutting_down

Whether the manager is in the process of shutting down.

running_count property
running_count

Number of currently running jobs.

active_job_count property
active_job_count

Number of concurrently executing jobs (used for fair-share scheduling).

Currently returns running_count (job-level granularity). Phase 3 will replace this with self._scheduler.active_count for sheet-level granularity once per-sheet dispatch is wired.

failure_rate_elevated property
failure_rate_elevated

Whether the recent job failure rate is elevated.

Returns True if more than _FAILURE_RATE_THRESHOLD unexpected exceptions have occurred within the last _FAILURE_RATE_WINDOW seconds. Used by HealthChecker.readiness() to degrade the health signal when systemic failures are occurring.
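
A sliding-window check of this shape can be built on a deque of timestamps, which matches the `_recent_failures: deque[float]` attribute in the constructor. The sketch below is an assumption-laden illustration: `FailureWindow` is hypothetical, and the threshold and window defaults are placeholders, since the values of `_FAILURE_RATE_THRESHOLD` and `_FAILURE_RATE_WINDOW` are not shown here.

```python
from __future__ import annotations

import time
from collections import deque
from typing import Callable


class FailureWindow:
    """Report whether failures in a recent window exceed a threshold."""

    def __init__(
        self,
        threshold: int = 3,            # placeholder value
        window_seconds: float = 60.0,  # placeholder value
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._threshold = threshold
        self._window = window_seconds
        self._clock = clock
        self._failures: deque[float] = deque()

    def record_failure(self) -> None:
        self._failures.append(self._clock())

    @property
    def elevated(self) -> bool:
        # Evict timestamps that have aged out of the window.
        cutoff = self._clock() - self._window
        while self._failures and self._failures[0] < cutoff:
            self._failures.popleft()
        # "More than" the threshold, hence a strict comparison.
        return len(self._failures) > self._threshold
```

Injecting the clock keeps the window testable without sleeping.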

notifications_degraded property
notifications_degraded

Whether notification delivery is degraded (forwarded from JobService).

scheduler property
scheduler

Access the global sheet scheduler for cross-job coordination.

rate_coordinator property
rate_coordinator

Access the rate limit coordinator for cross-job rate limiting.

backpressure property
backpressure

Access the backpressure controller for load management.

learning_hub property
learning_hub

Access the centralized learning hub.

observer_recorder property
observer_recorder

Access the observer event recorder for IPC.

event_bus property
event_bus

Access the event bus for subscribing to events.

Functions
start async
start()

Start daemon subsystems (learning hub, monitor, etc.).

Source code in src/marianne/daemon/manager.py
async def start(self) -> None:
    """Start daemon subsystems (learning hub, monitor, etc.)."""
    # Open the async registry connection (tables + WAL mode)
    await self._registry.open()

    # Recover orphaned jobs (left running/queued from previous daemon).
    # Pause-aware: check each orphan's checkpoint to distinguish truly
    # running jobs from those that were mid-pause when the daemon died.
    orphans = await self._registry.get_orphaned_jobs()
    if orphans:
        failed_count = 0
        paused_count = 0
        for orphan in orphans:
            target_status = self._classify_orphan(orphan)
            await self._registry.update_status(
                orphan.job_id,
                target_status,
                error_message=(
                    "Daemon restarted while job was active"
                    if target_status == DaemonJobStatus.FAILED
                    else None
                ),
            )
            if target_status == DaemonJobStatus.PAUSED:
                paused_count += 1
            else:
                failed_count += 1
        _logger.info(
            "manager.orphans_recovered",
            count=len(orphans),
            failed=failed_count,
            paused=paused_count,
        )

    # Restore ALL job metadata from registry into memory so that
    # RPC handlers (status, resume, pause, errors, …) work for
    # jobs from previous daemon sessions without per-method fallback.
    # F-077: Also restore hook_config so on_success hooks fire after restart.
    all_records = await self._registry.list_jobs(limit=10_000)
    for record in all_records:
        if record.job_id not in self._job_meta:
            # Restore hook_config from registry (F-077: was missing,
            # causing on_success hooks to silently stop after restart)
            hook_config: list[dict[str, Any]] | None = None
            hook_json = await self._registry.get_hook_config(
                record.job_id,
            )
            if hook_json:
                import json
                hook_config = json.loads(hook_json)

            self._job_meta[record.job_id] = JobMeta(
                job_id=record.job_id,
                config_path=Path(record.config_path),
                workspace=Path(record.workspace),
                submitted_at=record.submitted_at,
                started_at=record.started_at,
                status=record.status,
                error_message=record.error_message,
                hook_config=hook_config,
            )
    if all_records:
        _logger.info(
            "manager.registry_restored",
            total=len(all_records),
            loaded=len(self._job_meta),
        )

    await self._learning_hub.start()
    await self._event_bus.start()
    # Create service with shared store now that the hub is initialized
    self._service = JobService(
        output=StructuredOutput(event_bus=self._event_bus),
        global_learning_store=self._learning_hub.store,
        rate_limit_callback=self._on_rate_limit,
        event_callback=self._on_event,
        state_publish_callback=self._on_state_published,
        registry=self._registry,
        token_warning_threshold=self._config.preflight.token_warning_threshold,
        token_error_threshold=self._config.preflight.token_error_threshold,
        pgroup_manager=self._pgroup,
    )
    # Start semantic analyzer after event bus (needs bus for subscription).
    # Failure must not prevent the conductor from starting.
    try:
        from marianne.execution.setup import create_backend_from_config

        semantic_backend = create_backend_from_config(
            self._config.learning.backend,
        )
        self._semantic_analyzer = SemanticAnalyzer(
            config=self._config.learning,
            backend=semantic_backend,
            learning_hub=self._learning_hub,
            live_states=self._live_states,
        )
        await self._semantic_analyzer.start(self._event_bus)
    except (OSError, ValueError, RuntimeError, TypeError):
        _logger.warning(
            "manager.semantic_analyzer_start_failed",
            exc_info=True,
        )
        self._semantic_analyzer = None

    # Start observer recorder after event bus (needs bus for subscription).
    # Guard: observer.enabled, NOT persist_events. The ring buffer serves
    # mzt top even when persistence is off.
    if self._config.observer.enabled:
        self._observer_recorder = ObserverRecorder(
            config=self._config.observer,
        )
        await self._observer_recorder.start(self._event_bus)

    # Initialize the baton execution engine.
    from marianne.daemon.baton.adapter import BatonAdapter
    from marianne.daemon.baton.backend_pool import BackendPool
    from marianne.instruments.loader import load_all_profiles
    from marianne.instruments.registry import InstrumentRegistry

    # Build instrument registry with all available profiles
    profiles = load_all_profiles()
    registry = InstrumentRegistry()
    for profile in profiles.values():
        registry.register(profile, override=True)

    self._baton_adapter = BatonAdapter(
        event_bus=self._event_bus,
        max_concurrent_sheets=self._config.max_concurrent_sheets,
        persist_callback=self._on_baton_persist,
    )
    self._baton_adapter.set_backend_pool(
        BackendPool(registry, pgroup=self._pgroup)
    )

    # Populate per-model concurrency from instrument profiles
    for profile in profiles.values():
        for model in profile.models:
            self._baton_adapter._baton.set_model_concurrency(
                profile.name, model.name, model.max_concurrent,
            )

    # Start the baton's event loop as a background task
    self._baton_loop_task = asyncio.create_task(
        self._baton_adapter.run(),
        name="baton-loop",
    )
    _logger.info("manager.baton_adapter_started")

    # Recover paused orphans through the baton.
    await self._recover_baton_orphans()

    _logger.info(
        "manager.started",
        scheduler_status="lazy_not_wired",
        scheduler_note="Phase 3 scheduler is lazily initialized and not "
        "yet driving execution. Jobs run through the baton engine.",
        semantic_analyzer="active" if self._semantic_analyzer else "unavailable",
        observer_recorder="active" if self._observer_recorder else "unavailable",
        baton_adapter="active",
    )
apply_config
apply_config(new_config)

Hot-apply reloadable config fields from a SIGHUP reload.

Compares the new config against the current one and applies changes that can be safely updated at runtime. Rebuilds the concurrency semaphore if max_concurrent_jobs changed.

Safe because the method is synchronous and runs on the event loop thread: no other coroutine can run while it executes, so there is no concurrent access to _config or _concurrency_semaphore.
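
The compare-and-apply pattern can be sketched as follows. `Config` and `Holder` are hypothetical stand-ins for DaemonConfig and the manager, with only a few of the fields; this is an illustration under those assumptions, not the daemon's implementation.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Config:
    """Hypothetical, reduced stand-in for DaemonConfig."""

    max_concurrent_jobs: int = 4
    job_timeout_seconds: float = 3600.0
    max_job_history: int = 100


RELOADABLE = ("job_timeout_seconds", "max_job_history")


def diff_reloadable(old: Config, new: Config) -> dict[str, tuple[object, object]]:
    """Return {field: (old_value, new_value)} for reloadable fields that changed."""
    return {
        name: (getattr(old, name), getattr(new, name))
        for name in RELOADABLE
        if getattr(old, name) != getattr(new, name)
    }


class Holder:
    """Hypothetical owner of the config and the concurrency semaphore."""

    def __init__(self, config: Config) -> None:
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent_jobs)

    def apply_config(self, new: Config) -> dict[str, tuple[object, object]]:
        changed = diff_reloadable(self.config, new)
        # Rebuild the semaphore only when the concurrency limit changed.
        if new.max_concurrent_jobs != self.config.max_concurrent_jobs:
            changed["max_concurrent_jobs"] = (
                self.config.max_concurrent_jobs,
                new.max_concurrent_jobs,
            )
            self.semaphore = asyncio.Semaphore(new.max_concurrent_jobs)
        self.config = new
        return changed
```

In this sketch, code that already holds a reference to the old semaphore object keeps using it; only later lookups of `holder.semaphore` see the new limit.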

Source code in src/marianne/daemon/manager.py
def apply_config(self, new_config: DaemonConfig) -> None:
    """Hot-apply reloadable config fields from a SIGHUP reload.

    Compares the new config against the current one and applies
    changes that can be safely updated at runtime.  Rebuilds the
    concurrency semaphore if ``max_concurrent_jobs`` changed.

    Safe because asyncio is single-threaded — this runs in the
    event loop, so no concurrent access to ``_config`` or
    ``_concurrency_semaphore`` is possible.
    """
    old = self._config

    # Rebuild semaphore if concurrency limit changed
    if new_config.max_concurrent_jobs != old.max_concurrent_jobs:
        _logger.info(
            "manager.config_reloaded",
            field="max_concurrent_jobs",
            old_value=old.max_concurrent_jobs,
            new_value=new_config.max_concurrent_jobs,
        )
        self._concurrency_semaphore = asyncio.Semaphore(
            new_config.max_concurrent_jobs,
        )

    # Log other changed reloadable fields
    _reloadable_fields = [
        "job_timeout_seconds",
        "shutdown_timeout_seconds",
        "max_job_history",
        "monitor_interval_seconds",
    ]
    for field_name in _reloadable_fields:
        old_val = getattr(old, field_name)
        new_val = getattr(new_config, field_name)
        if old_val != new_val:
            _logger.info(
                "manager.config_reloaded",
                field=field_name,
                old_value=old_val,
                new_value=new_val,
            )

    self._config = new_config

    # Propagate preflight thresholds to job service for new runners
    if self._service is not None:
        self._service._token_warning_threshold = new_config.preflight.token_warning_threshold
        self._service._token_error_threshold = new_config.preflight.token_error_threshold
update_job_config_metadata
update_job_config_metadata(job_id, *, config_path=None, workspace=None)

Update config-derived metadata in the in-memory job map.

Source code in src/marianne/daemon/manager.py
def update_job_config_metadata(
    self,
    job_id: str,
    *,
    config_path: Path | None = None,
    workspace: Path | None = None,
) -> None:
    """Update config-derived metadata in the in-memory job map."""
    meta = self._job_meta.get(job_id)
    if meta is None:
        return
    if config_path is not None:
        meta.config_path = config_path
    if workspace is not None:
        meta.workspace = workspace
submit_job async
submit_job(request)

Validate the request and config, register the job, create its background task, and return immediately without waiting for the job to run.

Source code in src/marianne/daemon/manager.py
async def submit_job(self, request: JobRequest) -> JobResponse:
    """Validate config, create task, return immediately."""
    if self._shutting_down:
        return JobResponse(
            job_id="",
            status="rejected",
            message="Daemon is shutting down",
        )

    if not self._backpressure.should_accept_job():
        # F-149: should_accept_job() only rejects for resource pressure
        # (memory/processes). Rate limits are per-instrument and handled
        # at the sheet dispatch level — they don't block job submission.
        return JobResponse(
            job_id="",
            status="rejected",
            message="System under high resource pressure — try again later",
        )

    job_id = self._get_job_id(request.config_path.stem)

    # Validate config exists and resolve workspace BEFORE acquiring the
    # lock. Config parsing is expensive and doesn't need serialization
    # — it's idempotent and job-independent.
    if not request.config_path.exists():
        return JobResponse(
            job_id=job_id,
            status="rejected",
            message=f"Config file not found: {request.config_path}",
        )

    # Fleet detection: route fleet configs to the fleet manager
    from marianne.daemon.fleet import is_fleet_config, submit_fleet

    if is_fleet_config(request.config_path):
        from marianne.core.config.fleet import FleetConfig

        try:
            with open(request.config_path) as f:
                raw = yaml.safe_load(f)
            fleet_config = FleetConfig.model_validate(raw)
        except Exception as exc:
            return JobResponse(
                job_id=job_id,
                status="rejected",
                message=f"Failed to parse fleet config: {exc}",
            )
        return await submit_fleet(self, request.config_path, fleet_config)

    # Parse config for workspace resolution and hook extraction.
    # When workspace is provided explicitly, parsing is best-effort
    # (hooks won't be available if it fails, but the job still runs).
    from marianne.core.config import JobConfig

    parsed_config: JobConfig | None = None
    if request.workspace:
        workspace = request.workspace
        try:
            parsed_config = JobConfig.from_yaml(request.config_path)
        except (ValueError, OSError, KeyError, yaml.YAMLError):
            _logger.debug(
                "manager.config_parse_for_hooks_failed",
                job_id=job_id,
                config_path=str(request.config_path),
            )
    else:
        try:
            parsed_config = JobConfig.from_yaml(request.config_path)
            workspace = parsed_config.workspace
        except (ValueError, OSError, KeyError, yaml.YAMLError) as exc:
            _logger.error(
                "manager.config_parse_failed",
                job_id=job_id,
                config_path=str(request.config_path),
                exc_info=True,
            )
            return JobResponse(
                job_id=job_id,
                status="rejected",
                message=(
                    f"Failed to parse config file: "
                    f"{request.config_path} ({exc}). "
                    "Cannot determine workspace. "
                    "Fix the config or pass --workspace explicitly."
                ),
            )

    # Resolve relative workspace against client_cwd (working directory fix).
    # When the CLI sends client_cwd, relative workspace paths from the
    # config should resolve against where the user invoked the command,
    # not where the daemon was spawned.
    if request.client_cwd and not workspace.is_absolute():
        workspace = (request.client_cwd / workspace).resolve()
        _logger.debug(
            "manager.workspace_resolved_from_client_cwd",
            job_id=job_id,
            client_cwd=str(request.client_cwd),
            workspace=str(workspace),
        )

    # Extract hook config from parsed config for daemon-owned execution.
    hook_config_list: list[dict[str, Any]] | None = None
    concert_config_dict: dict[str, Any] | None = None
    if parsed_config and parsed_config.on_success:
        hook_config_list = [
            h.model_dump(mode="json") for h in parsed_config.on_success
        ]
    if parsed_config and parsed_config.concert.enabled:
        concert_config_dict = parsed_config.concert.model_dump(mode="json")

    # Early workspace validation: reject jobs whose workspace parent
    # doesn't exist or isn't writable, instead of failing deep in
    # JobService.start_job(). Workspace itself may not exist yet —
    # it gets created by JobService — but the parent must be valid.
    ws_parent = workspace.parent
    if not ws_parent.exists():
        return JobResponse(
            job_id=job_id,
            status="rejected",
            message=(
                f"Workspace parent directory does not exist: {ws_parent}. "
                "Create the parent directory or change the workspace path."
            ),
        )
    if not os.access(ws_parent, os.W_OK):
        return JobResponse(
            job_id=job_id,
            status="rejected",
            message=(
                f"Workspace parent directory is not writable: {ws_parent}. "
                "Fix permissions or change the workspace path."
            ),
        )

    # Serialize only the duplicate-check → register → insert window
    # to prevent TOCTOU races between concurrent submissions.
    async with self._id_gen_lock:
        # Reject if a job with this name is already active
        existing = self._job_meta.get(job_id)
        if existing and existing.status in (DaemonJobStatus.QUEUED, DaemonJobStatus.RUNNING):
            return JobResponse(
                job_id=job_id,
                status="rejected",
                message=(
                    f"Job '{job_id}' is already {existing.status.value}. "
                    "Use 'mzt pause' or 'mzt cancel' first, or wait for it to finish."
                ),
            )

        # Auto-detect changed score file on re-run (#103).
        # When a COMPLETED job exists and --fresh wasn't set, check
        # if the score file was modified after the last run completed.
        if not request.fresh:
            record = await self._registry.get_job(job_id)
            if (
                record is not None
                and record.status == DaemonJobStatus.COMPLETED
                and _should_auto_fresh(request.config_path, record.completed_at)
            ):
                request = request.model_copy(update={"fresh": True})
                _logger.info(
                    "auto_fresh.score_changed",
                    job_id=job_id,
                    config_path=str(request.config_path),
                    message="Score file modified since last completed run — starting fresh",
                )

        meta = JobMeta(
            job_id=job_id,
            config_path=request.config_path,
            workspace=workspace,
            chain_depth=request.chain_depth,
            hook_config=hook_config_list,
            concert_config=concert_config_dict,
        )
        # Register in DB first — if this fails, no phantom in-memory entry
        await self._registry.register_job(job_id, request.config_path, workspace)
        self._job_meta[job_id] = meta

        # Persist hook config to registry for restart resilience
        if hook_config_list:
            import json
            await self._registry.store_hook_config(
                job_id, json.dumps(hook_config_list),
            )

    try:
        task = asyncio.create_task(
            self._run_job_task(job_id, request),
            name=f"job-{job_id}",
        )
    except RuntimeError:
        # Clean up metadata if task creation fails
        # RuntimeError is raised by asyncio when no running event loop
        self._job_meta.pop(job_id, None)
        await self._registry.update_status(
            job_id, DaemonJobStatus.FAILED, error_message="Task creation failed",
        )
        raise
    self._jobs[job_id] = task
    task.add_done_callback(lambda t: self._on_task_done(job_id, t))

    _logger.info(
        "job.submitted",
        job_id=job_id,
        config_path=str(request.config_path),
    )

    return JobResponse(
        job_id=job_id,
        status="accepted",
        message=f"Job queued (concurrency limit: {self._config.max_concurrent_jobs})",
    )
get_job_status async
get_job_status(job_id, workspace=None)

Get full status of a specific job.

Resolution order (no workspace/disk fallback):

1. Live in-memory state (running jobs)
2. Registry checkpoint (historical jobs, persisted on every save)
3. Basic metadata (jobs that never ran / pre-checkpoint registry)
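
The three-tier lookup can be sketched with plain dictionaries. The `resolve_status` function and its arguments are hypothetical simplifications: `meta_status` stands in for the in-memory metadata, `checkpoints` for the registry's stored checkpoint JSON.

```python
import json
from typing import Any

ACTIVE = {"queued", "running"}


def resolve_status(
    job_id: str,
    live_states: dict[str, dict[str, Any]],
    meta_status: dict[str, str],
    checkpoints: dict[str, str],
) -> dict[str, Any]:
    """Return the best available status view for a job."""
    if job_id not in meta_status:
        raise KeyError(f"Job '{job_id}' not found")

    # 1. Live in-memory state wins when present.
    live = live_states.get(job_id)
    if live is not None:
        return live

    status = meta_status[job_id]

    # 2. Registry checkpoint, skipped while the job is active because the
    #    stored checkpoint may be stale. The authoritative status overrides
    #    whatever status the checkpoint recorded.
    if status not in ACTIVE:
        raw = checkpoints.get(job_id)
        if raw is not None:
            data: dict[str, Any] = json.loads(raw)
            data["status"] = status
            return data

    # 3. Basic metadata as the last resort.
    return {"job_id": job_id, "status": status}
```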

Source code in src/marianne/daemon/manager.py
async def get_job_status(self, job_id: str, workspace: Path | None = None) -> dict[str, Any]:
    """Get full status of a specific job.

    Resolution order (no workspace/disk fallback):
    1. Live in-memory state (running jobs)
    2. Registry checkpoint (historical jobs — persisted on every save)
    3. Basic metadata (jobs that never ran / pre-checkpoint registry)
    """
    _ = workspace  # Unused — daemon is the single source of truth

    meta = self._job_meta.get(job_id)
    record: JobRecord | None = None
    if meta is None:
        # Check the persistent registry for historical jobs
        record = await self._registry.get_job(job_id)
        if record is None:
            raise JobSubmissionError(f"Job '{job_id}' not found")

    # 1. Live in-memory state (running jobs)
    live = self._live_states.get(job_id)
    if live is not None:
        return live.model_dump(mode="json")

    # 2. Registry checkpoint (historical/terminal jobs)
    #    Skip if meta shows an active status — the checkpoint is stale
    #    between resume acceptance and the first new state save.
    _active = (DaemonJobStatus.QUEUED, DaemonJobStatus.RUNNING)
    if meta is None or meta.status not in _active:
        try:
            checkpoint_json = await self._registry.load_checkpoint(job_id)
            if checkpoint_json is not None:
                import json
                data: dict[str, Any] = json.loads(checkpoint_json)
                # Override checkpoint status with the registry's
                # authoritative status. The checkpoint may have been
                # persisted before a cancel/fail was recorded in the
                # registry's status column.
                authoritative_status = (
                    meta.status.value if meta is not None
                    else (record.status.value if record is not None else None)
                )
                if authoritative_status and data.get("status") != authoritative_status:
                    data["status"] = authoritative_status
                return data
        except Exception:
            _logger.debug(
                "get_job_status.registry_checkpoint_failed",
                job_id=job_id,
                exc_info=True,
            )

    # 2b. Detect stale RUNNING status (no live state + no running task).
    #     This happens when meta was restored from the registry after a
    #     daemon restart but the job's process no longer exists.
    if meta is not None and meta.status == DaemonJobStatus.RUNNING:
        task = self._jobs.get(job_id)
        if task is None or task.done():
            _logger.info(
                "get_job_status.stale_running_corrected",
                job_id=job_id,
            )
            await self._set_job_status(
                job_id, DaemonJobStatus.FAILED,
            )
            # Now fall through to return the corrected checkpoint
            # or metadata below.
            try:
                checkpoint_json = await self._registry.load_checkpoint(
                    job_id,
                )
                if checkpoint_json is not None:
                    import json as _json
                    data = _json.loads(checkpoint_json)
                    # Override the checkpoint's stale status
                    data["status"] = "failed"
                    return data
            except Exception:
                _logger.debug(
                    "get_job_status.checkpoint_load_after_fail",
                    job_id=job_id,
                    exc_info=True,
                )

    # 3. Basic metadata (job never produced a checkpoint, or active job
    #    whose registry checkpoint is stale)
    if meta is not None:
        return meta.to_dict()
    assert record is not None  # guaranteed by the check above
    return record.to_dict()
pause_job async
pause_job(job_id)

Send pause signal to a running job via in-process event.

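When the baton adapter is active, a PauseJob event is put on the baton inbox and the job is marked PAUSED immediately. Otherwise the method prefers the in-process _pause_events dict (set during _run_managed_task) and falls back to JobService.pause_job when no event exists (this shouldn't happen in daemon mode, but guards against edge cases).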
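
The in-process event path relies on a cooperative check: the running job looks at an asyncio.Event between units of work and stops when it is set. A minimal sketch, assuming a hypothetical `worker` coroutine that stands in for a job:

```python
import asyncio


async def worker(pause: asyncio.Event, steps: int) -> int:
    """Do up to `steps` units of work, stopping early once pause is requested."""
    done = 0
    for _ in range(steps):
        if pause.is_set():
            break
        await asyncio.sleep(0)  # stand-in for one unit of real work
        done += 1
    return done


async def demo() -> int:
    # One event per job, looked up by job ID.
    pause_events: dict[str, asyncio.Event] = {"job-1": asyncio.Event()}
    task = asyncio.create_task(worker(pause_events["job-1"], steps=1000))
    await asyncio.sleep(0)       # let the worker start
    pause_events["job-1"].set()  # request the pause
    return await task            # the worker stops at its next check
```

Calling `asyncio.run(demo())` returns the number of units completed before the pause took effect, which is well short of the 1000 requested.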

Source code in src/marianne/daemon/manager.py
async def pause_job(self, job_id: str) -> bool:
    """Send pause signal to a running job via in-process event.

    Prefers the in-process ``_pause_events`` dict (set during
    ``_run_managed_task``).  Falls back to ``JobService.pause_job``
    when no event exists (shouldn't happen in daemon mode, but
    guards against edge cases).
    """
    meta = self._job_meta.get(job_id)
    if meta is None:
        raise JobSubmissionError(f"Job '{job_id}' not found")
    if meta.status != DaemonJobStatus.RUNNING:
        raise JobSubmissionError(
            f"Job '{job_id}' is {meta.status.value}, not running"
        )

    # Verify there's an actual running task (guards against stale
    # "running" status restored from registry after daemon restart)
    task = self._jobs.get(job_id)
    if task is None or task.done():
        await self._set_job_status(job_id, DaemonJobStatus.FAILED)
        raise JobSubmissionError(
            f"Job '{job_id}' has no running process "
            f"(stale status after daemon restart)"
        )

    # Baton path: inject PauseJob event directly into the baton.
    # The baton sets job.paused=True immediately, which stops dispatch
    # for this job regardless of whether sheets are active.
    if self._baton_adapter is not None:
        from marianne.daemon.baton.events import PauseJob
        await self._baton_adapter._baton.inbox.put(
            PauseJob(job_id=job_id)
        )
        _logger.info("job.baton_pause_sent", job_id=job_id)
        await self._set_job_status(job_id, DaemonJobStatus.PAUSED)
        return True

    # Prefer in-process event (no filesystem access needed)
    event = self._pause_events.get(job_id)
    if event is not None:
        event.set()
        _logger.info("job.pause_event_set", job_id=job_id)
        return True

    # Fallback: filesystem-based pause via JobService
    return await self._checked_service.pause_job(meta.job_id, meta.workspace)
resume_job async
resume_job(job_id, workspace=None, config_path=None, no_reload=False)

Resume a paused or failed job by creating a new task.

If an old task for this job is still running (e.g., not yet fully paused), it is cancelled before the new resume task is created to prevent detached/duplicate execution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_id | str | ID of the job to resume. | required |
| workspace | Path \| None | Optional workspace override. | None |
| config_path | Path \| None | Optional new config file path. When provided, updates meta.config_path so the resume task loads the new config. | None |
| no_reload | bool | If True, skip auto-reload from disk and use cached config snapshot. Threaded from CLI --no-reload flag. | False |

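
Cancelling a stale task before registering its replacement can be sketched as follows. `run_forever`, `finish_quickly`, and `replace_task` are hypothetical names used only for this illustration.

```python
import asyncio


async def run_forever() -> None:
    """Stand-in for an old job task that has not exited yet."""
    await asyncio.Event().wait()  # this event is never set


async def finish_quickly() -> str:
    """Stand-in for the new resume task."""
    return "resumed"


def replace_task(jobs: dict[str, asyncio.Task], job_id: str) -> asyncio.Task:
    """Cancel any unfinished task for job_id, then register a fresh one."""
    old = jobs.pop(job_id, None)
    if old is not None and not old.done():
        old.cancel()
    new = asyncio.create_task(finish_quickly(), name=f"job-resume-{job_id}")
    jobs[job_id] = new
    return new


async def demo() -> tuple[bool, str]:
    jobs: dict[str, asyncio.Task] = {"job-1": asyncio.create_task(run_forever())}
    await asyncio.sleep(0)  # let the old task start
    old = jobs["job-1"]
    result = await replace_task(jobs, "job-1")
    # Wait for the old task to finish processing its cancellation.
    await asyncio.gather(old, return_exceptions=True)
    return old.cancelled(), result
```

Popping the old task from the dict before creating the new one means that, at any moment, at most one task is registered under a given job ID.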
Source code in src/marianne/daemon/manager.py
async def resume_job(
    self,
    job_id: str,
    workspace: Path | None = None,
    config_path: Path | None = None,
    no_reload: bool = False,
) -> JobResponse:
    """Resume a paused or failed job by creating a new task.

    If an old task for this job is still running (e.g., not yet fully
    paused), it is cancelled before the new resume task is created to
    prevent detached/duplicate execution.

    Args:
        job_id: ID of the job to resume.
        workspace: Optional workspace override.
        config_path: Optional new config file path. When provided, updates
            meta.config_path so the resume task loads the new config.
        no_reload: If True, skip auto-reload from disk and use cached
            config snapshot. Threaded from CLI ``--no-reload`` flag.
    """
    meta = self._job_meta.get(job_id)
    if meta is None:
        raise JobSubmissionError(f"Score '{job_id}' not found")
    _resumable = (
        DaemonJobStatus.PAUSED,
        DaemonJobStatus.PAUSED_AT_CHAIN,
        DaemonJobStatus.FAILED,
        DaemonJobStatus.CANCELLED,
    )
    if meta.status not in _resumable:
        raise JobSubmissionError(
            f"Score '{job_id}' is {meta.status.value}, "
            "only PAUSED, PAUSED_AT_CHAIN, FAILED, or CANCELLED scores can be resumed"
        )

    # PAUSED_AT_CHAIN: trigger the held chain instead of normal resume
    if meta.status == DaemonJobStatus.PAUSED_AT_CHAIN and meta.held_chain_hook:
        return await self._resume_held_chain(job_id, meta)

    # Cancel stale task to prevent detached execution
    old_task = self._jobs.pop(job_id, None)
    if old_task is not None and not old_task.done():
        old_task.cancel(msg=f"stale task replaced by resume of {job_id}")
        _logger.info("job.resume_cancelled_stale_task", job_id=job_id)

    # Apply new config path before creating the task (task reads meta.config_path)
    if config_path is not None:
        meta.config_path = config_path

    ws = workspace or meta.workspace
    await self._set_job_status(job_id, DaemonJobStatus.QUEUED)

    task = asyncio.create_task(
        self._resume_job_task(job_id, ws, no_reload=no_reload),
        name=f"job-resume-{job_id}",
    )
    self._jobs[job_id] = task
    task.add_done_callback(lambda t: self._on_task_done(job_id, t))

    return JobResponse(
        job_id=job_id,
        status="accepted",
        message="Job resume queued",
    )
modify_job async
modify_job(job_id, config_path, workspace=None)

Pause a running job and queue automatic resume with new config.

If the job is already paused/failed/cancelled, resume immediately. If running, send pause signal and store pending_modify — _on_task_done will trigger the resume when the task completes (pauses).
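
The branching described above reduces to a small decision on the job's current status. The sketch below uses a hypothetical `Status` enum and `plan_modify` function; the returned strings are labels for this example only.

```python
from enum import Enum


class Status(Enum):
    """Hypothetical subset of job statuses."""

    QUEUED = "queued"
    RUNNING = "running"
    PAUSED = "paused"
    FAILED = "failed"
    CANCELLED = "cancelled"
    COMPLETED = "completed"


RESUMABLE = {Status.PAUSED, Status.FAILED, Status.CANCELLED}


def plan_modify(status: Status) -> str:
    """Decide how a modify request is handled for a job in the given status."""
    if status in RESUMABLE:
        return "resume_now"         # apply the new config and resume immediately
    if status is Status.RUNNING:
        return "pause_then_resume"  # pause first, resume once the task has stopped
    raise ValueError(f"job is {status.value}, cannot modify")
```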

Source code in src/marianne/daemon/manager.py
async def modify_job(
    self, job_id: str, config_path: Path, workspace: Path | None = None,
) -> JobResponse:
    """Pause a running job and queue automatic resume with new config.

    If the job is already paused/failed/cancelled, resume immediately.
    If running, send pause signal and store pending_modify — _on_task_done
    will trigger the resume when the task completes (pauses).
    """
    meta = self._job_meta.get(job_id)
    if meta is None:
        raise JobSubmissionError(f"Job '{job_id}' not found")

    ws = workspace or meta.workspace

    # Already resumable — resume immediately with new config
    _resumable = (
        DaemonJobStatus.PAUSED,
        DaemonJobStatus.FAILED,
        DaemonJobStatus.CANCELLED,
    )
    if meta.status in _resumable:
        meta.config_path = config_path
        return await self.resume_job(job_id, ws)

    if meta.status != DaemonJobStatus.RUNNING:
        raise JobSubmissionError(
            f"Job '{job_id}' is {meta.status.value}, cannot modify"
        )

    # Send pause signal
    await self.pause_job(job_id)

    # Store pending action — _on_task_done will resume when the job pauses
    meta.pending_modify = (config_path, ws)

    # Baton path: the old task sits on wait_for_completion() and won't
    # exit just because the baton paused dispatch. Cancel the task so
    # _on_task_done fires and triggers the deferred resume.
    if self._baton_adapter is not None:
        old_task = self._jobs.get(job_id)
        if old_task is not None and not old_task.done():
            old_task.cancel(msg=f"modify: paused for config reload on {job_id}")
            _logger.info("job.modify_cancelled_baton_task", job_id=job_id)

    return JobResponse(
        job_id=job_id,
        status="accepted",
        message=f"Pause signal sent. Will resume with {config_path.name} when paused.",
    )
cancel_job async
cancel_job(job_id)

Cancel a running or pending job.

For running jobs: sends the cancel signal and updates in-memory status immediately, then defers heavyweight I/O to a background task.

For pending jobs (queued during rate limit backpressure): removes from the pending queue and updates the registry.
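
The "update status now, clean up later" pattern can be sketched with a background task and a done callback that records failures. `record_task_exception`, `cleanup`, and `cancel` are hypothetical stand-ins, and the `errors` list replaces a real logger.

```python
import asyncio

errors: list[str] = []  # stand-in for a logger


def record_task_exception(task: asyncio.Task) -> None:
    """Done callback: record an exception raised by a background task."""
    if not task.cancelled() and task.exception() is not None:
        errors.append(repr(task.exception()))


async def cleanup(job_id: str, fail: bool) -> None:
    """Stand-in for heavyweight cleanup I/O."""
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError(f"cleanup failed for {job_id}")


def cancel(job_id: str, status: dict[str, str], fail: bool = False) -> asyncio.Task:
    """Mark the job cancelled immediately and defer cleanup to a background task."""
    status[job_id] = "cancelled"
    task = asyncio.create_task(cleanup(job_id, fail), name=f"cancel-cleanup-{job_id}")
    task.add_done_callback(record_task_exception)
    return task


async def demo() -> tuple[dict[str, str], list[str]]:
    status = {"job-1": "running", "job-2": "running"}
    t1 = cancel("job-1", status)
    snapshot = dict(status)  # taken before any cleanup has run
    t2 = cancel("job-2", status, fail=True)
    await asyncio.gather(t1, t2, return_exceptions=True)
    await asyncio.sleep(0)   # give done callbacks a chance to run
    return snapshot, list(errors)
```

The caller sees the new status before cleanup starts, and a failing cleanup is recorded rather than raised into the caller.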

Source code in src/marianne/daemon/manager.py
async def cancel_job(self, job_id: str) -> bool:
    """Cancel a running or pending job.

    For running jobs: sends the cancel signal and updates in-memory
    status immediately, then defers heavyweight I/O to a background task.

    For pending jobs (queued during rate limit backpressure): removes
    from the pending queue and updates the registry.
    """
    # Check pending jobs first (not yet started)
    if job_id in self._pending_jobs:
        del self._pending_jobs[job_id]
        await self._set_job_status(job_id, DaemonJobStatus.CANCELLED)
        # Defer scheduler cleanup
        cleanup = asyncio.create_task(
            self._cancel_cleanup(job_id),
            name=f"cancel-cleanup-{job_id}",
        )
        cleanup.add_done_callback(
            lambda t: log_task_exception(t, _logger, "cancel_cleanup.failed"),
        )
        _logger.info("job.pending_cancelled", job_id=job_id)
        return True

    task = self._jobs.get(job_id)
    if task is None:
        return False

    task.cancel(msg=f"explicit cancel_job({job_id}) via IPC")
    await self._set_job_status(job_id, DaemonJobStatus.CANCELLED)

    # Defer scheduler cleanup so the IPC handler can
    # respond immediately.  In-memory meta is already authoritative.
    cleanup = asyncio.create_task(
        self._cancel_cleanup(job_id),
        name=f"cancel-cleanup-{job_id}",
    )
    cleanup.add_done_callback(
        lambda t: log_task_exception(t, _logger, "cancel_cleanup.failed"),
    )

    _logger.info("job.cancelled", job_id=job_id)
    return True
list_jobs async
list_jobs()

List all jobs with live progress data.

In-memory _job_meta is authoritative for active jobs. Live state from _live_states enriches active entries with sheet-level progress so mzt list shows completion counts. The registry fills in historical jobs.
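
The merge of in-memory entries, live progress, and registry history can be sketched with plain dictionaries. The `merge_jobs` function and the shape of its arguments are assumptions for this example.

```python
from typing import Any


def merge_jobs(
    in_memory: dict[str, dict[str, Any]],
    live_sheets: dict[str, dict[int, str]],
    registry: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Combine in-memory jobs (enriched with progress) with registry history."""
    seen: set[str] = set()
    result: list[dict[str, Any]] = []

    # In-memory entries come first and take precedence.
    for job_id, meta in in_memory.items():
        entry = dict(meta)
        sheets = live_sheets.get(job_id)
        if sheets is not None:
            entry["progress_completed"] = sum(
                1 for state in sheets.values() if state == "completed"
            )
            entry["progress_total"] = len(sheets)
        result.append(entry)
        seen.add(job_id)

    # Registry records fill in jobs that are not held in memory.
    result.extend(r for r in registry if r["job_id"] not in seen)
    return result
```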

Source code in src/marianne/daemon/manager.py
async def list_jobs(self) -> list[dict[str, Any]]:
    """List all jobs with live progress data.

    In-memory ``_job_meta`` is authoritative for active jobs.
    Live state from ``_live_states`` enriches active entries with
    sheet-level progress so ``mzt list`` shows completion counts.
    The registry fills in historical jobs.
    """
    seen: set[str] = set()
    result: list[dict[str, Any]] = []

    # Active jobs first — enrich with live progress
    for meta in self._job_meta.values():
        entry = meta.to_dict()
        live = self._live_states.get(meta.job_id)
        if live is not None:
            completed = sum(
                1 for s in live.sheets.values()
                if s.status.value == "completed"
            )
            entry["progress_completed"] = completed
            entry["progress_total"] = len(live.sheets)
        result.append(entry)
        seen.add(meta.job_id)

    # Historical jobs from registry
    for record in await self._registry.list_jobs():
        if record.job_id not in seen:
            result.append(record.to_dict())

    return result
clear_jobs async
clear_jobs(statuses=None, older_than_seconds=None, job_ids=None)

Clear terminal jobs from registry and in-memory metadata.

Parameters:

Name                 Type               Description                                      Default
statuses             list[str] | None   Status filter (defaults to terminal statuses).   None
older_than_seconds   float | None       Age filter in seconds.                           None
job_ids              list[str] | None   Only clear these specific job IDs.               None

Returns:

Type             Description
dict[str, Any]   Dict with "deleted" count.

Source code in src/marianne/daemon/manager.py
async def clear_jobs(
    self,
    statuses: list[str] | None = None,
    older_than_seconds: float | None = None,
    job_ids: list[str] | None = None,
) -> dict[str, Any]:
    """Clear terminal jobs from registry and in-memory metadata.

    Args:
        statuses: Status filter (defaults to terminal statuses).
        older_than_seconds: Age filter in seconds.
        job_ids: Only clear these specific job IDs.

    Returns:
        Dict with "deleted" count.
    """
    safe_statuses = set(statuses or ["completed", "failed", "cancelled"])
    safe_statuses -= {"queued", "running", "pending"}  # Never clear active jobs

    to_remove: list[str] = []
    now = time.time()
    for jid, meta in self._job_meta.items():
        if job_ids is not None and jid not in job_ids:
            continue
        if meta.status.value not in safe_statuses:
            continue
        if older_than_seconds is not None:
            if (now - meta.submitted_at) < older_than_seconds:
                continue
        to_remove.append(jid)

    for jid in to_remove:
        self._job_meta.pop(jid, None)
        self._live_states.pop(jid, None)

    deleted = await self._registry.delete_jobs(
        job_ids=job_ids,
        statuses=list(safe_statuses),
        older_than_seconds=older_than_seconds,
    )

    _logger.info(
        "manager.clear_jobs",
        in_memory_removed=len(to_remove),
        registry_deleted=deleted,
    )
    return {"deleted": deleted}
clear_rate_limits async
clear_rate_limits(instrument=None)

Clear active rate limits from the coordinator and baton.

Removes the active rate limit so new sheets can be dispatched immediately. Clears both the RateLimitCoordinator (used by the legacy runner and scheduler) and the baton's per-instrument InstrumentState (used by the baton dispatch loop).

Parameters:

Name         Type         Description                                  Default
instrument   str | None   Instrument name to clear, or None for all.   None

Returns:

Type             Description
dict[str, Any]   Dict with cleared count and instrument filter.

Source code in src/marianne/daemon/manager.py
async def clear_rate_limits(
    self,
    instrument: str | None = None,
) -> dict[str, Any]:
    """Clear active rate limits from the coordinator and baton.

    Removes the active rate limit so new sheets can be dispatched
    immediately.  Clears both the ``RateLimitCoordinator`` (used by
    the legacy runner and scheduler) and the baton's per-instrument
    ``InstrumentState`` (used by the baton dispatch loop).

    Args:
        instrument: Instrument name to clear, or ``None`` for all.

    Returns:
        Dict with ``cleared`` count and ``instrument`` filter.
    """
    cleared = await self.rate_coordinator.clear_limits(
        instrument=instrument,
    )
    baton_cleared = 0
    if self._baton_adapter is not None:
        baton_cleared = self._baton_adapter.clear_instrument_rate_limit(
            instrument,
        )
        # Kick the baton event loop so dispatch_ready() runs for the
        # newly-PENDING sheets. Without this, the loop blocks on
        # inbox.get() and cleared sheets sit idle until an unrelated
        # event arrives.
        from marianne.daemon.baton.events import DispatchRetry

        self._baton_adapter._baton.inbox.put_nowait(DispatchRetry())
    _logger.info(
        "manager.clear_rate_limits",
        instrument=instrument,
        coordinator_cleared=cleared,
        baton_cleared=baton_cleared,
    )
    # Start any pending jobs now that limits are cleared
    await self._start_pending_jobs()
    return {
        "cleared": cleared + baton_cleared,
        "instrument": instrument,
    }
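
The `DispatchRetry` kick in the listing above exists because a consumer blocked on `inbox.get()` does not notice state changes on its own. The following self-contained sketch reproduces that effect with a plain `asyncio.Queue`. The `DispatchRetry` class here is a local stand-in, and `dispatch_loop` with its `None` stop sentinel is hypothetical, not the baton's actual loop.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class DispatchRetry:
    """Local stand-in for the real event; carries no payload."""


async def dispatch_loop(
    inbox: asyncio.Queue, ready: list[str], dispatched: list[str]
) -> None:
    """Block on the inbox and re-check the ready list after every event."""
    while True:
        event = await inbox.get()
        if event is None:  # hypothetical stop sentinel for this demo
            return
        while ready:
            dispatched.append(ready.pop(0))


async def main() -> tuple[list[str], list[str]]:
    inbox: asyncio.Queue = asyncio.Queue()
    ready: list[str] = []
    dispatched: list[str] = []
    loop_task = asyncio.create_task(dispatch_loop(inbox, ready, dispatched))

    await asyncio.sleep(0)       # the loop is now parked on inbox.get()
    ready.append("sheet-1")      # state changed, but nothing wakes the loop
    await asyncio.sleep(0.01)
    idle_snapshot = list(dispatched)

    inbox.put_nowait(DispatchRetry())  # the kick
    await asyncio.sleep(0.01)
    inbox.put_nowait(None)
    await loop_task
    return idle_snapshot, dispatched


idle, done = asyncio.run(main())
```

Here `idle` stays empty until the kick arrives, after which `done` contains the sheet that had been waiting.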
get_job_errors async
get_job_errors(job_id, workspace=None)

Get errors for a specific job.

Returns the CheckpointState for the CLI to extract error information. Uses the same resolution as get_job_status: live state first, then registry, never workspace files.

Source code in src/marianne/daemon/manager.py
async def get_job_errors(self, job_id: str, workspace: Path | None = None) -> dict[str, Any]:
    """Get errors for a specific job.

    Returns the CheckpointState for the CLI to extract error information.
    Uses the same resolution as get_job_status: live state first, then
    registry, never workspace files.
    """
    _ = workspace  # Conductor DB is the sole source of truth
    state_dict = await self.get_job_status(job_id)
    return {"state": state_dict}
get_diagnostic_report async
get_diagnostic_report(job_id, workspace=None)

Get diagnostic data for a specific job.

Returns the CheckpointState plus workspace path. Uses the same resolution as get_job_status: live state first, then registry.

Source code in src/marianne/daemon/manager.py
async def get_diagnostic_report(
    self, job_id: str, workspace: Path | None = None,
) -> dict[str, Any]:
    """Get diagnostic data for a specific job.

    Returns the CheckpointState plus workspace path. Uses the same
    resolution as get_job_status: live state first, then registry.
    """
    ws = await self._resolve_job_workspace(job_id, workspace)
    state_dict = await self.get_job_status(job_id)
    return {
        "state": state_dict,
        "workspace": str(ws),
    }
get_execution_history async
get_execution_history(job_id, workspace=None, sheet_num=None, limit=50)

Get execution history for a specific job.

Requires the SQLite state backend for history records.

Source code in src/marianne/daemon/manager.py
async def get_execution_history(
    self, job_id: str, workspace: Path | None = None,
    sheet_num: int | None = None, limit: int = 50,
) -> dict[str, Any]:
    """Get execution history for a specific job.

    Requires the SQLite state backend for history records.
    """
    ws = await self._resolve_job_workspace(job_id, workspace)

    from marianne.state import SQLiteStateBackend

    sqlite_path = ws / ".marianne-state.db"
    records: list[dict[str, Any]] = []
    has_history = False

    if sqlite_path.exists():
        backend = SQLiteStateBackend(sqlite_path)
        try:
            if hasattr(backend, 'get_execution_history'):
                records = await backend.get_execution_history(
                    job_id=job_id, sheet_num=sheet_num, limit=limit,
                )
                has_history = True
        finally:
            await backend.close()

    return {
        "job_id": job_id,
        "records": records,
        "has_history": has_history,
    }
recover_job async
recover_job(job_id, workspace=None, sheet_num=None, dry_run=False)

Get state for recover operation.

Returns the job state and workspace for the CLI to run validations locally. The actual validation logic stays in the CLI command to avoid duplicating ValidationEngine setup in the daemon.

Source code in src/marianne/daemon/manager.py
async def recover_job(
    self, job_id: str, workspace: Path | None = None,
    sheet_num: int | None = None, dry_run: bool = False,
) -> dict[str, Any]:
    """Get state for recover operation.

    Returns the job state and workspace for the CLI to run
    validations locally. The actual validation logic stays
    in the CLI command to avoid duplicating ValidationEngine
    setup in the daemon.
    """
    ws = await self._resolve_job_workspace(job_id, workspace)
    state = await self._checked_service.get_status(job_id, ws)
    if state is None:
        raise JobSubmissionError(f"No state found for job '{job_id}'")

    return {
        "state": state.model_dump(mode="json"),
        "workspace": str(ws),
        "dry_run": dry_run,
        "sheet_num": sheet_num,
    }
get_daemon_status async
get_daemon_status()

Build daemon status summary.

Returns all fields required by the DaemonStatus Pydantic model so DaemonClient.status() can deserialize without crashing.

Source code in src/marianne/daemon/manager.py
async def get_daemon_status(self) -> dict[str, Any]:
    """Build daemon status summary.

    Returns all fields required by the ``DaemonStatus`` Pydantic model
    so ``DaemonClient.status()`` can deserialize without crashing.
    """
    mem = self._monitor.current_memory_mb()
    return {
        "pid": os.getpid(),
        "uptime_seconds": round(time.monotonic() - self._start_time, 1),
        "running_jobs": self.running_count,
        "total_jobs_active": self.active_job_count,
        "memory_usage_mb": round(mem, 1) if mem is not None else 0.0,
        "version": getattr(marianne, "__version__", "0.1.0"),
    }
shutdown async
shutdown(graceful=True)

Cancel all running jobs, optionally waiting for sheets.

Deregisters all active jobs from the global sheet scheduler to clean up any pending sheets, running-sheet tracking, and dependency data before the daemon exits.
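
The graceful path is a wait-then-cancel pattern: give running tasks a bounded window, cancel whatever remains, then collect the cancellations so they do not propagate. A minimal sketch of that pattern, assuming a hypothetical standalone helper `drain_tasks` rather than the manager's own bookkeeping:

```python
import asyncio


async def drain_tasks(tasks: list[asyncio.Task], timeout: float) -> tuple[int, int]:
    """Wait up to ``timeout`` seconds, then cancel whatever is still running.

    Returns (finished_in_time, cancelled_after_timeout).
    """
    running = [t for t in tasks if not t.done()]
    if not running:
        return (0, 0)
    done, pending = await asyncio.wait(running, timeout=timeout)
    for task in pending:
        task.cancel(msg="graceful shutdown timeout exceeded")
    if pending:
        # return_exceptions=True collects CancelledError instead of raising it.
        await asyncio.gather(*pending, return_exceptions=True)
    return (len(done), len(pending))


async def main() -> tuple[int, int]:
    quick = asyncio.create_task(asyncio.sleep(0.01))
    slow = asyncio.create_task(asyncio.sleep(60))
    return await drain_tasks([quick, slow], timeout=0.2)


finished, cancelled = asyncio.run(main())
```

With `graceful=False` the waiting step is skipped and every unfinished task is cancelled immediately.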

Source code in src/marianne/daemon/manager.py
async def shutdown(self, graceful: bool = True) -> None:
    """Cancel all running jobs, optionally waiting for sheets.

    Deregisters all active jobs from the global sheet scheduler
    to clean up any pending sheets, running-sheet tracking, and
    dependency data before the daemon exits.
    """
    self._shutting_down = True

    if graceful:
        timeout = self._config.shutdown_timeout_seconds
        _logger.info(
            "manager.shutting_down",
            graceful=True,
            timeout=timeout,
            running_jobs=self.running_count,
        )

        # Wait for running tasks to complete (up to timeout)
        running = [t for t in self._jobs.values() if not t.done()]
        if running:
            _, pending = await asyncio.wait(
                running, timeout=timeout,
            )
            for task in pending:
                task.cancel(msg="graceful shutdown timeout exceeded")
            if pending:
                results = await asyncio.gather(*pending, return_exceptions=True)
                for result in results:
                    if isinstance(result, BaseException):
                        _logger.warning(
                            "manager.shutdown_task_exception",
                            error=str(result),
                            error_type=type(result).__name__,
                        )
    else:
        _logger.info("manager.shutting_down", graceful=False)
        for task in self._jobs.values():
            if not task.done():
                task.cancel(msg="non-graceful shutdown")
        if self._jobs:
            results = await asyncio.gather(
                *self._jobs.values(), return_exceptions=True,
            )
            for result in results:
                if isinstance(result, BaseException):
                    _logger.warning(
                        "manager.shutdown_task_exception",
                        error=str(result),
                        error_type=type(result).__name__,
                    )

    # Deregister all known jobs from the scheduler to clean up
    # heap entries, running-sheet tracking, and dependency data.
    # Uses _job_meta (not _jobs) because task done-callbacks may
    # have already cleared entries from _jobs during cancellation.
    # Guard: only touch the scheduler if it was ever initialized.
    if self._scheduler_instance is not None:
        for job_id in list(self._job_meta.keys()):
            await self._scheduler_instance.deregister_job(job_id)

    self._jobs.clear()

    # Stop the baton adapter: send ShutdownRequested, wait for the
    # event loop to exit, cancel if it doesn't, close backend pool.
    if self._baton_adapter is not None:
        try:
            from marianne.daemon.baton.events import ShutdownRequested
            self._baton_adapter._baton.inbox.put_nowait(
                ShutdownRequested(graceful=graceful)
            )
            # Wait for the baton loop to exit (bounded by 5s)
            if self._baton_loop_task is not None and not self._baton_loop_task.done():
                try:
                    await asyncio.wait_for(self._baton_loop_task, timeout=5.0)
                except (TimeoutError, asyncio.CancelledError):
                    self._baton_loop_task.cancel()
                    try:
                        await self._baton_loop_task
                    except (asyncio.CancelledError, Exception):
                        pass
            # Cancel any remaining musician tasks
            await self._baton_adapter.shutdown()
            _logger.info("manager.baton_adapter_stopped")
        except Exception:
            _logger.warning(
                "manager.baton_adapter_stop_failed", exc_info=True,
            )

    # Stop all observers for any remaining jobs
    for jid in list(self._job_meta.keys()):
        await self._stop_observer(jid)

    # Stop observer recorder before event bus (needs bus for unsubscribe).
    if self._observer_recorder is not None:
        try:
            await self._observer_recorder.stop(self._event_bus)
        except (OSError, RuntimeError):
            _logger.warning(
                "manager.observer_recorder_stop_failed",
                exc_info=True,
            )

    # Stop semantic analyzer before event bus (needs bus for unsubscribe,
    # and learning hub for final writes during drain).
    if self._semantic_analyzer is not None:
        try:
            await self._semantic_analyzer.stop(self._event_bus)
        except asyncio.CancelledError:
            raise
        except (OSError, RuntimeError):
            _logger.warning(
                "manager.semantic_analyzer_stop_failed",
                exc_info=True,
            )

    # Shutdown event bus
    await self._event_bus.shutdown()

    # Stop centralized learning hub (final persist + cleanup)
    await self._learning_hub.stop()

    # Final checkpoint flush: persist ALL live states to registry before
    # closing. Fire-and-forget saves from _on_baton_state_sync may be
    # pending — this synchronous flush ensures no progress is lost.
    flushed = 0
    for jid, live in self._live_states.items():
        try:
            checkpoint_json = live.model_dump_json()
            await self._registry.save_checkpoint(jid, checkpoint_json)
            await self._registry.update_status(
                jid, live.status.value if hasattr(live.status, 'value') else str(live.status),
            )
            flushed += 1
        except Exception:
            _logger.warning(
                "manager.shutdown_flush_failed",
                job_id=jid, exc_info=True,
            )
    if flushed:
        _logger.info("manager.shutdown_checkpoint_flush", flushed=flushed)

    await self._registry.close()
    self._shutdown_event.set()
    _logger.info("manager.shutdown_complete")
wait_for_shutdown async
wait_for_shutdown()

Block until shutdown is complete.

Source code in src/marianne/daemon/manager.py
async def wait_for_shutdown(self) -> None:
    """Block until shutdown is complete."""
    await self._shutdown_event.wait()

JobMeta dataclass

JobMeta(job_id, config_path, workspace, submitted_at=time(), started_at=None, status=QUEUED, error_message=None, error_traceback=None, chain_depth=None, hook_config=None, concert_config=None, completed_new_work=False, observer=None, pending_modify=None, held_chain_hook=None)

Metadata tracked per job in the manager.

Functions
to_dict
to_dict()

Serialize to a dict suitable for JSON-RPC responses.

Source code in src/marianne/daemon/manager.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a dict suitable for JSON-RPC responses."""
    result: dict[str, Any] = {
        "job_id": self.job_id,
        "status": self.status,
        "config_path": str(self.config_path),
        "workspace": str(self.workspace),
        "submitted_at": self.submitted_at,
        "started_at": self.started_at,
    }
    if self.error_message:
        result["error_message"] = self.error_message
    if self.error_traceback:
        result["error_traceback"] = self.error_traceback
    if self.chain_depth is not None:
        result["chain_depth"] = self.chain_depth
    return result

ResourceMonitor

ResourceMonitor(config, manager=None, pgroup=None)

Periodic resource monitoring for the daemon.

Checks memory usage, child process count, and zombie processes on a configurable interval. Emits structured log warnings when approaching limits and can trigger hard actions (job cancellation) when hard limits are exceeded.

Source code in src/marianne/daemon/monitor.py
def __init__(
    self,
    config: ResourceLimitConfig,
    manager: JobManager | None = None,
    pgroup: ProcessGroupManager | None = None,
) -> None:
    self._config = config
    self._manager = manager
    self._pgroup = pgroup
    self._task: asyncio.Task[None] | None = None
    self._running = False
    self._consecutive_failures = 0
    self._degraded = False
    self._prune_consecutive_failures = 0
    self._prune_disabled = False
    self._last_successful_check: float = 0.0
Attributes
seconds_since_last_check property
seconds_since_last_check

Seconds since the last successful monitoring check.

Returns float('inf') if no check has succeeded yet. Consumers can use this to detect stale resource data.
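
A consumer can treat the infinite value as "never checked" without a special case, because infinity exceeds any finite age limit. The sketch below assumes the property is derived from a monotonic timestamp that starts at `0.0`, as the constructor suggests. `CheckClock` and `is_stale` are hypothetical names used only for illustration.

```python
import math
import time


class CheckClock:
    """Tracks when the last successful check happened (monotonic clock)."""

    def __init__(self) -> None:
        self._last_successful_check = 0.0  # 0.0 means "never succeeded"

    def mark_success(self) -> None:
        self._last_successful_check = time.monotonic()

    @property
    def seconds_since_last_check(self) -> float:
        if self._last_successful_check == 0.0:
            return float("inf")
        return time.monotonic() - self._last_successful_check


def is_stale(clock: CheckClock, max_age_seconds: float) -> bool:
    # inf compares greater than any finite limit, so "never checked" is stale.
    return clock.seconds_since_last_check > max_age_seconds


clock = CheckClock()
stale_before = is_stale(clock, 45.0)
clock.mark_success()
stale_after = is_stale(clock, 45.0)
```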

max_memory_mb property
max_memory_mb

Configured maximum memory in MB.

is_degraded property
is_degraded

Whether the monitor has entered degraded mode due to repeated failures.

Functions
start async
start(interval_seconds=15.0)

Start the periodic monitoring loop.

Source code in src/marianne/daemon/monitor.py
async def start(self, interval_seconds: float = 15.0) -> None:
    """Start the periodic monitoring loop."""
    if self._task is not None:
        return
    self._running = True
    self._task = asyncio.create_task(self._loop(interval_seconds))
    self._task.add_done_callback(self._on_loop_done)
    _logger.info("monitor.started", interval=interval_seconds)
stop async
stop()

Stop the monitoring loop.

Source code in src/marianne/daemon/monitor.py
async def stop(self) -> None:
    """Stop the monitoring loop."""
    self._running = False
    if self._task is not None:
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None
    _logger.info("monitor.stopped")
check_now async
check_now()

Run an immediate resource check and return snapshot.

Source code in src/marianne/daemon/monitor.py
async def check_now(self) -> ResourceSnapshot:
    """Run an immediate resource check and return snapshot."""
    running_jobs = 0
    active_sheets = 0
    if self._manager is not None:
        running_jobs = self._manager.running_count
        active_sheets = self._manager.active_job_count

    mem = self._get_memory_usage_mb()
    procs = self._get_child_process_count()
    zombie_pids = self._check_for_zombies()
    probe_failed = mem is None or procs is None

    snapshot = ResourceSnapshot(
        timestamp=time.monotonic(),
        memory_usage_mb=mem if mem is not None else 0.0,
        child_process_count=procs if procs is not None else 0,
        running_jobs=running_jobs,
        active_sheets=active_sheets,
        zombie_pids=zombie_pids,
        probe_failed=probe_failed,
    )
    return snapshot
is_accepting_work
is_accepting_work()

Check if resource usage is below warning thresholds.

Returns True when both memory and process counts are below WARN_THRESHOLD percent of their configured limits. Used by HealthChecker.readiness() to signal backpressure.

Fail-closed: returns False when probes fail or monitor is degraded.

Source code in src/marianne/daemon/monitor.py
def is_accepting_work(self) -> bool:
    """Check if resource usage is below warning thresholds.

    Returns True when both memory and process counts are below
    ``WARN_THRESHOLD`` percent of their configured limits.  Used by
    ``HealthChecker.readiness()`` to signal backpressure.

    Fail-closed: returns False when probes fail or monitor is degraded.
    """
    if self._degraded:
        return False
    mem = self._get_memory_usage_mb()
    procs = self._get_child_process_count()
    if mem is None or procs is None:
        return False
    mem_pct = _compute_percent(mem, self._config.max_memory_mb)
    proc_pct = _compute_percent(procs, self._config.max_processes)
    return mem_pct < self.WARN_THRESHOLD and proc_pct < self.WARN_THRESHOLD
current_memory_mb
current_memory_mb()

Current RSS memory in MB, or None if probes fail.

Source code in src/marianne/daemon/monitor.py
def current_memory_mb(self) -> float | None:
    """Current RSS memory in MB, or None if probes fail."""
    return self._get_memory_usage_mb()
update_limits
update_limits(new_limits)

Hot-apply new resource limits from a SIGHUP config reload.

Replaces the internal _config reference. Safe because the monitor only reads _config during periodic checks, and asyncio's single-threaded event loop prevents concurrent access.

Source code in src/marianne/daemon/monitor.py
def update_limits(self, new_limits: ResourceLimitConfig) -> None:
    """Hot-apply new resource limits from a SIGHUP config reload.

    Replaces the internal ``_config`` reference.  Safe because the
    monitor only reads ``_config`` during periodic checks, and
    asyncio's single-threaded event loop prevents concurrent access.
    """
    old = self._config
    if (
        old.max_memory_mb != new_limits.max_memory_mb
        or old.max_processes != new_limits.max_processes
    ):
        _logger.info(
            "monitor.limits_updated",
            old_memory_mb=old.max_memory_mb,
            new_memory_mb=new_limits.max_memory_mb,
            old_max_processes=old.max_processes,
            new_max_processes=new_limits.max_processes,
        )
    self._config = new_limits
set_manager
set_manager(manager)

Wire up the job manager reference after construction.

Called by DaemonProcess after both the monitor and manager are created, avoiding the circular dependency of needing both at init.

Source code in src/marianne/daemon/monitor.py
def set_manager(self, manager: JobManager) -> None:
    """Wire up the job manager reference after construction.

    Called by DaemonProcess after both the monitor and manager are
    created, avoiding the circular dependency of needing both at init.
    """
    self._manager = manager

ResourceSnapshot dataclass

ResourceSnapshot(timestamp, memory_usage_mb, child_process_count, running_jobs, active_sheets, zombie_pids=list(), probe_failed=False)

Point-in-time resource usage reading.

ConsoleOutput

ConsoleOutput(console=None)

Bases: OutputProtocol

Rich Console wrapper for CLI backwards compatibility.

Bridges the OutputProtocol to Rich Console, so the existing CLI can adopt the protocol without changing its visual output.

Source code in src/marianne/daemon/output.py
def __init__(self, console: Any | None = None) -> None:
    if console is None:
        from rich.console import Console

        console = Console()
    self._console = console

NullOutput

Bases: OutputProtocol

No-op output for testing.

OutputProtocol

Bases: ABC

Abstract output for job execution feedback.

Replaces tight coupling to Rich Console. Implementations:

- NullOutput: no-op for tests
- StructuredOutput: structlog for daemon
- ConsoleOutput: wraps Rich for CLI backwards compat

StructuredOutput

StructuredOutput(*, event_bus=None)

Bases: OutputProtocol

Structured logging output for daemon mode.

Routes all output through structlog, producing structured JSON events that daemon consumers (SSE, gRPC, log aggregators) can parse.

Source code in src/marianne/daemon/output.py
def __init__(self, *, event_bus: EventBus | None = None) -> None:
    from marianne.core.logging import get_logger

    self._logger = get_logger("daemon.output")
    self._event_bus = event_bus

ProcessGroupManager

ProcessGroupManager()

Manages the daemon's process group to prevent orphans.

The daemon calls setup() early in its lifecycle to become the process group leader. During shutdown, kill_all_children() sends SIGTERM to the entire group, ensuring no child process (including deeply nested MCP servers) survives the daemon.

An atexit handler provides last-resort cleanup even if the normal shutdown path is skipped.

Source code in src/marianne/daemon/pgroup.py
def __init__(self) -> None:
    self._original_pgid: int = os.getpgrp()
    self._is_leader: bool = False
    self._atexit_registered: bool = False
    # PIDs of processes spawned by backends (claude, etc.).
    # Used to identify orphaned children: if a tracked PID is dead
    # but its children survive, those children are orphans.
    self._tracked_backend_pids: set[int] = set()
Attributes
is_leader property
is_leader

Whether the daemon is the process group leader.

original_pgid property
original_pgid

Process group ID before setup() was called.

Functions
track_backend_pid
track_backend_pid(pid)

Register a backend process PID for orphan tracking.

When a backend (claude, gemini-cli, etc.) spawns a process, call this with the process PID. On cleanup, any surviving children of dead tracked PIDs are killed as orphans — regardless of what they're called. This replaces cmdline pattern matching with ancestry-based detection.

Source code in src/marianne/daemon/pgroup.py
def track_backend_pid(self, pid: int) -> None:
    """Register a backend process PID for orphan tracking.

    When a backend (claude, gemini-cli, etc.) spawns a process, call
    this with the process PID. On cleanup, any surviving children of
    dead tracked PIDs are killed as orphans — regardless of what
    they're called. This replaces cmdline pattern matching with
    ancestry-based detection.
    """
    self._tracked_backend_pids.add(pid)
    _logger.debug("pgroup.track_backend", pid=pid)
untrack_backend_pid
untrack_backend_pid(pid)

Remove a backend PID from tracking after clean exit.

Source code in src/marianne/daemon/pgroup.py
def untrack_backend_pid(self, pid: int) -> None:
    """Remove a backend PID from tracking after clean exit."""
    self._tracked_backend_pids.discard(pid)
    _logger.debug("pgroup.untrack_backend", pid=pid)
setup
setup()

Create a new process group with the daemon as leader.

Must be called early in daemon startup, before spawning any child processes. Idempotent — safe to call multiple times.

Source code in src/marianne/daemon/pgroup.py
def setup(self) -> None:
    """Create a new process group with the daemon as leader.

    Must be called early in daemon startup, before spawning any
    child processes.  Idempotent — safe to call multiple times.
    """
    if self._is_leader:
        return

    try:
        os.setpgrp()
        self._is_leader = True
        _logger.info(
            "pgroup.setup_complete",
            pid=os.getpid(),
            pgid=os.getpgrp(),
            original_pgid=self._original_pgid,
        )
    except OSError as exc:
        # May fail if already the group leader (e.g. after daemonize)
        if os.getpid() == os.getpgrp():
            self._is_leader = True
            _logger.debug(
                "pgroup.already_leader",
                pid=os.getpid(),
            )
        else:
            _logger.warning(
                "pgroup.setup_failed",
                error=str(exc),
                pid=os.getpid(),
            )

    # Register last-resort atexit cleanup
    if self._is_leader and not self._atexit_registered:
        atexit.register(self._atexit_cleanup)
        self._atexit_registered = True
kill_all_children
kill_all_children(sig=SIGTERM)

Send signal to all processes in our group except ourselves.

Parameters:

Name   Type   Description                                                  Default
sig    int    Signal number to send (default SIGTERM for graceful stop).   SIGTERM

Returns:

Type   Description
int    The process group ID that was signaled, or 0 if no signal sent.

Source code in src/marianne/daemon/pgroup.py
def kill_all_children(self, sig: int = signal.SIGTERM) -> int:
    """Send signal to all processes in our group except ourselves.

    Args:
        sig: Signal number to send (default SIGTERM for graceful stop).

    Returns:
        The process group ID that was signaled, or 0 if no signal sent.
    """
    if not self._is_leader:
        _logger.debug("pgroup.not_leader_skip_kill")
        return 0

    pgid = os.getpgrp()
    my_pid = os.getpid()

    # Count children before signaling for logging
    child_count = self._count_group_members(pgid, exclude_pid=my_pid)

    if child_count == 0:
        _logger.debug("pgroup.no_children_to_signal")
        return pgid

    try:
        # Temporarily ignore the signal in our own process so killpg
        # doesn't terminate us along with the children.
        old_handler = signal.signal(sig, signal.SIG_IGN)
        try:
            sent = _safe_killpg(pgid, sig, context="pgroup.kill_all_children")
        finally:
            signal.signal(sig, old_handler)

        if sent:
            _logger.info(
                "pgroup.signaled_children",
                signal=signal.Signals(sig).name,
                pgid=pgid,
                child_count=child_count,
            )
        return pgid
    except ProcessLookupError:
        _logger.debug("pgroup.no_processes_in_group", pgid=pgid)
        return 0
    except PermissionError:
        _logger.warning("pgroup.permission_denied", pgid=pgid)
        return 0
cleanup_orphans
cleanup_orphans()

Find and clean up orphaned child processes in the daemon's tree.

Detects two categories:

1. Zombie children: reaped via waitpid
2. Orphaned MCP servers: processes whose parent has died (reparented to init/PID 1) that still match MCP patterns

Note: This only scans the daemon's own child tree. For processes that escaped the tree entirely (reparented to init), use reap_orphaned_backends() which does a system-wide scan.
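
Zombie reaping relies on a non-blocking `waitpid`: with `os.WNOHANG` the call returns immediately, either collecting the exit status of a finished child or reporting that it is still running. The sketch below is POSIX-only and uses hypothetical helper names (`reap_if_exited`, `spawn_and_reap`). It does not use psutil and is not the daemon's scan.

```python
import os
import subprocess
import sys
import time


def reap_if_exited(pid: int) -> bool:
    """Try a non-blocking reap of one child.

    Returns True when the child has been reaped (or is no longer ours to
    reap), False when it is still running.
    """
    try:
        reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
    except ChildProcessError:
        return True  # already reaped elsewhere, or not our child
    return reaped_pid == pid  # waitpid returns (0, 0) while the child runs


def spawn_and_reap(timeout: float = 5.0) -> bool:
    """Start a short-lived child and poll until it can be reaped."""
    child = subprocess.Popen([sys.executable, "-c", "pass"])
    deadline = time.monotonic() + timeout
    try:
        while time.monotonic() < deadline:
            if reap_if_exited(child.pid):
                return True
            time.sleep(0.01)
        return False
    finally:
        # Let Popen notice the child is gone; kill it if it never exited.
        if child.poll() is None:
            child.kill()
            child.wait()
```

Catching `ChildProcessError` mirrors the listing's handling of children that were already collected by another code path.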

Returns:

Type        Description
list[int]   List of PIDs that were cleaned up.

Source code in src/marianne/daemon/pgroup.py
def cleanup_orphans(self) -> list[int]:
    """Find and clean up orphaned child processes in the daemon's tree.

    Detects two categories:
    1. Zombie children — reaped via waitpid
    2. Orphaned MCP servers — processes whose parent has died
       (reparented to init/PID 1) that still match MCP patterns

    Note: This only scans the daemon's own child tree.  For processes
    that escaped the tree entirely (reparented to init), use
    reap_orphaned_backends() which does a system-wide scan.

    Returns:
        List of PIDs that were cleaned up.
    """
    orphans: list[int] = []

    # Strategy 1: psutil-based deep scan (preferred)
    try:
        import psutil

        current = psutil.Process(os.getpid())

        for child in current.children(recursive=True):
            try:
                if not child.is_running():
                    continue

                status = child.status()

                # Reap zombies
                if status == psutil.STATUS_ZOMBIE:
                    try:
                        os.waitpid(child.pid, os.WNOHANG)
                    except ChildProcessError:
                        pass
                    orphans.append(child.pid)
                    _logger.debug(
                        "pgroup.reaped_zombie",
                        pid=child.pid,
                    )
                    continue

                # NOTE: Ancestry-based orphan killing disabled.
                # The F-481 rewrite removed cmdline filtering, making
                # this kill ANY child not parented by us when dead
                # backends exist — including Chrome, pytest, pyright,
                # and other legitimate processes in the daemon's tree.
                # The per-job PID tracking in the conductor DB will
                # replace this.  Until then, only zombie reaping above
                # is active.  Orphaned MCP servers accumulate but
                # don't crash the system.

            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

    except ImportError:
        # Strategy 2: /proc fallback (Linux only)
        orphans.extend(self._cleanup_orphans_proc())

    if orphans:
        _logger.info(
            "pgroup.orphan_cleanup",
            cleaned_count=len(orphans),
            pids=orphans,
        )

    return orphans
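
The only active branch in the listing above is zombie reaping through a non-blocking `os.waitpid` call. The following is a minimal, Unix-only sketch of that single step, separate from Marianne's code; the helper names `reap_if_exited` and `spawn_and_reap` are hypothetical and exist only for illustration.

```python
import os
import time


def reap_if_exited(pid: int) -> bool:
    """Reap `pid` without blocking; return True only if it was reaped now."""
    try:
        reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
    except ChildProcessError:
        # Not our child, or it has already been reaped elsewhere.
        return False
    # With WNOHANG, waitpid returns (0, 0) while the child is still running.
    return reaped_pid == pid


def spawn_and_reap() -> bool:
    """Fork a child that exits at once, then poll until it is reaped."""
    pid = os.fork()
    if pid == 0:
        os._exit(0)  # child: exit immediately and wait to be reaped
    deadline = time.monotonic() + 5.0
    while time.monotonic() < deadline:
        if reap_if_exited(pid):
            return True
        time.sleep(0.01)
    return False
```

As in the listing, a `ChildProcessError` is treated as "nothing to do" rather than as a failure, because the process may already have been collected by another waiter.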
reap_orphaned_backends
reap_orphaned_backends()

System-wide scan for orphaned backend child processes.

Warning: DISABLED. This method is a no-op.

The F-481 rewrite removed cmdline pattern filtering and replaced it with ancestry-only detection (ppid in {0, 1}). Without filtering, this kills EVERY user-owned process parented by init/systemd — including the user's systemd session manager, terminal emulators, and dbus. On WSL2, killing systemd --user cascades into systemd-poweroff.service and shuts down the entire VM (observed 9 times, exit code 9, all terminals dead).

The replacement is per-job PID tracking in the conductor DB (see composer-notes.yaml "PROCESS CLEANUP SIMPLIFICATION"). Until that's implemented, orphaned MCP/LSP servers from dead backends accumulate but don't crash the system.

Returns:

| Type | Description |
| --- | --- |
| list[int] | Empty list (no-op). |

Source code in src/marianne/daemon/pgroup.py
def reap_orphaned_backends(self) -> list[int]:
    """System-wide scan for orphaned backend child processes.

    .. warning:: DISABLED — This method is a no-op.

       The F-481 rewrite removed cmdline pattern filtering and replaced
       it with ancestry-only detection (ppid in {0, 1}).  Without
       filtering, this kills EVERY user-owned process parented by
       init/systemd — including the user's systemd session manager,
       terminal emulators, and dbus.  On WSL2, killing ``systemd
       --user`` cascades into ``systemd-poweroff.service`` and shuts
       down the entire VM (observed 9 times, exit code 9, all
       terminals dead).

       The replacement is per-job PID tracking in the conductor DB
       (see composer-notes.yaml "PROCESS CLEANUP SIMPLIFICATION").
       Until that's implemented, orphaned MCP/LSP servers from dead
       backends accumulate but don't crash the system.

    Returns:
        Empty list (no-op).
    """
    # Drain dead PIDs from the tracking set so it doesn't grow
    # unboundedly — but do NOT act on them.
    if self._tracked_backend_pids:
        dead: set[int] = set()
        for bpid in self._tracked_backend_pids:
            try:
                os.kill(bpid, 0)
            except OSError:
                dead.add(bpid)
        if dead:
            self._tracked_backend_pids -= dead
            _logger.debug(
                "pgroup.drained_dead_backend_pids",
                count=len(dead),
                pids=list(dead),
            )
    return []
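
The drain step in the listing above probes each tracked PID with signal 0 and forgets the ones that no longer exist. Here is a small standalone sketch of that idea, assuming a Unix-like system; `drain_dead_pids` is a hypothetical helper, not part of ProcessGroupManager.

```python
import os


def drain_dead_pids(tracked: set[int]) -> set[int]:
    """Remove PIDs that fail the probe from `tracked`; return the removed set."""
    dead: set[int] = set()
    for pid in tracked:
        try:
            os.kill(pid, 0)  # signal 0 sends nothing; it only checks the target
        except OSError:
            dead.add(pid)
    tracked -= dead  # in-place update, so the caller's set shrinks too
    return dead
```

Catching the broad `OSError`, as the listing does, also counts a `PermissionError` (a process that exists but belongs to another user) as dead. That is harmless when the result is only used to shrink a tracking set, but it would be too coarse if anything acted on the "dead" PIDs.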

DaemonProcess

DaemonProcess(config)

Long-running Marianne daemon process.

Composes DaemonServer (IPC), JobManager (job tracking), and ResourceMonitor (limits) into a single lifecycle.

Source code in src/marianne/daemon/process.py
def __init__(self, config: DaemonConfig) -> None:
    self._config = config
    self._signal_received = asyncio.Event()
    self._pgroup = ProcessGroupManager()
    self._start_time = time.monotonic()
    self._signal_tasks: list[asyncio.Task[Any]] = []
    self._profiler: Any = None  # Set in run() step 8.5
    self._correlation: Any = None  # Set in run() step 8.6
Functions
run async
run()

Main daemon lifecycle: boot, serve, shutdown.

Source code in src/marianne/daemon/process.py
async def run(self) -> None:
    """Main daemon lifecycle: boot, serve, shutdown."""
    # 1. Write PID file
    _write_pid(self._config.pid_file)

    try:
        # 2. Set up process group (fixes issue #38 — orphan prevention)
        self._pgroup.setup()

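        # 2.5. Reap orphaned backend children from previous runs.
        # Claude CLI spawns LSP/MCP servers that outlive it; if a
        # previous daemon or CLI session crashed, these accumulate.
        # NOTE: reap_orphaned_backends() is currently a disabled no-op
        # that returns [], so nothing is reaped or logged here yet.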
        startup_reaped = self._pgroup.reap_orphaned_backends()
        if startup_reaped:
            _logger.info(
                "daemon.startup_orphan_reap",
                count=len(startup_reaped),
                pids=startup_reaped,
            )

        # 3. Create components — single ResourceMonitor shared
        #    between periodic monitoring and backpressure checks.
        from marianne.daemon.ipc.handler import RequestHandler
        from marianne.daemon.ipc.server import DaemonServer
        from marianne.daemon.manager import JobManager
        from marianne.daemon.monitor import ResourceMonitor

        # Create monitor first (without manager ref — set after).
        self._monitor = ResourceMonitor(
            self._config.resource_limits, pgroup=self._pgroup,
        )
        # Pass the single monitor into JobManager for backpressure.
        self._manager = JobManager(
            self._config,
            start_time=self._start_time,
            monitor=self._monitor,
            pgroup=self._pgroup,
        )
        # Now wire the manager back into the monitor for job counts.
        self._monitor.set_manager(self._manager)
        await self._manager.start()

        # Warn about unenforced / reserved config fields.
        # Each entry: (field, current_value, default, event, message)
        _unenforced_fields: list[tuple[str, object, object, str, str]] = [
            (
                "max_api_calls_per_minute",
                self._config.resource_limits.max_api_calls_per_minute, 60,
                "config.unenforced_rate_limit",
                "max_api_calls_per_minute is set but NOT YET ENFORCED. "
                "Rate limiting currently works through externally-reported "
                "events via RateLimitCoordinator.",
            ),
            (
                "state_backend_type",
                self._config.state_backend_type, "sqlite",
                "config.reserved_field_ignored",
                "state_backend_type is reserved for future use and has no "
                "effect. Daemon state persistence is not yet implemented.",
            ),
            (
                "state_db_path",
                str(self._config.state_db_path), "~/.marianne/daemon-state.db",
                "config.reserved_field_ignored",
                "state_db_path is reserved for future use and has no "
                "effect. Daemon state persistence is not yet implemented.",
            ),
            (
                "max_concurrent_sheets",
                self._config.max_concurrent_sheets, 10,
                "config.reserved_field_ignored",
                "max_concurrent_sheets is reserved for Phase 3 scheduler "
                "and has no effect. Jobs currently run monolithically "
                "via JobService.",
            ),
        ]
        for field_name, current, default, event, msg in _unenforced_fields:
            if current != default:
                _logger.warning(event, field=field_name, value=current, message=msg)

        handler = RequestHandler()

        # 4. Create health checker
        from marianne.daemon.health import HealthChecker

        health = HealthChecker(
            self._manager,
            self._monitor,
            start_time=self._start_time,
            learning_store=(
                self._manager._learning_hub.store
                if self._manager._learning_hub
                else None
            ),
        )

        # 5. Register RPC methods (adapt JobManager to handler signature)
        self._register_methods(handler, self._manager, health)

        # 6. Start server
        server = DaemonServer(
            self._config.socket.path,
            handler,
            permissions=self._config.socket.permissions,
            max_connections=self._config.socket.backlog,
        )
        await server.start()

        # 7. Install signal handlers (tracked to surface exceptions)
        loop = asyncio.get_running_loop()

        from collections.abc import Callable

        def _make_signal_callback(
            signum: signal.Signals,
        ) -> Callable[[], None]:
            """Create a signal callback that captures ``signum`` by value."""
            def _cb() -> None:
                self._track_signal_task(
                    asyncio.create_task(
                        self._handle_signal(signum, self._manager, server),
                    ),
                )
            return _cb

        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, _make_signal_callback(sig))

        def _sighup_callback() -> None:
            self._track_signal_task(
                asyncio.create_task(self._handle_sighup()),
            )

        loop.add_signal_handler(signal.SIGHUP, _sighup_callback)

        # 8. Start resource monitor
        interval = self._config.monitor_interval_seconds
        await self._monitor.start(interval_seconds=interval)

        # 8.4. Start entropy monitoring (v25 evolution)
        # Wire health checker callback into manager for job completion tracking
        self._manager._entropy_check_callback = health.on_job_completed
        await health.start_periodic_checks()

        # 8.5. Start profiler collector (after monitor, needs event bus)
        if self._config.profiler.enabled:
            from marianne.daemon.profiler.collector import ProfilerCollector

            self._profiler = ProfilerCollector(
                config=self._config.profiler,
                monitor=self._monitor,
                pgroup=self._pgroup,
                event_bus=self._manager.event_bus,
                manager=self._manager,
            )
            await self._profiler.start()

        # 8.6. Start correlation analyzer (after profiler, needs storage + learning hub)
        if self._config.profiler.enabled and self._profiler is not None:
            from marianne.daemon.profiler.correlation import CorrelationAnalyzer

            self._correlation = CorrelationAnalyzer(
                storage=self._profiler._storage,
                learning_hub=self._manager.learning_hub,
                config=self._config.profiler.correlation,
            )
            await self._correlation.start(self._manager.event_bus)

        # 9. Run until shutdown
        _logger.info(
            "daemon.started",
            pid=os.getpid(),
            socket=str(self._config.socket.path),
        )
        await self._manager.wait_for_shutdown()

        # 10. Cleanup — correlation, profiler, entropy, monitor
        if self._correlation is not None:
            await self._correlation.stop()
        if self._profiler is not None:
            await self._profiler.stop()
        await health.stop_periodic_checks()
        await self._monitor.stop()
        await server.stop()

        # 11. Kill remaining children in process group (issue #38)
        self._pgroup.kill_all_children()
        orphans = self._pgroup.cleanup_orphans()
        if orphans:
            _logger.info(
                "daemon.shutdown_orphans_cleaned",
                count=len(orphans),
            )

        _logger.info("daemon.stopped")
    finally:
        # Always remove PID file, even on crash
        self._config.pid_file.unlink(missing_ok=True)
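
Step 7 of the listing above builds each signal callback through a factory so that `signum` is captured by value. The sketch below shows why that matters, using plain strings instead of real signals; all function names here are hypothetical.

```python
from collections.abc import Callable


def make_callback(name: str, log: list[str]) -> Callable[[], None]:
    """Return a callback bound to the value `name` has right now."""
    def _cb() -> None:
        log.append(name)
    return _cb


def build_callbacks(names: list[str], log: list[str]) -> list[Callable[[], None]]:
    """Factory version: each callback remembers its own name."""
    return [make_callback(n, log) for n in names]


def build_late_bound(names: list[str], log: list[str]) -> list[Callable[[], None]]:
    """Pitfall version: every lambda reads the loop variable when it is called."""
    callbacks: list[Callable[[], None]] = []
    for n in names:
        callbacks.append(lambda: log.append(n))
    return callbacks
```

With `["SIGTERM", "SIGINT"]`, the factory version logs each name once, while the late-bound version logs `"SIGINT"` twice, because both lambdas share the same loop variable.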

GlobalSheetScheduler

GlobalSheetScheduler(config)

Cross-job sheet-level scheduler using a priority min-heap.

Status: Phase 3 infrastructure — built and tested, not yet wired into the execution path.

Manages a global priority queue of sheets from all active jobs. Enforces:

- max_concurrent_sheets from DaemonConfig
- Per-job fair-share limits (penalty-based, not hard-block)
- Per-job hard cap at fair_share * max_per_job_multiplier
- DAG dependency awareness via completed-set tracking

The scheduler does NOT own sheet execution — it decides which sheet should run next and the JobManager performs the actual execution.

Integration plan (future):

1. JobManager._run_job_task() calls register_job() with all sheets parsed from the JobConfig.
2. A dispatch loop calls next_sheet() and spawns per-sheet tasks instead of calling JobService.start_job() monolithically.
3. Each per-sheet task calls mark_complete() on finish.
4. cancel_job() / shutdown calls deregister_job() for cleanup.
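
The integration plan above can be pictured with a toy dispatch loop. This is only a sketch under stated assumptions: `ToyScheduler` is a hypothetical stand-in that mimics the method names `register_job`, `next_sheet`, and `mark_complete`, with a simplified signature and none of the real fair-share, DAG, or backpressure logic.

```python
from __future__ import annotations

import asyncio
import heapq


class ToyScheduler:
    """Hypothetical stand-in for GlobalSheetScheduler; not the real class."""

    def __init__(self, max_concurrent: int) -> None:
        self._max_concurrent = max_concurrent
        self._queue: list[tuple[int, str]] = []  # (sheet_num, job_id)
        self.active = 0
        self.peak_active = 0

    async def register_job(self, job_id: str, sheet_nums: list[int]) -> None:
        for num in sheet_nums:
            heapq.heappush(self._queue, (num, job_id))

    async def next_sheet(self) -> tuple[int, str] | None:
        if self.active >= self._max_concurrent or not self._queue:
            return None
        self.active += 1
        self.peak_active = max(self.peak_active, self.active)
        return heapq.heappop(self._queue)

    async def mark_complete(self, job_id: str, sheet_num: int, success: bool) -> None:
        self.active -= 1


async def dispatch(scheduler: ToyScheduler, total: int) -> list[int]:
    """Single dispatch loop: pull sheets, spawn one task each, report completion."""
    finished: list[int] = []
    tasks: set[asyncio.Task[None]] = set()

    async def run_sheet(sheet_num: int, job_id: str) -> None:
        await asyncio.sleep(0)  # placeholder for real sheet execution
        finished.append(sheet_num)
        await scheduler.mark_complete(job_id, sheet_num, success=True)

    while len(finished) < total:
        entry = await scheduler.next_sheet()
        if entry is None:
            await asyncio.sleep(0)  # nothing runnable yet; let tasks progress
            continue
        sheet_num, job_id = entry
        task = asyncio.create_task(run_sheet(sheet_num, job_id))
        tasks.add(task)  # keep a reference so the task is not garbage-collected
        task.add_done_callback(tasks.discard)

    await asyncio.gather(*tasks)
    return finished


async def demo() -> tuple[list[int], int]:
    scheduler = ToyScheduler(max_concurrent=2)
    await scheduler.register_job("job-a", [1, 2, 3, 4])
    finished = await dispatch(scheduler, total=4)
    return sorted(finished), scheduler.peak_active
```

Running `asyncio.run(demo())` completes all four sheets while never having more than two in flight. A real dispatch loop would wait on an event rather than spin with `asyncio.sleep(0)`.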

Source code in src/marianne/daemon/scheduler.py
def __init__(self, config: DaemonConfig) -> None:
    self._config = config
    self._max_concurrent = config.max_concurrent_sheets

    # Priority heap
    self._queue: list[SheetEntry] = []

    # Tracking
    self._running: dict[str, set[int]] = {}  # job_id → running sheet_nums
    self._active_count = 0

    # Per-job DAG tracking
    self._job_completed: dict[str, set[int]] = {}
    self._job_deps: dict[str, dict[int, set[int]]] = {}  # job_id → {sheet → {deps}}
    self._job_all_sheets: dict[str, list[SheetInfo]] = {}  # for re-enqueue

    # Fair-share tuning
    self._fair_share_overage_penalty = 20.0
    self._max_per_job_multiplier = 2.0

    # Rate-limiter error tolerance: after this many consecutive
    # rate-limiter errors, a sheet is dropped from the queue to
    # prevent infinite cycling.
    self._max_rate_limit_skips = 10

    # Optional downstream integrations (set via setters)
    self._rate_limiter: RateLimitChecker | None = None
    self._backpressure: BackpressureChecker | None = None

    # Concurrency control
    self._lock = asyncio.Lock()
Attributes
active_count property
active_count

Number of sheets currently executing.

queued_count property
queued_count

Number of sheets waiting in the queue.

Functions
set_rate_limiter
set_rate_limiter(rate_limiter)

Wire up the rate limit coordinator (called once during init).

Source code in src/marianne/daemon/scheduler.py
def set_rate_limiter(self, rate_limiter: RateLimitChecker) -> None:
    """Wire up the rate limit coordinator (called once during init)."""
    self._rate_limiter = rate_limiter
set_backpressure
set_backpressure(backpressure)

Wire up the backpressure controller (called once during init).

Source code in src/marianne/daemon/scheduler.py
def set_backpressure(self, backpressure: BackpressureChecker) -> None:
    """Wire up the backpressure controller (called once during init)."""
    self._backpressure = backpressure
register_job async
register_job(job_id, sheets, dependencies=None)

Register a job's sheets with the scheduler.

Enqueues sheets whose dependencies are already satisfied (or have no dependencies). Remaining sheets are enqueued as their deps complete via mark_complete().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_id | str | Unique job identifier. | required |
| sheets | list[SheetInfo] | All sheets for this job. | required |
| dependencies | dict[int, set[int]] \| None | Optional DAG: {sheet_num: {dependency_sheet_nums}}. If None, all sheets are independent. | None |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If dependencies contain a cycle (would deadlock). |

Source code in src/marianne/daemon/scheduler.py
async def register_job(
    self,
    job_id: str,
    sheets: list[SheetInfo],
    dependencies: dict[int, set[int]] | None = None,
) -> None:
    """Register a job's sheets with the scheduler.

    Enqueues sheets whose dependencies are already satisfied (or have
    no dependencies).  Remaining sheets are enqueued as their deps
    complete via ``mark_complete()``.

    Args:
        job_id: Unique job identifier.
        sheets: All sheets for this job.
        dependencies: Optional DAG — ``{sheet_num: {dependency_sheet_nums}}``.
                      If None, all sheets are independent.

    Raises:
        ValueError: If dependencies contain a cycle (would deadlock).
    """
    if dependencies:
        cycle = self._detect_cycle(dependencies)
        if cycle:
            raise ValueError(
                f"Circular dependency detected in job {job_id}: "
                f"{' → '.join(str(n) for n in cycle)}"
            )

    # Validate sheet_num uniqueness — duplicates would silently corrupt
    # DAG tracking and completion-set logic.
    seen_nums: set[int] = set()
    for info in sheets:
        if info.sheet_num in seen_nums:
            raise ValueError(
                f"Duplicate sheet_num {info.sheet_num} in job {job_id}"
            )
        seen_nums.add(info.sheet_num)

    # Warn on dependencies referencing sheet_nums not in the sheets list.
    # These deps can never be satisfied, so affected sheets will stay
    # blocked forever.  This is a config error, not a hard failure.
    if dependencies:
        for sheet_num, dep_set in dependencies.items():
            phantom_deps = dep_set - seen_nums
            if phantom_deps:
                _logger.warning(
                    "scheduler.phantom_dependencies",
                    job_id=job_id,
                    sheet_num=sheet_num,
                    missing_deps=sorted(phantom_deps),
                    msg="Dependencies reference non-existent sheet_nums; "
                        "affected sheet will never become ready",
                )

    async with self._lock:
        # Guard against duplicate registration: remove stale heap
        # entries from any prior registration of this job_id.
        if job_id in self._job_all_sheets:
            self._queue = [
                e for e in self._queue if e.info.job_id != job_id
            ]
            heapq.heapify(self._queue)
            # Adjust active count for any running sheets being discarded
            old_running = self._running.pop(job_id, set())
            self._active_count = max(0, self._active_count - len(old_running))
            _logger.warning(
                "scheduler.duplicate_register",
                job_id=job_id,
                msg="Re-registering job; previous entries purged",
            )

        deps = dependencies or {}
        self._job_deps[job_id] = deps
        self._job_completed[job_id] = set()
        self._running[job_id] = set()
        self._job_all_sheets[job_id] = list(sheets)

        # Enqueue sheets whose deps are already met
        enqueued = 0
        for info in sheets:
            sheet_deps = deps.get(info.sheet_num, set())
            if not sheet_deps:
                entry = self._make_entry(info)
                heapq.heappush(self._queue, entry)
                enqueued += 1

        _logger.info(
            "scheduler.job_registered",
            job_id=job_id,
            total_sheets=len(sheets),
            immediately_ready=enqueued,
        )
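
The listing above calls `_detect_cycle()`, whose body is not shown on this page. As an illustration only, a depth-first search over the same `{sheet_num: {dependency_sheet_nums}}` shape could look like the sketch below; `detect_cycle` is a hypothetical function and may differ from the real implementation.

```python
from __future__ import annotations


def detect_cycle(deps: dict[int, set[int]]) -> list[int] | None:
    """Return one cycle as a node path (first node repeated at the end), or None."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    color: dict[int, int] = {}
    path: list[int] = []

    def visit(node: int) -> list[int] | None:
        color[node] = GRAY
        path.append(node)
        for dep in sorted(deps.get(node, ())):
            state = color.get(dep, WHITE)
            if state == GRAY:
                # Reached a node that is still on the current path: a cycle.
                return path[path.index(dep):] + [dep]
            if state == WHITE:
                found = visit(dep)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for start in sorted(deps):
        if color.get(start, WHITE) == WHITE:
            found = visit(start)
            if found:
                return found
    return None
```

For `{1: {2}, 2: {3}, 3: {1}}` this returns `[1, 2, 3, 1]`, a path that could be joined with arrows in the same way the `ValueError` message in the listing does.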
deregister_job async
deregister_job(job_id)

Remove all pending sheets for a cancelled/completed job.

Source code in src/marianne/daemon/scheduler.py
async def deregister_job(self, job_id: str) -> None:
    """Remove all pending sheets for a cancelled/completed job."""
    async with self._lock:
        # Early return for unknown job_ids — avoids needless O(n) queue
        # rebuild and ensures the log message reflects real deregistrations.
        if job_id not in self._job_all_sheets:
            _logger.debug(
                "scheduler.deregister_unknown_job",
                job_id=job_id,
            )
            return

        # Rebuild queue without this job's sheets
        self._queue = [
            e for e in self._queue if e.info.job_id != job_id
        ]
        heapq.heapify(self._queue)

        self._running.pop(job_id, None)
        self._job_completed.pop(job_id, None)
        self._job_deps.pop(job_id, None)
        self._job_all_sheets.pop(job_id, None)

        # Recount active
        self._active_count = sum(
            len(s) for s in self._running.values()
        )

        _logger.info("scheduler.job_deregistered", job_id=job_id)
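
Both `register_job()` and `deregister_job()` purge a job by filtering the queue list and then calling `heapq.heapify`. A minimal sketch of that pattern, using `(priority, job_id)` tuples as a simplified stand-in for SheetEntry (the function name `remove_job` is hypothetical):

```python
import heapq


def remove_job(heap: list[tuple[float, str]], job_id: str) -> list[tuple[float, str]]:
    """Return a new valid heap without the entries that belong to `job_id`."""
    kept = [entry for entry in heap if entry[1] != job_id]
    # Filtering shifts positions and can break the heap invariant,
    # so the list is rebuilt into a heap in O(n).
    heapq.heapify(kept)
    return kept
```

Skipping the `heapify` call would leave a list that looks plausible but can pop entries out of priority order later.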
next_sheet async
next_sheet()

Pop the highest-priority ready sheet, respecting limits.

Returns None if no sheet can run (concurrency full, backpressure active, queue empty, or all queued sheets are rate-limited).

Single-caller constraint: The backpressure delay (asyncio.sleep) runs outside the scheduler lock. If multiple coroutines call next_sheet() concurrently, they may all pass the backpressure check before any acquires the lock, effectively bypassing the delay for concurrent callers. The intended usage is a single dispatch loop calling next_sheet() sequentially — the JobManager dispatch loop satisfies this constraint.

Source code in src/marianne/daemon/scheduler.py
async def next_sheet(self) -> SheetEntry | None:
    """Pop the highest-priority ready sheet, respecting limits.

    Returns None if no sheet can run (concurrency full, backpressure
    active, queue empty, or all queued sheets are rate-limited).

    **Single-caller constraint:** The backpressure delay (``asyncio.sleep``)
    runs *outside* the scheduler lock.  If multiple coroutines call
    ``next_sheet()`` concurrently, they may all pass the backpressure
    check before any acquires the lock, effectively bypassing the delay
    for concurrent callers.  The intended usage is a single dispatch
    loop calling ``next_sheet()`` sequentially — the ``JobManager``
    dispatch loop satisfies this constraint.
    """
    # Check backpressure first (outside scheduler lock per lock ordering).
    # TOCTOU note: backpressure state may change between this check and
    # the lock acquisition below, but this is acceptable — backpressure
    # is advisory and the delay is best-effort.
    if self._backpressure is not None:
        allowed, delay = await self._backpressure.can_start_sheet()
        if not allowed:
            return None
        if delay > 0:
            await asyncio.sleep(delay)

    async with self._lock:
        if self._active_count >= self._max_concurrent:
            return None

        # Re-score all queued entries against current running state.
        # Priorities depend on fair-share (which changes as sheets
        # dispatch), so enqueue-time scores go stale.  With typical
        # queue sizes of 10-100 entries, the O(n log n) rebuild is
        # negligible compared to API call latency.
        refreshed: list[SheetEntry] = []
        for entry in self._queue:
            rescored = SheetEntry(
                priority=self._calculate_priority(entry.info),
                submitted_at=entry.submitted_at,
                info=entry.info,
                rate_limit_skip_count=entry.rate_limit_skip_count,
            )
            refreshed.append(rescored)
        heapq.heapify(refreshed)
        self._queue = refreshed

        skipped: list[SheetEntry] = []
        dropped: list[SheetEntry] = []
        result: SheetEntry | None = None

        while self._queue:
            entry = heapq.heappop(self._queue)
            job_id = entry.info.job_id

            # Check per-job hard cap
            job_running = len(self._running.get(job_id, set()))
            registered_job_count = len(self._job_all_sheets)
            fair_share = self._fair_share(registered_job_count)
            hard_cap = int(fair_share * self._max_per_job_multiplier)
            hard_cap = max(1, hard_cap)

            if job_running >= hard_cap:
                skipped.append(entry)
                continue

            # Check rate limiting (outside-lock would violate ordering,
            # but rate_limiter._lock is #2, scheduler._lock is #1 — OK
            # to call while holding #1)
            if self._rate_limiter is not None:
                try:
                    is_limited, _ = await self._rate_limiter.is_rate_limited(
                        entry.info.backend_type,
                        model=entry.info.model,
                    )
                except (RuntimeError, ValueError, OSError) as e:
                    entry.rate_limit_skip_count += 1
                    if entry.rate_limit_skip_count >= self._max_rate_limit_skips:
                        _logger.error(
                            "scheduler.rate_limiter_error_exhausted",
                            job_id=job_id,
                            sheet_num=entry.info.sheet_num,
                            skip_count=entry.rate_limit_skip_count,
                            max_skips=self._max_rate_limit_skips,
                            error=str(e),
                            error_type=type(e).__name__,
                        )
                        dropped.append(entry)
                    else:
                        _logger.warning(
                            "scheduler.rate_limiter_error",
                            job_id=job_id,
                            sheet_num=entry.info.sheet_num,
                            skip_count=entry.rate_limit_skip_count,
                            max_skips=self._max_rate_limit_skips,
                            error=str(e),
                            error_type=type(e).__name__,
                        )
                        skipped.append(entry)
                    continue
                if is_limited:
                    skipped.append(entry)
                    continue

            # This sheet can run
            result = entry
            break

        # Put skipped entries back
        for s in skipped:
            heapq.heappush(self._queue, s)

        if result is not None:
            job_id = result.info.job_id
            self._running.setdefault(job_id, set()).add(
                result.info.sheet_num,
            )
            self._active_count += 1

            _logger.debug(
                "scheduler.sheet_dispatched",
                job_id=job_id,
                sheet_num=result.info.sheet_num,
                priority=round(result.priority, 2),
                active=self._active_count,
            )

        return result
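
The core of the listing above is a "pop, skip, push back" scan: entries are popped in priority order, ineligible ones are set aside, and the skipped entries return to the heap once a runnable entry is found or the queue is exhausted. The sketch below isolates that pattern with `(priority, name)` tuples and a caller-supplied predicate; `pop_first_eligible` is a hypothetical helper, and the real method additionally handles hard caps, rate-limiter errors, and dropped entries.

```python
from __future__ import annotations

import heapq
from collections.abc import Callable


def pop_first_eligible(
    heap: list[tuple[float, str]],
    is_eligible: Callable[[str], bool],
) -> tuple[float, str] | None:
    """Pop the best eligible entry; leave every skipped entry in the heap."""
    skipped: list[tuple[float, str]] = []
    result: tuple[float, str] | None = None
    while heap:
        entry = heapq.heappop(heap)
        if is_eligible(entry[1]):
            result = entry
            break
        skipped.append(entry)
    # Put skipped entries back so they are reconsidered on the next call.
    for entry in skipped:
        heapq.heappush(heap, entry)
    return result
```

When nothing is eligible, the function returns `None` and the heap ends up with the same contents it started with, which matches the documented "all queued sheets are rate-limited" case.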
mark_complete async
mark_complete(job_id, sheet_num, success)

Mark a sheet as done and enqueue newly-ready dependents.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_id | str | The job that owns this sheet. | required |
| sheet_num | int | Which sheet completed. | required |
| success | bool | Whether execution succeeded. | required |

Source code in src/marianne/daemon/scheduler.py
async def mark_complete(
    self, job_id: str, sheet_num: int, success: bool,
) -> None:
    """Mark a sheet as done and enqueue newly-ready dependents.

    Args:
        job_id: The job that owns this sheet.
        sheet_num: Which sheet completed.
        success: Whether execution succeeded.
    """
    async with self._lock:
        # Early-return for unknown job_id to prevent memory leaks
        # from stale or misrouted completion messages.
        if job_id not in self._job_all_sheets:
            _logger.debug(
                "scheduler.mark_complete_unknown_job",
                job_id=job_id,
                sheet_num=sheet_num,
            )
            return

        running = self._running.get(job_id)
        actually_was_running = False
        if running is not None and sheet_num in running:
            running.discard(sheet_num)
            actually_was_running = True

        if actually_was_running:
            self._active_count = max(0, self._active_count - 1)

        completed = self._job_completed.setdefault(job_id, set())
        completed.add(sheet_num)

        _logger.debug(
            "scheduler.sheet_completed",
            job_id=job_id,
            sheet_num=sheet_num,
            success=success,
            active=self._active_count,
        )

        # Enqueue newly-ready dependent sheets
        self._enqueue_ready_dependents(job_id)
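
The listing above ends by calling `_enqueue_ready_dependents()`, which is not shown on this page. One plausible readiness rule, offered purely as an assumption, is that a sheet becomes ready once all of its dependencies are in the completed set and it is not already completed, running, or queued. The hypothetical `newly_ready` function below sketches that rule.

```python
def newly_ready(
    deps: dict[int, set[int]],
    all_sheets: list[int],
    completed: set[int],
    running: set[int],
    queued: set[int],
) -> list[int]:
    """Return sheet numbers whose dependencies are all completed."""
    already_handled = completed | running | queued
    return sorted(
        num
        for num in all_sheets
        if num not in already_handled
        and deps.get(num, set()) <= completed  # subset test: every dep is done
    )
```

Note that the listing adds the sheet to the completed set regardless of the `success` flag, so under this rule a failed sheet would still unblock its dependents; whether the real helper treats failures differently cannot be told from this page.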
get_stats async
get_stats()

Return a snapshot of scheduler state.

Source code in src/marianne/daemon/scheduler.py
async def get_stats(self) -> SchedulerStats:
    """Return a snapshot of scheduler state."""
    async with self._lock:
        per_job_running = {
            jid: len(sheets) for jid, sheets in self._running.items()
        }
        per_job_queued: dict[str, int] = {}
        for entry in self._queue:
            jid = entry.info.job_id
            per_job_queued[jid] = per_job_queued.get(jid, 0) + 1

        return SchedulerStats(
            queued=len(self._queue),
            active=self._active_count,
            max_concurrent=self._max_concurrent,
            per_job_running=per_job_running,
            per_job_queued=per_job_queued,
        )

SchedulerStats dataclass

SchedulerStats(queued, active, max_concurrent, per_job_running, per_job_queued)

Statistics snapshot from the scheduler.

SheetEntry dataclass

SheetEntry(priority, submitted_at, info, rate_limit_skip_count=0)

Priority queue entry for a schedulable sheet.

Ordered by (priority, submitted_at) — lower priority value = higher urgency. The info field is excluded from comparison so that heapq ordering is based solely on the numeric sort keys.
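
The ordering rule described above maps naturally onto a dataclass with `order=True` and a payload field marked `compare=False`. The sketch below uses a hypothetical `Entry` class to show the mechanism; it assumes, without confirmation from this page, that SheetEntry is built in a similar way.

```python
import heapq
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class Entry:
    priority: float
    submitted_at: float
    # Excluded from comparisons, so payloads never need to be orderable.
    info: Any = field(compare=False)


def drain(entries: list[Entry]) -> list[Any]:
    """Heapify a copy of `entries` and return payloads in pop order."""
    heap = list(entries)
    heapq.heapify(heap)
    return [heapq.heappop(heap).info for _ in range(len(heap))]
```

Two entries with equal priority fall back to `submitted_at`, so the earlier submission pops first. Without `compare=False`, a tie on both numeric keys would try to compare the payloads and could raise `TypeError` for unorderable types such as dicts.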

SheetInfo dataclass

SheetInfo(job_id, sheet_num, backend_type='claude_cli', model=None, estimated_cost=0.0, dag_depth=0, job_priority=5, retries_so_far=0)

Metadata about a sheet waiting to be scheduled.

DaemonStatus

Bases: BaseModel

Current status snapshot of the running daemon.

Returned by health check / status queries. Provides a lightweight overview without per-job detail.

JobRequest

Bases: BaseModel

Request to submit a job to the daemon.

Sent by clients (CLI, dashboard) to the daemon over IPC. The daemon validates the config and either accepts or rejects.

JobResponse

Bases: BaseModel

Response from the daemon after a job submission.

Returned immediately — does not wait for job completion. Clients poll status separately via DaemonStatus or job-specific queries.