Skip to content

manager

manager

Job manager for the Marianne daemon.

Maps job IDs to asyncio.Tasks, enforces concurrency limits via semaphore, routes IPC requests to JobService, and cancels all tasks on shutdown.

Classes

DaemonJobStatus

Bases: str, Enum

Status values for daemon-managed jobs.

Inherits from str so meta.status serializes directly as a plain string in JSON/dict output — no .value calls needed.

JobMeta dataclass

JobMeta(job_id, config_path, workspace, submitted_at=time(), started_at=None, status=QUEUED, error_message=None, error_traceback=None, chain_depth=None, hook_config=None, concert_config=None, completed_new_work=False, observer=None, pending_modify=None, held_chain_hook=None)

Metadata tracked per job in the manager.

Functions
to_dict
to_dict()

Serialize to a dict suitable for JSON-RPC responses.

Source code in src/marianne/daemon/manager.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a dict suitable for JSON-RPC responses."""
    result: dict[str, Any] = {
        "job_id": self.job_id,
        "status": self.status,
        "config_path": str(self.config_path),
        "workspace": str(self.workspace),
        "submitted_at": self.submitted_at,
        "started_at": self.started_at,
    }
    if self.error_message:
        result["error_message"] = self.error_message
    if self.error_traceback:
        result["error_traceback"] = self.error_traceback
    if self.chain_depth is not None:
        result["chain_depth"] = self.chain_depth
    return result

DaemonResourceChecker

DaemonResourceChecker(monitor)

Bridges ResourceMonitor to ParallelExecutor's ResourceChecker protocol.

Used by JobManager to provide backpressure hints to the parallel executor during fanout stages.

Source code in src/marianne/daemon/manager.py
def __init__(self, monitor: ResourceMonitor) -> None:
    self._monitor = monitor
Functions
can_start_parallel_sheet async
can_start_parallel_sheet()

Check if system resource pressure allows another parallel sheet.

Source code in src/marianne/daemon/manager.py
async def can_start_parallel_sheet(self) -> bool:
    """Check if system resource pressure allows another parallel sheet."""
    return self._monitor.is_accepting_work()

JobManager

JobManager(config, *, start_time=None, monitor=None, pgroup=None)

Manages concurrent job execution within the daemon.

Wraps JobService with task tracking and concurrency control. Each submitted job becomes an asyncio.Task that the manager tracks from start to completion/cancellation.

Source code in src/marianne/daemon/manager.py
def __init__(
    self,
    config: DaemonConfig,
    *,
    start_time: float | None = None,
    monitor: ResourceMonitor | None = None,
    pgroup: ProcessGroupManager | None = None,
) -> None:
    self._config = config
    self._start_time = start_time or time.monotonic()
    self._pgroup = pgroup

    # Phase 3: Centralized learning hub.
    # Single GlobalLearningStore shared across all jobs — pattern
    # discoveries in Job A are instantly available to Job B.
    self._learning_hub = LearningHub()

    # Deferred to start() where the learning hub's store is available.
    self._service: JobService | None = None
    self._jobs: dict[str, asyncio.Task[Any]] = {}
    self._job_meta: dict[str, JobMeta] = {}
    # Live CheckpointState per running job — populated by
    # _PublishingBackend on every state_backend.save() so the
    # conductor can serve status from memory, not disk.
    # Keyed by conductor job_id (which may be deduplicated, e.g.
    # "issue-solver-2"), not the config's name field.
    self._live_states: dict[str, CheckpointState] = {}
    # In-process pause events per job — set by pause_job(), checked by
    # the runner at sheet boundaries.  Keyed by conductor job_id.
    self._pause_events: dict[str, asyncio.Event] = {}
    # Explicit config.name → conductor_id mapping.  Populated in
    # _run_job_task when the config is parsed (config.name becomes
    # known).  Used by _on_state_published as a fallback when
    # state.job_id doesn't match any _job_meta key — O(1) lookup
    # instead of the fragile linear scan it replaces.
    self._config_name_to_conductor_id: dict[str, str] = {}
    # Jobs queued as PENDING during rate limit backpressure.
    # Keyed by conductor job_id.  Auto-started when limits clear.
    self._pending_jobs: dict[str, JobRequest] = {}
    self._concurrency_semaphore = asyncio.Semaphore(
        config.max_concurrent_jobs,
    )
    self._id_gen_lock = asyncio.Lock()
    self._shutting_down = False
    self._shutdown_event = asyncio.Event()
    self._recent_failures: deque[float] = deque()
    # v25: Optional entropy check callback set by process.py after health checker init
    self._entropy_check_callback: Callable[[], None] | None = None

    # Phase 3: Global sheet scheduler — lazily initialized via property.
    # Infrastructure is built and tested but not yet wired into the
    # execution path.  Currently, jobs run monolithically via
    # JobService.start_job().  Lazy init avoids allocating resources
    # until Phase 3 is actually wired.
    self._scheduler_instance: GlobalSheetScheduler | None = None

    # Phase 3: Cross-job rate limit coordination.
    # Built and tested; wired into the scheduler so next_sheet()
    # skips rate-limited backends.  Not yet active because the
    # scheduler itself is not yet driving execution.
    self._rate_coordinator = RateLimitCoordinator()

    # Fleet management — tracks running fleets for fleet-level operations
    self._fleet_records: dict[str, Any] = {}

    # Phase 3: Backpressure controller.
    # Uses a single ResourceMonitor instance shared with DaemonProcess
    # for both periodic monitoring and point-in-time backpressure checks.
    # When no monitor is injected (e.g. unit tests), a standalone one
    # is created that only does point-in-time reads.
    self._monitor = monitor or ResourceMonitor(config.resource_limits, manager=self)
    self._backpressure = BackpressureController(
        self._monitor, self._rate_coordinator,
    )

    # Persistent job registry — survives daemon restarts.
    db_path = config.state_db_path.expanduser()
    self._registry = JobRegistry(db_path)

    # Event bus for routing runner and observer events to consumers.
    self._event_bus = EventBus(
        max_queue_size=config.observer.max_queue_size,
    )

    # Semantic analyzer — LLM-based analysis of sheet completions.
    # Initialized in start() after the event bus is ready.
    self._semantic_analyzer: SemanticAnalyzer | None = None

    # Phase 4: Completion snapshots — captures workspace artifacts
    # at job completion with TTL-based cleanup.
    self._snapshot_manager = SnapshotManager()

    # Observer event recorder — persists per-job observer events to JSONL.
    # Initialized eagerly, started in start() after event bus.
    self._observer_recorder: ObserverRecorder | None = None

    # Baton adapter — the execution engine for all jobs.
    # Initialized in start(). Import deferred to avoid circular import.
    from marianne.daemon.baton.adapter import BatonAdapter
    self._baton_adapter: BatonAdapter | None = None
    self._baton_loop_task: asyncio.Task[Any] | None = None
Attributes
uptime_seconds property
uptime_seconds

Seconds since the daemon started (monotonic clock).

shutting_down property
shutting_down

Whether the manager is in the process of shutting down.

running_count property
running_count

Number of currently running jobs.

active_job_count property
active_job_count

Number of concurrently executing jobs (used for fair-share scheduling).

Currently returns running_count (job-level granularity). Phase 3 will replace this with self._scheduler.active_count for sheet-level granularity once per-sheet dispatch is wired.

failure_rate_elevated property
failure_rate_elevated

Whether the recent job failure rate is elevated.

Returns True if more than _FAILURE_RATE_THRESHOLD unexpected exceptions have occurred within the last _FAILURE_RATE_WINDOW seconds. Used by HealthChecker.readiness() to degrade the health signal when systemic failures are occurring.

notifications_degraded property
notifications_degraded

Whether notification delivery is degraded (forwarded from JobService).

scheduler property
scheduler

Access the global sheet scheduler for cross-job coordination.

rate_coordinator property
rate_coordinator

Access the rate limit coordinator for cross-job rate limiting.

backpressure property
backpressure

Access the backpressure controller for load management.

learning_hub property
learning_hub

Access the centralized learning hub.

observer_recorder property
observer_recorder

Access the observer event recorder for IPC.

event_bus property
event_bus

Access the event bus for subscribing to events.

Functions
start async
start()

Start daemon subsystems (learning hub, monitor, etc.).

Source code in src/marianne/daemon/manager.py
async def start(self) -> None:
    """Start daemon subsystems (learning hub, monitor, etc.)."""
    # Open the async registry connection (tables + WAL mode)
    await self._registry.open()

    # Recover orphaned jobs (left running/queued from previous daemon).
    # Pause-aware: check each orphan's checkpoint to distinguish truly
    # running jobs from those that were mid-pause when the daemon died.
    orphans = await self._registry.get_orphaned_jobs()
    if orphans:
        failed_count = 0
        paused_count = 0
        for orphan in orphans:
            target_status = self._classify_orphan(orphan)
            await self._registry.update_status(
                orphan.job_id,
                target_status,
                error_message=(
                    "Daemon restarted while job was active"
                    if target_status == DaemonJobStatus.FAILED
                    else None
                ),
            )
            if target_status == DaemonJobStatus.PAUSED:
                paused_count += 1
            else:
                failed_count += 1
        _logger.info(
            "manager.orphans_recovered",
            count=len(orphans),
            failed=failed_count,
            paused=paused_count,
        )

    # Restore ALL job metadata from registry into memory so that
    # RPC handlers (status, resume, pause, errors, …) work for
    # jobs from previous daemon sessions without per-method fallback.
    # F-077: Also restore hook_config so on_success hooks fire after restart.
    all_records = await self._registry.list_jobs(limit=10_000)
    for record in all_records:
        if record.job_id not in self._job_meta:
            # Restore hook_config from registry (F-077: was missing,
            # causing on_success hooks to silently stop after restart)
            hook_config: list[dict[str, Any]] | None = None
            hook_json = await self._registry.get_hook_config(
                record.job_id,
            )
            if hook_json:
                import json
                hook_config = json.loads(hook_json)

            self._job_meta[record.job_id] = JobMeta(
                job_id=record.job_id,
                config_path=Path(record.config_path),
                workspace=Path(record.workspace),
                submitted_at=record.submitted_at,
                started_at=record.started_at,
                status=record.status,
                error_message=record.error_message,
                hook_config=hook_config,
            )
    if all_records:
        _logger.info(
            "manager.registry_restored",
            total=len(all_records),
            loaded=len(self._job_meta),
        )

    await self._learning_hub.start()
    await self._event_bus.start()
    # Create service with shared store now that the hub is initialized
    self._service = JobService(
        output=StructuredOutput(event_bus=self._event_bus),
        global_learning_store=self._learning_hub.store,
        rate_limit_callback=self._on_rate_limit,
        event_callback=self._on_event,
        state_publish_callback=self._on_state_published,
        registry=self._registry,
        token_warning_threshold=self._config.preflight.token_warning_threshold,
        token_error_threshold=self._config.preflight.token_error_threshold,
        pgroup_manager=self._pgroup,
    )
    # Start semantic analyzer after event bus (needs bus for subscription).
    # Failure must not prevent the conductor from starting.
    try:
        from marianne.execution.setup import create_backend_from_config

        semantic_backend = create_backend_from_config(
            self._config.learning.backend,
        )
        self._semantic_analyzer = SemanticAnalyzer(
            config=self._config.learning,
            backend=semantic_backend,
            learning_hub=self._learning_hub,
            live_states=self._live_states,
        )
        await self._semantic_analyzer.start(self._event_bus)
    except (OSError, ValueError, RuntimeError, TypeError):
        _logger.warning(
            "manager.semantic_analyzer_start_failed",
            exc_info=True,
        )
        self._semantic_analyzer = None

    # Start observer recorder after event bus (needs bus for subscription).
    # Guard: observer.enabled, NOT persist_events. The ring buffer serves
    # mzt top even when persistence is off.
    if self._config.observer.enabled:
        self._observer_recorder = ObserverRecorder(
            config=self._config.observer,
        )
        await self._observer_recorder.start(self._event_bus)

    # Initialize the baton execution engine.
    from marianne.daemon.baton.adapter import BatonAdapter
    from marianne.daemon.baton.backend_pool import BackendPool
    from marianne.instruments.loader import load_all_profiles
    from marianne.instruments.registry import InstrumentRegistry

    # Build instrument registry with all available profiles
    profiles = load_all_profiles()
    registry = InstrumentRegistry()
    for profile in profiles.values():
        registry.register(profile, override=True)

    self._baton_adapter = BatonAdapter(
        event_bus=self._event_bus,
        max_concurrent_sheets=self._config.max_concurrent_sheets,
        persist_callback=self._on_baton_persist,
    )
    self._baton_adapter.set_backend_pool(
        BackendPool(registry, pgroup=self._pgroup)
    )

    # Populate per-model concurrency from instrument profiles
    for profile in profiles.values():
        for model in profile.models:
            self._baton_adapter._baton.set_model_concurrency(
                profile.name, model.name, model.max_concurrent,
            )

    # Start the baton's event loop as a background task
    self._baton_loop_task = asyncio.create_task(
        self._baton_adapter.run(),
        name="baton-loop",
    )
    _logger.info("manager.baton_adapter_started")

    # Recover paused orphans through the baton.
    await self._recover_baton_orphans()

    _logger.info(
        "manager.started",
        scheduler_status="lazy_not_wired",
        scheduler_note="Phase 3 scheduler is lazily initialized and not "
        "yet driving execution. Jobs run through the baton engine.",
        semantic_analyzer="active" if self._semantic_analyzer else "unavailable",
        observer_recorder="active" if self._observer_recorder else "unavailable",
        baton_adapter="active",
    )
apply_config
apply_config(new_config)

Hot-apply reloadable config fields from a SIGHUP reload.

Compares the new config against the current one and applies changes that can be safely updated at runtime. Rebuilds the concurrency semaphore if max_concurrent_jobs changed.

Safe because asyncio is single-threaded — this runs in the event loop, so no concurrent access to _config or _concurrency_semaphore is possible.

Source code in src/marianne/daemon/manager.py
def apply_config(self, new_config: DaemonConfig) -> None:
    """Hot-apply reloadable config fields from a SIGHUP reload.

    Compares the new config against the current one and applies
    changes that can be safely updated at runtime.  Rebuilds the
    concurrency semaphore if ``max_concurrent_jobs`` changed.

    Safe because asyncio is single-threaded — this runs in the
    event loop, so no concurrent access to ``_config`` or
    ``_concurrency_semaphore`` is possible.
    """
    old = self._config

    # Rebuild semaphore if concurrency limit changed
    if new_config.max_concurrent_jobs != old.max_concurrent_jobs:
        _logger.info(
            "manager.config_reloaded",
            field="max_concurrent_jobs",
            old_value=old.max_concurrent_jobs,
            new_value=new_config.max_concurrent_jobs,
        )
        self._concurrency_semaphore = asyncio.Semaphore(
            new_config.max_concurrent_jobs,
        )

    # Log other changed reloadable fields
    _reloadable_fields = [
        "job_timeout_seconds",
        "shutdown_timeout_seconds",
        "max_job_history",
        "monitor_interval_seconds",
    ]
    for field_name in _reloadable_fields:
        old_val = getattr(old, field_name)
        new_val = getattr(new_config, field_name)
        if old_val != new_val:
            _logger.info(
                "manager.config_reloaded",
                field=field_name,
                old_value=old_val,
                new_value=new_val,
            )

    self._config = new_config

    # Propagate preflight thresholds to job service for new runners
    if self._service is not None:
        self._service._token_warning_threshold = new_config.preflight.token_warning_threshold
        self._service._token_error_threshold = new_config.preflight.token_error_threshold
update_job_config_metadata
update_job_config_metadata(job_id, *, config_path=None, workspace=None)

Update config-derived metadata in the in-memory job map.

Source code in src/marianne/daemon/manager.py
def update_job_config_metadata(
    self,
    job_id: str,
    *,
    config_path: Path | None = None,
    workspace: Path | None = None,
) -> None:
    """Update config-derived metadata in the in-memory job map."""
    meta = self._job_meta.get(job_id)
    if meta is None:
        return
    if config_path is not None:
        meta.config_path = config_path
    if workspace is not None:
        meta.workspace = workspace
submit_job async
submit_job(request)

Validate config, create task, return immediately.

Source code in src/marianne/daemon/manager.py
async def submit_job(self, request: JobRequest) -> JobResponse:
    """Validate config, create task, return immediately."""
    if self._shutting_down:
        return JobResponse(
            job_id="",
            status="rejected",
            message="Daemon is shutting down",
        )

    if not self._backpressure.should_accept_job():
        # F-149: should_accept_job() only rejects for resource pressure
        # (memory/processes). Rate limits are per-instrument and handled
        # at the sheet dispatch level — they don't block job submission.
        return JobResponse(
            job_id="",
            status="rejected",
            message="System under high resource pressure — try again later",
        )

    job_id = self._get_job_id(request.config_path.stem)

    # Validate config exists and resolve workspace BEFORE acquiring the
    # lock. Config parsing is expensive and doesn't need serialization
    # — it's idempotent and job-independent.
    if not request.config_path.exists():
        return JobResponse(
            job_id=job_id,
            status="rejected",
            message=f"Config file not found: {request.config_path}",
        )

    # Fleet detection: route fleet configs to the fleet manager
    from marianne.daemon.fleet import is_fleet_config, submit_fleet

    if is_fleet_config(request.config_path):
        from marianne.core.config.fleet import FleetConfig

        try:
            with open(request.config_path) as f:
                raw = yaml.safe_load(f)
            fleet_config = FleetConfig.model_validate(raw)
        except Exception as exc:
            return JobResponse(
                job_id=job_id,
                status="rejected",
                message=f"Failed to parse fleet config: {exc}",
            )
        return await submit_fleet(self, request.config_path, fleet_config)

    # Parse config for workspace resolution and hook extraction.
    # When workspace is provided explicitly, parsing is best-effort
    # (hooks won't be available if it fails, but the job still runs).
    from marianne.core.config import JobConfig

    parsed_config: JobConfig | None = None
    if request.workspace:
        workspace = request.workspace
        try:
            parsed_config = JobConfig.from_yaml(request.config_path)
        except (ValueError, OSError, KeyError, yaml.YAMLError):
            _logger.debug(
                "manager.config_parse_for_hooks_failed",
                job_id=job_id,
                config_path=str(request.config_path),
            )
    else:
        try:
            parsed_config = JobConfig.from_yaml(request.config_path)
            workspace = parsed_config.workspace
        except (ValueError, OSError, KeyError, yaml.YAMLError) as exc:
            _logger.error(
                "manager.config_parse_failed",
                job_id=job_id,
                config_path=str(request.config_path),
                exc_info=True,
            )
            return JobResponse(
                job_id=job_id,
                status="rejected",
                message=(
                    f"Failed to parse config file: "
                    f"{request.config_path} ({exc}). "
                    "Cannot determine workspace. "
                    "Fix the config or pass --workspace explicitly."
                ),
            )

    # Resolve relative workspace against client_cwd (working directory fix).
    # When the CLI sends client_cwd, relative workspace paths from the
    # config should resolve against where the user invoked the command,
    # not where the daemon was spawned.
    if request.client_cwd and not workspace.is_absolute():
        workspace = (request.client_cwd / workspace).resolve()
        _logger.debug(
            "manager.workspace_resolved_from_client_cwd",
            job_id=job_id,
            client_cwd=str(request.client_cwd),
            workspace=str(workspace),
        )

    # Extract hook config from parsed config for daemon-owned execution.
    hook_config_list: list[dict[str, Any]] | None = None
    concert_config_dict: dict[str, Any] | None = None
    if parsed_config and parsed_config.on_success:
        hook_config_list = [
            h.model_dump(mode="json") for h in parsed_config.on_success
        ]
    if parsed_config and parsed_config.concert.enabled:
        concert_config_dict = parsed_config.concert.model_dump(mode="json")

    # Early workspace validation: reject jobs whose workspace parent
    # doesn't exist or isn't writable, instead of failing deep in
    # JobService.start_job(). Workspace itself may not exist yet —
    # it gets created by JobService — but the parent must be valid.
    ws_parent = workspace.parent
    if not ws_parent.exists():
        return JobResponse(
            job_id=job_id,
            status="rejected",
            message=(
                f"Workspace parent directory does not exist: {ws_parent}. "
                "Create the parent directory or change the workspace path."
            ),
        )
    if not os.access(ws_parent, os.W_OK):
        return JobResponse(
            job_id=job_id,
            status="rejected",
            message=(
                f"Workspace parent directory is not writable: {ws_parent}. "
                "Fix permissions or change the workspace path."
            ),
        )

    # Serialize only the duplicate-check → register → insert window
    # to prevent TOCTOU races between concurrent submissions.
    async with self._id_gen_lock:
        # Reject if a job with this name is already active
        existing = self._job_meta.get(job_id)
        if existing and existing.status in (DaemonJobStatus.QUEUED, DaemonJobStatus.RUNNING):
            return JobResponse(
                job_id=job_id,
                status="rejected",
                message=(
                    f"Job '{job_id}' is already {existing.status.value}. "
                    "Use 'mzt pause' or 'mzt cancel' first, or wait for it to finish."
                ),
            )

        # Auto-detect changed score file on re-run (#103).
        # When a COMPLETED job exists and --fresh wasn't set, check
        # if the score file was modified after the last run completed.
        if not request.fresh:
            record = await self._registry.get_job(job_id)
            if (
                record is not None
                and record.status == DaemonJobStatus.COMPLETED
                and _should_auto_fresh(request.config_path, record.completed_at)
            ):
                request = request.model_copy(update={"fresh": True})
                _logger.info(
                    "auto_fresh.score_changed",
                    job_id=job_id,
                    config_path=str(request.config_path),
                    message="Score file modified since last completed run — starting fresh",
                )

        meta = JobMeta(
            job_id=job_id,
            config_path=request.config_path,
            workspace=workspace,
            chain_depth=request.chain_depth,
            hook_config=hook_config_list,
            concert_config=concert_config_dict,
        )
        # Register in DB first — if this fails, no phantom in-memory entry
        await self._registry.register_job(job_id, request.config_path, workspace)
        self._job_meta[job_id] = meta

        # Persist hook config to registry for restart resilience
        if hook_config_list:
            import json
            await self._registry.store_hook_config(
                job_id, json.dumps(hook_config_list),
            )

    try:
        task = asyncio.create_task(
            self._run_job_task(job_id, request),
            name=f"job-{job_id}",
        )
    except RuntimeError:
        # Clean up metadata if task creation fails
        # RuntimeError is raised by asyncio when no running event loop
        self._job_meta.pop(job_id, None)
        await self._registry.update_status(
            job_id, DaemonJobStatus.FAILED, error_message="Task creation failed",
        )
        raise
    self._jobs[job_id] = task
    task.add_done_callback(lambda t: self._on_task_done(job_id, t))

    _logger.info(
        "job.submitted",
        job_id=job_id,
        config_path=str(request.config_path),
    )

    return JobResponse(
        job_id=job_id,
        status="accepted",
        message=f"Job queued (concurrency limit: {self._config.max_concurrent_jobs})",
    )
get_job_status async
get_job_status(job_id, workspace=None)

Get full status of a specific job.

Resolution order (no workspace/disk fallback): 1. Live in-memory state (running jobs) 2. Registry checkpoint (historical jobs — persisted on every save) 3. Basic metadata (jobs that never ran / pre-checkpoint registry)

Source code in src/marianne/daemon/manager.py
async def get_job_status(self, job_id: str, workspace: Path | None = None) -> dict[str, Any]:
    """Get full status of a specific job.

    Resolution order (no workspace/disk fallback):
    1. Live in-memory state (running jobs)
    2. Registry checkpoint (historical jobs — persisted on every save)
    3. Basic metadata (jobs that never ran / pre-checkpoint registry)
    """
    _ = workspace  # Unused — daemon is the single source of truth

    meta = self._job_meta.get(job_id)
    record: JobRecord | None = None
    if meta is None:
        # Check the persistent registry for historical jobs
        record = await self._registry.get_job(job_id)
        if record is None:
            raise JobSubmissionError(f"Job '{job_id}' not found")

    # 1. Live in-memory state (running jobs)
    live = self._live_states.get(job_id)
    if live is not None:
        return live.model_dump(mode="json")

    # 2. Registry checkpoint (historical/terminal jobs)
    #    Skip if meta shows an active status — the checkpoint is stale
    #    between resume acceptance and the first new state save.
    _active = (DaemonJobStatus.QUEUED, DaemonJobStatus.RUNNING)
    if meta is None or meta.status not in _active:
        try:
            checkpoint_json = await self._registry.load_checkpoint(job_id)
            if checkpoint_json is not None:
                import json
                data: dict[str, Any] = json.loads(checkpoint_json)
                # Override checkpoint status with the registry's
                # authoritative status. The checkpoint may have been
                # persisted before a cancel/fail was recorded in the
                # registry's status column.
                authoritative_status = (
                    meta.status.value if meta is not None
                    else (record.status.value if record is not None else None)
                )
                if authoritative_status and data.get("status") != authoritative_status:
                    data["status"] = authoritative_status
                return data
        except Exception:
            _logger.debug(
                "get_job_status.registry_checkpoint_failed",
                job_id=job_id,
                exc_info=True,
            )

    # 2b. Detect stale RUNNING status (no live state + no running task).
    #     This happens when meta was restored from the registry after a
    #     daemon restart but the job's process no longer exists.
    if meta is not None and meta.status == DaemonJobStatus.RUNNING:
        task = self._jobs.get(job_id)
        if task is None or task.done():
            _logger.info(
                "get_job_status.stale_running_corrected",
                job_id=job_id,
            )
            await self._set_job_status(
                job_id, DaemonJobStatus.FAILED,
            )
            # Now fall through to return the corrected checkpoint
            # or metadata below.
            try:
                checkpoint_json = await self._registry.load_checkpoint(
                    job_id,
                )
                if checkpoint_json is not None:
                    import json as _json
                    data = _json.loads(checkpoint_json)
                    # Override the checkpoint's stale status
                    data["status"] = "failed"
                    return data
            except Exception:
                _logger.debug(
                    "get_job_status.checkpoint_load_after_fail",
                    job_id=job_id,
                    exc_info=True,
                )

    # 3. Basic metadata (job never produced a checkpoint, or active job
    #    whose registry checkpoint is stale)
    if meta is not None:
        return meta.to_dict()
    assert record is not None  # guaranteed by the check above
    return record.to_dict()
pause_job async
pause_job(job_id)

Send pause signal to a running job via in-process event.

Prefers the in-process _pause_events dict (set during _run_managed_task). Falls back to JobService.pause_job when no event exists (shouldn't happen in daemon mode, but guards against edge cases).

Source code in src/marianne/daemon/manager.py
async def pause_job(self, job_id: str) -> bool:
    """Send pause signal to a running job via in-process event.

    Prefers the in-process ``_pause_events`` dict (set during
    ``_run_managed_task``).  Falls back to ``JobService.pause_job``
    when no event exists (shouldn't happen in daemon mode, but
    guards against edge cases).
    """
    meta = self._job_meta.get(job_id)
    if meta is None:
        raise JobSubmissionError(f"Job '{job_id}' not found")
    if meta.status != DaemonJobStatus.RUNNING:
        raise JobSubmissionError(
            f"Job '{job_id}' is {meta.status.value}, not running"
        )

    # Verify there's an actual running task (guards against stale
    # "running" status restored from registry after daemon restart)
    task = self._jobs.get(job_id)
    if task is None or task.done():
        await self._set_job_status(job_id, DaemonJobStatus.FAILED)
        raise JobSubmissionError(
            f"Job '{job_id}' has no running process "
            f"(stale status after daemon restart)"
        )

    # Baton path: inject PauseJob event directly into the baton.
    # The baton sets job.paused=True immediately, which stops dispatch
    # for this job regardless of whether sheets are active.
    if self._baton_adapter is not None:
        from marianne.daemon.baton.events import PauseJob
        await self._baton_adapter._baton.inbox.put(
            PauseJob(job_id=job_id)
        )
        _logger.info("job.baton_pause_sent", job_id=job_id)
        await self._set_job_status(job_id, DaemonJobStatus.PAUSED)
        return True

    # Prefer in-process event (no filesystem access needed)
    event = self._pause_events.get(job_id)
    if event is not None:
        event.set()
        _logger.info("job.pause_event_set", job_id=job_id)
        return True

    # Fallback: filesystem-based pause via JobService
    return await self._checked_service.pause_job(meta.job_id, meta.workspace)
resume_job async
resume_job(job_id, workspace=None, config_path=None, no_reload=False)

Resume a paused or failed job by creating a new task.

If an old task for this job is still running (e.g., not yet fully paused), it is cancelled before the new resume task is created to prevent detached/duplicate execution.

Parameters:

Name Type Description Default
job_id str

ID of the job to resume.

required
workspace Path | None

Optional workspace override.

None
config_path Path | None

Optional new config file path. When provided, updates meta.config_path so the resume task loads the new config.

None
no_reload bool

If True, skip auto-reload from disk and use cached config snapshot. Threaded from CLI --no-reload flag.

False
Source code in src/marianne/daemon/manager.py
async def resume_job(
    self,
    job_id: str,
    workspace: Path | None = None,
    config_path: Path | None = None,
    no_reload: bool = False,
) -> JobResponse:
    """Resume a paused or failed job by creating a new task.

    If an old task for this job is still running (e.g., not yet fully
    paused), it is cancelled before the new resume task is created to
    prevent detached/duplicate execution.

    Args:
        job_id: ID of the job to resume.
        workspace: Optional workspace override.
        config_path: Optional new config file path. When provided, updates
            meta.config_path so the resume task loads the new config.
        no_reload: If True, skip auto-reload from disk and use cached
            config snapshot. Threaded from CLI ``--no-reload`` flag.
    """
    meta = self._job_meta.get(job_id)
    if meta is None:
        raise JobSubmissionError(f"Score '{job_id}' not found")
    _resumable = (
        DaemonJobStatus.PAUSED,
        DaemonJobStatus.PAUSED_AT_CHAIN,
        DaemonJobStatus.FAILED,
        DaemonJobStatus.CANCELLED,
    )
    if meta.status not in _resumable:
        raise JobSubmissionError(
            f"Score '{job_id}' is {meta.status.value}, "
            "only PAUSED, PAUSED_AT_CHAIN, FAILED, or CANCELLED scores can be resumed"
        )

    # PAUSED_AT_CHAIN: trigger the held chain instead of normal resume
    if meta.status == DaemonJobStatus.PAUSED_AT_CHAIN and meta.held_chain_hook:
        return await self._resume_held_chain(job_id, meta)

    # Cancel stale task to prevent detached execution
    old_task = self._jobs.pop(job_id, None)
    if old_task is not None and not old_task.done():
        old_task.cancel(msg=f"stale task replaced by resume of {job_id}")
        _logger.info("job.resume_cancelled_stale_task", job_id=job_id)

    # Apply new config path before creating the task (task reads meta.config_path)
    if config_path is not None:
        meta.config_path = config_path

    ws = workspace or meta.workspace
    await self._set_job_status(job_id, DaemonJobStatus.QUEUED)

    task = asyncio.create_task(
        self._resume_job_task(job_id, ws, no_reload=no_reload),
        name=f"job-resume-{job_id}",
    )
    self._jobs[job_id] = task
    task.add_done_callback(lambda t: self._on_task_done(job_id, t))

    return JobResponse(
        job_id=job_id,
        status="accepted",
        message="Job resume queued",
    )
modify_job async
modify_job(job_id, config_path, workspace=None)

Pause a running job and queue automatic resume with new config.

If the job is already paused/failed/cancelled, resume immediately. If running, send pause signal and store pending_modify — _on_task_done will trigger the resume when the task completes (pauses).

Source code in src/marianne/daemon/manager.py
async def modify_job(
    self, job_id: str, config_path: Path, workspace: Path | None = None,
) -> JobResponse:
    """Pause a running job and queue automatic resume with new config.

    If the job is already paused/failed/cancelled, resume immediately.
    If running, send pause signal and store pending_modify — _on_task_done
    will trigger the resume when the task completes (pauses).
    """
    meta = self._job_meta.get(job_id)
    if meta is None:
        raise JobSubmissionError(f"Job '{job_id}' not found")

    ws = workspace or meta.workspace

    # Already resumable — resume immediately with new config
    _resumable = (
        DaemonJobStatus.PAUSED,
        DaemonJobStatus.FAILED,
        DaemonJobStatus.CANCELLED,
    )
    if meta.status in _resumable:
        meta.config_path = config_path
        return await self.resume_job(job_id, ws)

    if meta.status != DaemonJobStatus.RUNNING:
        raise JobSubmissionError(
            f"Job '{job_id}' is {meta.status.value}, cannot modify"
        )

    # Send pause signal
    await self.pause_job(job_id)

    # Store pending action — _on_task_done will resume when the job pauses
    meta.pending_modify = (config_path, ws)

    # Baton path: the old task sits on wait_for_completion() and won't
    # exit just because the baton paused dispatch. Cancel the task so
    # _on_task_done fires and triggers the deferred resume.
    if self._baton_adapter is not None:
        old_task = self._jobs.get(job_id)
        if old_task is not None and not old_task.done():
            old_task.cancel(msg=f"modify: paused for config reload on {job_id}")
            _logger.info("job.modify_cancelled_baton_task", job_id=job_id)

    return JobResponse(
        job_id=job_id,
        status="accepted",
        message=f"Pause signal sent. Will resume with {config_path.name} when paused.",
    )
cancel_job async
cancel_job(job_id)

Cancel a running or pending job.

For running jobs: sends the cancel signal and updates in-memory status immediately, then defers heavyweight I/O to a background task.

For pending jobs (queued during rate limit backpressure): removes from the pending queue and updates the registry.

Source code in src/marianne/daemon/manager.py
async def cancel_job(self, job_id: str) -> bool:
    """Cancel a running or pending job.

    For running jobs: sends the cancel signal and updates in-memory
    status immediately, then defers heavyweight I/O to a background task.

    For pending jobs (queued during rate limit backpressure): removes
    from the pending queue and updates the registry.
    """
    # Check pending jobs first (not yet started)
    if job_id in self._pending_jobs:
        del self._pending_jobs[job_id]
        await self._set_job_status(job_id, DaemonJobStatus.CANCELLED)
        # Defer scheduler cleanup
        cleanup = asyncio.create_task(
            self._cancel_cleanup(job_id),
            name=f"cancel-cleanup-{job_id}",
        )
        cleanup.add_done_callback(
            lambda t: log_task_exception(t, _logger, "cancel_cleanup.failed"),
        )
        _logger.info("job.pending_cancelled", job_id=job_id)
        return True

    task = self._jobs.get(job_id)
    if task is None:
        return False

    task.cancel(msg=f"explicit cancel_job({job_id}) via IPC")
    await self._set_job_status(job_id, DaemonJobStatus.CANCELLED)

    # Defer scheduler cleanup so the IPC handler can
    # respond immediately.  In-memory meta is already authoritative.
    cleanup = asyncio.create_task(
        self._cancel_cleanup(job_id),
        name=f"cancel-cleanup-{job_id}",
    )
    cleanup.add_done_callback(
        lambda t: log_task_exception(t, _logger, "cancel_cleanup.failed"),
    )

    _logger.info("job.cancelled", job_id=job_id)
    return True
list_jobs async
list_jobs()

List all jobs with live progress data.

In-memory _job_meta is authoritative for active jobs. Live state from _live_states enriches active entries with sheet-level progress so mzt list shows completion counts. The registry fills in historical jobs.

Source code in src/marianne/daemon/manager.py
async def list_jobs(self) -> list[dict[str, Any]]:
    """List all jobs with live progress data.

    In-memory ``_job_meta`` is authoritative for active jobs.
    Live state from ``_live_states`` enriches active entries with
    sheet-level progress so ``mzt list`` shows completion counts.
    The registry fills in historical jobs.
    """
    seen: set[str] = set()
    result: list[dict[str, Any]] = []

    # Active jobs first — enrich with live progress
    for meta in self._job_meta.values():
        entry = meta.to_dict()
        live = self._live_states.get(meta.job_id)
        if live is not None:
            completed = sum(
                1 for s in live.sheets.values()
                if s.status.value == "completed"
            )
            entry["progress_completed"] = completed
            entry["progress_total"] = len(live.sheets)
        result.append(entry)
        seen.add(meta.job_id)

    # Historical jobs from registry
    for record in await self._registry.list_jobs():
        if record.job_id not in seen:
            result.append(record.to_dict())

    return result
clear_jobs async
clear_jobs(statuses=None, older_than_seconds=None, job_ids=None)

Clear terminal jobs from registry and in-memory metadata.

Parameters:

Name Type Description Default
statuses list[str] | None

Status filter (defaults to terminal statuses).

None
older_than_seconds float | None

Age filter in seconds.

None
job_ids list[str] | None

Only clear these specific job IDs.

None

Returns:

Type Description
dict[str, Any]

Dict with "deleted" count.

Source code in src/marianne/daemon/manager.py
async def clear_jobs(
    self,
    statuses: list[str] | None = None,
    older_than_seconds: float | None = None,
    job_ids: list[str] | None = None,
) -> dict[str, Any]:
    """Clear terminal jobs from registry and in-memory metadata.

    Args:
        statuses: Status filter (defaults to terminal statuses).
        older_than_seconds: Age filter in seconds.
        job_ids: Only clear these specific job IDs.

    Returns:
        Dict with "deleted" count.
    """
    safe_statuses = set(statuses or ["completed", "failed", "cancelled"])
    safe_statuses -= {"queued", "running", "pending"}  # Never clear active jobs

    to_remove: list[str] = []
    now = time.time()
    for jid, meta in self._job_meta.items():
        if job_ids is not None and jid not in job_ids:
            continue
        if meta.status.value not in safe_statuses:
            continue
        if older_than_seconds is not None:
            if (now - meta.submitted_at) < older_than_seconds:
                continue
        to_remove.append(jid)

    for jid in to_remove:
        self._job_meta.pop(jid, None)
        self._live_states.pop(jid, None)

    deleted = await self._registry.delete_jobs(
        job_ids=job_ids,
        statuses=list(safe_statuses),
        older_than_seconds=older_than_seconds,
    )

    _logger.info(
        "manager.clear_jobs",
        in_memory_removed=len(to_remove),
        registry_deleted=deleted,
    )
    return {"deleted": deleted}
clear_rate_limits async
clear_rate_limits(instrument=None)

Clear active rate limits from the coordinator and baton.

Removes the active rate limit so new sheets can be dispatched immediately. Clears both the RateLimitCoordinator (used by the legacy runner and scheduler) and the baton's per-instrument InstrumentState (used by the baton dispatch loop).

Parameters:

Name Type Description Default
instrument str | None

Instrument name to clear, or None for all.

None

Returns:

Type Description
dict[str, Any]

Dict with cleared count and instrument filter.

Source code in src/marianne/daemon/manager.py
async def clear_rate_limits(
    self,
    instrument: str | None = None,
) -> dict[str, Any]:
    """Clear active rate limits from the coordinator and baton.

    Removes the active rate limit so new sheets can be dispatched
    immediately.  Clears both the ``RateLimitCoordinator`` (used by
    the legacy runner and scheduler) and the baton's per-instrument
    ``InstrumentState`` (used by the baton dispatch loop).

    Args:
        instrument: Instrument name to clear, or ``None`` for all.

    Returns:
        Dict with ``cleared`` count and ``instrument`` filter.
    """
    cleared = await self.rate_coordinator.clear_limits(
        instrument=instrument,
    )
    baton_cleared = 0
    if self._baton_adapter is not None:
        baton_cleared = self._baton_adapter.clear_instrument_rate_limit(
            instrument,
        )
        # Kick the baton event loop so dispatch_ready() runs for the
        # newly-PENDING sheets. Without this, the loop blocks on
        # inbox.get() and cleared sheets sit idle until an unrelated
        # event arrives.
        from marianne.daemon.baton.events import DispatchRetry

        self._baton_adapter._baton.inbox.put_nowait(DispatchRetry())
    _logger.info(
        "manager.clear_rate_limits",
        instrument=instrument,
        coordinator_cleared=cleared,
        baton_cleared=baton_cleared,
    )
    # Start any pending jobs now that limits are cleared
    await self._start_pending_jobs()
    return {
        "cleared": cleared + baton_cleared,
        "instrument": instrument,
    }
get_job_errors async
get_job_errors(job_id, workspace=None)

Get errors for a specific job.

Returns the CheckpointState for the CLI to extract error information. Uses the same resolution as get_job_status: live state first, then registry, never workspace files.

Source code in src/marianne/daemon/manager.py
async def get_job_errors(self, job_id: str, workspace: Path | None = None) -> dict[str, Any]:
    """Get errors for a specific job.

    Returns the CheckpointState for the CLI to extract error information.
    Uses the same resolution as get_job_status: live state first, then
    registry, never workspace files.
    """
    _ = workspace  # Conductor DB is the sole source of truth
    state_dict = await self.get_job_status(job_id)
    return {"state": state_dict}
get_diagnostic_report async
get_diagnostic_report(job_id, workspace=None)

Get diagnostic data for a specific job.

Returns the CheckpointState plus workspace path. Uses the same resolution as get_job_status: live state first, then registry.

Source code in src/marianne/daemon/manager.py
async def get_diagnostic_report(
    self, job_id: str, workspace: Path | None = None,
) -> dict[str, Any]:
    """Get diagnostic data for a specific job.

    Returns the CheckpointState plus workspace path. Uses the same
    resolution as get_job_status: live state first, then registry.
    """
    ws = await self._resolve_job_workspace(job_id, workspace)
    state_dict = await self.get_job_status(job_id)
    return {
        "state": state_dict,
        "workspace": str(ws),
    }
get_execution_history async
get_execution_history(job_id, workspace=None, sheet_num=None, limit=50)

Get execution history for a specific job.

Requires the SQLite state backend for history records.

Source code in src/marianne/daemon/manager.py
async def get_execution_history(
    self, job_id: str, workspace: Path | None = None,
    sheet_num: int | None = None, limit: int = 50,
) -> dict[str, Any]:
    """Get execution history for a specific job.

    Requires the SQLite state backend for history records.
    """
    ws = await self._resolve_job_workspace(job_id, workspace)

    from marianne.state import SQLiteStateBackend

    sqlite_path = ws / ".marianne-state.db"
    records: list[dict[str, Any]] = []
    has_history = False

    if sqlite_path.exists():
        backend = SQLiteStateBackend(sqlite_path)
        try:
            if hasattr(backend, 'get_execution_history'):
                records = await backend.get_execution_history(
                    job_id=job_id, sheet_num=sheet_num, limit=limit,
                )
                has_history = True
        finally:
            await backend.close()

    return {
        "job_id": job_id,
        "records": records,
        "has_history": has_history,
    }
recover_job async
recover_job(job_id, workspace=None, sheet_num=None, dry_run=False)

Get state for recover operation.

Returns the job state and workspace for the CLI to run validations locally. The actual validation logic stays in the CLI command to avoid duplicating ValidationEngine setup in the daemon.

Source code in src/marianne/daemon/manager.py
async def recover_job(
    self, job_id: str, workspace: Path | None = None,
    sheet_num: int | None = None, dry_run: bool = False,
) -> dict[str, Any]:
    """Get state for recover operation.

    Returns the job state and workspace for the CLI to run
    validations locally. The actual validation logic stays
    in the CLI command to avoid duplicating ValidationEngine
    setup in the daemon.
    """
    ws = await self._resolve_job_workspace(job_id, workspace)
    state = await self._checked_service.get_status(job_id, ws)
    if state is None:
        raise JobSubmissionError(f"No state found for job '{job_id}'")

    return {
        "state": state.model_dump(mode="json"),
        "workspace": str(ws),
        "dry_run": dry_run,
        "sheet_num": sheet_num,
    }
get_daemon_status async
get_daemon_status()

Build daemon status summary.

Returns all fields required by the DaemonStatus Pydantic model so DaemonClient.status() can deserialize without crashing.

Source code in src/marianne/daemon/manager.py
async def get_daemon_status(self) -> dict[str, Any]:
    """Build daemon status summary.

    Returns all fields required by the ``DaemonStatus`` Pydantic model
    so ``DaemonClient.status()`` can deserialize without crashing.
    """
    mem = self._monitor.current_memory_mb()
    return {
        "pid": os.getpid(),
        "uptime_seconds": round(time.monotonic() - self._start_time, 1),
        "running_jobs": self.running_count,
        "total_jobs_active": self.active_job_count,
        "memory_usage_mb": round(mem, 1) if mem is not None else 0.0,
        "version": getattr(marianne, "__version__", "0.1.0"),
    }
shutdown async
shutdown(graceful=True)

Cancel all running jobs, optionally waiting for sheets.

Deregisters all active jobs from the global sheet scheduler to clean up any pending sheets, running-sheet tracking, and dependency data before the daemon exits.

Source code in src/marianne/daemon/manager.py
async def shutdown(self, graceful: bool = True) -> None:
    """Cancel all running jobs, optionally waiting for sheets.

    Deregisters all active jobs from the global sheet scheduler
    to clean up any pending sheets, running-sheet tracking, and
    dependency data before the daemon exits.
    """
    self._shutting_down = True

    if graceful:
        timeout = self._config.shutdown_timeout_seconds
        _logger.info(
            "manager.shutting_down",
            graceful=True,
            timeout=timeout,
            running_jobs=self.running_count,
        )

        # Wait for running tasks to complete (up to timeout)
        running = [t for t in self._jobs.values() if not t.done()]
        if running:
            _, pending = await asyncio.wait(
                running, timeout=timeout,
            )
            for task in pending:
                task.cancel(msg="graceful shutdown timeout exceeded")
            if pending:
                results = await asyncio.gather(*pending, return_exceptions=True)
                for result in results:
                    if isinstance(result, BaseException):
                        _logger.warning(
                            "manager.shutdown_task_exception",
                            error=str(result),
                            error_type=type(result).__name__,
                        )
    else:
        _logger.info("manager.shutting_down", graceful=False)
        for task in self._jobs.values():
            if not task.done():
                task.cancel(msg="non-graceful shutdown")
        if self._jobs:
            results = await asyncio.gather(
                *self._jobs.values(), return_exceptions=True,
            )
            for result in results:
                if isinstance(result, BaseException):
                    _logger.warning(
                        "manager.shutdown_task_exception",
                        error=str(result),
                        error_type=type(result).__name__,
                    )

    # Deregister all known jobs from the scheduler to clean up
    # heap entries, running-sheet tracking, and dependency data.
    # Uses _job_meta (not _jobs) because task done-callbacks may
    # have already cleared entries from _jobs during cancellation.
    # Guard: only touch the scheduler if it was ever initialized.
    if self._scheduler_instance is not None:
        for job_id in list(self._job_meta.keys()):
            await self._scheduler_instance.deregister_job(job_id)

    self._jobs.clear()

    # Stop the baton adapter: send ShutdownRequested, wait for the
    # event loop to exit, cancel if it doesn't, close backend pool.
    if self._baton_adapter is not None:
        try:
            from marianne.daemon.baton.events import ShutdownRequested
            self._baton_adapter._baton.inbox.put_nowait(
                ShutdownRequested(graceful=graceful)
            )
            # Wait for the baton loop to exit (bounded by 5s)
            if self._baton_loop_task is not None and not self._baton_loop_task.done():
                try:
                    await asyncio.wait_for(self._baton_loop_task, timeout=5.0)
                except (TimeoutError, asyncio.CancelledError):
                    self._baton_loop_task.cancel()
                    try:
                        await self._baton_loop_task
                    except (asyncio.CancelledError, Exception):
                        pass
            # Cancel any remaining musician tasks
            await self._baton_adapter.shutdown()
            _logger.info("manager.baton_adapter_stopped")
        except Exception:
            _logger.warning(
                "manager.baton_adapter_stop_failed", exc_info=True,
            )

    # Stop all observers for any remaining jobs
    for jid in list(self._job_meta.keys()):
        await self._stop_observer(jid)

    # Stop observer recorder before event bus (needs bus for unsubscribe).
    if self._observer_recorder is not None:
        try:
            await self._observer_recorder.stop(self._event_bus)
        except (OSError, RuntimeError):
            _logger.warning(
                "manager.observer_recorder_stop_failed",
                exc_info=True,
            )

    # Stop semantic analyzer before event bus (needs bus for unsubscribe,
    # and learning hub for final writes during drain).
    if self._semantic_analyzer is not None:
        try:
            await self._semantic_analyzer.stop(self._event_bus)
        except asyncio.CancelledError:
            raise
        except (OSError, RuntimeError):
            _logger.warning(
                "manager.semantic_analyzer_stop_failed",
                exc_info=True,
            )

    # Shutdown event bus
    await self._event_bus.shutdown()

    # Stop centralized learning hub (final persist + cleanup)
    await self._learning_hub.stop()

    # Final checkpoint flush: persist ALL live states to registry before
    # closing. Fire-and-forget saves from _on_baton_state_sync may be
    # pending — this synchronous flush ensures no progress is lost.
    flushed = 0
    for jid, live in self._live_states.items():
        try:
            checkpoint_json = live.model_dump_json()
            await self._registry.save_checkpoint(jid, checkpoint_json)
            await self._registry.update_status(
                jid, live.status.value if hasattr(live.status, 'value') else str(live.status),
            )
            flushed += 1
        except Exception:
            _logger.warning(
                "manager.shutdown_flush_failed",
                job_id=jid, exc_info=True,
            )
    if flushed:
        _logger.info("manager.shutdown_checkpoint_flush", flushed=flushed)

    await self._registry.close()
    self._shutdown_event.set()
    _logger.info("manager.shutdown_complete")
wait_for_shutdown async
wait_for_shutdown()

Block until shutdown is complete.

Source code in src/marianne/daemon/manager.py
async def wait_for_shutdown(self) -> None:
    """Block until shutdown is complete."""
    await self._shutdown_event.wait()

Functions