
baton

The Baton — Marianne's event-driven execution heart.

The baton is the conductor's primary tool. It doesn't decide what to play — the score does. It doesn't decide how to play — the musicians do. The baton controls when and how much: tempo, dynamics, cues, and fermatas.

The baton replaces the current monolithic execution model, in which JobService.start_job() runs all sheets sequentially. Instead, the baton manages sheets across all jobs in a single event-driven loop, dispatching each sheet once its dependencies are satisfied and the global concurrency ceiling allows.

Package layout::

events.py       — All BatonEvent types (dataclasses)
timer.py        — Timer wheel (priority queue of future events)
state.py        — Baton state models (sheet/instrument/job tracking)
core.py         — Event inbox, main loop, sheet registry
musician.py     — Single-attempt sheet execution (play once, report)
backend_pool.py — Per-instrument backend instance management
adapter.py      — Wires baton into conductor (step 28)
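The loop these modules form can be sketched in miniature: a single asyncio.Queue inbox that musicians and timers feed, drained by one consumer that updates state until a shutdown event arrives. Names below are illustrative stand-ins, not the real marianne API.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SheetDone:
    """Stand-in for a musician's result event."""
    job_id: str
    sheet_num: int


@dataclass
class Shutdown:
    """Stand-in for the baton's shutdown event."""


async def run_loop(inbox: asyncio.Queue) -> list:
    """Drain the inbox until a Shutdown event arrives."""
    seen = []
    while True:
        event = await inbox.get()
        if isinstance(event, Shutdown):
            break
        seen.append(event)  # a real loop would update state and dispatch
    return seen


async def main() -> int:
    inbox: asyncio.Queue = asyncio.Queue()
    inbox.put_nowait(SheetDone("j1", 1))
    inbox.put_nowait(SheetDone("j1", 2))
    inbox.put_nowait(Shutdown())
    return len(await run_loop(inbox))


print(asyncio.run(main()))  # → 2
```

The real loop additionally dispatches ready sheets after each event and publishes to the EventBus, but the single-consumer queue shape is the same.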

Classes

BatonAdapter

BatonAdapter(*, event_bus=None, max_concurrent_sheets=10, state_sync_callback=None, persist_callback=None)

Bridges the conductor (JobManager) and the baton (BatonCore).

The adapter owns:

- A BatonCore instance (event loop, sheet registry, state machine)
- A mapping of job_id → Sheet[] (for prompt rendering at dispatch time)
- Active musician tasks (asyncio.Task per dispatched sheet)

The adapter does NOT own:

- The BackendPool (injected by the manager)
- The EventBus (injected by the manager)
- The CheckpointState (managed by the manager's state backend)

Usage::

adapter = BatonAdapter(event_bus=bus)
adapter.set_backend_pool(pool)

# Register a job
adapter.register_job("j1", sheets, deps)

# Run the baton (blocks until shutdown)
await adapter.run()

Initialize the BatonAdapter.

Parameters:

    event_bus (EventBus | None, default: None)
        Optional EventBus for publishing events to subscribers.

    max_concurrent_sheets (int, default: 10)
        Global concurrency ceiling for dispatch.

    state_sync_callback (StateSyncCallback | None, default: None)
        Deprecated — kept for backward compat. Phase 2 uses
        persist_callback instead.

    persist_callback (PersistCallback | None, default: None)
        Called with job_id after significant state transitions
        (terminal, dispatch) to persist CheckpointState to the
        registry. Replaces the sync layer.
Source code in src/marianne/daemon/baton/adapter.py
def __init__(
    self,
    *,
    event_bus: EventBus | None = None,
    max_concurrent_sheets: int = 10,
    state_sync_callback: StateSyncCallback | None = None,
    persist_callback: PersistCallback | None = None,
) -> None:
    """Initialize the BatonAdapter.

    Args:
        event_bus: Optional EventBus for publishing events to subscribers.
        max_concurrent_sheets: Global concurrency ceiling for dispatch.
        state_sync_callback: Deprecated — kept for backward compat.
            Phase 2 uses persist_callback instead.
        persist_callback: Called with job_id after significant state
            transitions (terminal, dispatch) to persist CheckpointState
            to the registry. Replaces the sync layer.
    """
    from marianne.daemon.baton.timer import TimerWheel

    # Shared inbox breaks the circular dependency: TimerWheel needs
    # the inbox to deliver fired events, BatonCore needs the timer
    # to schedule them. Create the inbox first, pass to both.
    inbox: asyncio.Queue[Any] = asyncio.Queue()
    self._timer_wheel = TimerWheel(inbox)
    self._baton = BatonCore(timer=self._timer_wheel, inbox=inbox)
    self._event_bus = event_bus
    self._max_concurrent_sheets = max_concurrent_sheets
    self._persist_callback = persist_callback
    # Deprecated compat attributes — tests set/read these directly
    self._state_sync_callback = state_sync_callback
    self._synced_status: dict[tuple[str, int], str] = {}

    # Job → Sheet mapping for prompt rendering
    self._job_sheets: dict[str, dict[int, Sheet]] = {}

    # Active musician tasks: (job_id, sheet_num) → Task
    self._active_tasks: dict[tuple[str, int], asyncio.Task[Any]] = {}

    # BackendPool — injected via set_backend_pool()
    self._backend_pool: BackendPool | None = None

    # Per-job PromptRenderer — created when prompt_config is provided
    self._job_renderers: dict[str, PromptRenderer] = {}

    # Per-job CrossSheetConfig — enables cross-sheet context (F-210)
    self._job_cross_sheet: dict[str, CrossSheetConfig] = {}

    # Per-job completion events — set when all sheets reach terminal state
    self._completion_events: dict[str, asyncio.Event] = {}

    # Per-job completion status: True = all sheets completed, False = has failures
    self._completion_results: dict[str, bool] = {}

    # Running state
    self._running = False
    self._baton_task: asyncio.Task[Any] | None = None
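The shared-inbox construction above (create the queue first, then hand it to both parties) is what breaks the TimerWheel ↔ BatonCore circular dependency. A stand-in sketch of the pattern, with simplified classes in place of the real TimerWheel/BatonCore:

```python
import asyncio


class Timer:
    """Stand-in for TimerWheel: delivers fired events into the inbox."""

    def __init__(self, inbox: asyncio.Queue) -> None:
        self.inbox = inbox

    def fire(self, event: str) -> None:
        self.inbox.put_nowait(event)


class Core:
    """Stand-in for BatonCore: schedules via the timer, consumes the inbox."""

    def __init__(self, timer: Timer, inbox: asyncio.Queue) -> None:
        self.timer = timer
        self.inbox = inbox


# Neither object constructs the other; both receive the same queue.
inbox: asyncio.Queue = asyncio.Queue()
timer = Timer(inbox)
core = Core(timer=timer, inbox=inbox)

timer.fire("retry-due")
print(core.inbox.get_nowait())  # → retry-due
```

Because the queue is the only shared object, either side can be replaced or tested in isolation by handing it a fresh queue.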
Attributes
baton property
baton

The underlying BatonCore instance.

is_running property
is_running

Whether the baton's event loop is running.

Functions
set_backend_pool
set_backend_pool(pool)

Inject the BackendPool for backend acquisition.

Must be called before dispatching any sheets.

Parameters:

    pool (BackendPool, required)
        The backend pool from the manager.
Source code in src/marianne/daemon/baton/adapter.py
def set_backend_pool(self, pool: BackendPool) -> None:
    """Inject the BackendPool for backend acquisition.

    Must be called before dispatching any sheets.

    Args:
        pool: The backend pool from the manager.
    """
    self._backend_pool = pool
register_job
register_job(job_id, sheets, dependencies, *, max_cost_usd=None, max_retries=3, max_completion=5, escalation_enabled=False, self_healing_enabled=False, prompt_config=None, parallel_enabled=False, cross_sheet=None, pacing_seconds=0.0, live_sheets=None)

Register a job with the baton for event-driven execution.

Converts Sheet entities to SheetExecutionState and registers them with the baton's sheet registry.

Phase 2: when live_sheets is provided, uses those SheetState objects directly instead of creating new ones. This ensures the baton writes to the same objects that live in _live_states, eliminating the need for a sync layer.

Parameters:

    job_id (str, required)
        Unique job identifier (conductor job_id).

    sheets (list[Sheet], required)
        Sheet entities from build_sheets().

    dependencies (dict[int, list[int]], required)
        Dependency graph {sheet_num: [dep_nums]}.

    max_cost_usd (float | None, default: None)
        Optional per-job cost limit.

    max_retries (int, default: 3)
        Max normal retry attempts per sheet.

    max_completion (int, default: 5)
        Max completion mode attempts per sheet.

    escalation_enabled (bool, default: False)
        Enter fermata on exhaustion.

    self_healing_enabled (bool, default: False)
        Try self-healing on exhaustion.

    prompt_config (PromptConfig | None, default: None)
        Optional PromptConfig for full prompt rendering. When provided,
        creates a PromptRenderer for this job that handles the complete
        9-layer prompt assembly pipeline.

    parallel_enabled (bool, default: False)
        Whether parallel execution is enabled (for preamble concurrency
        warning).

    cross_sheet (CrossSheetConfig | None, default: None)
        Optional CrossSheetConfig for cross-sheet context (F-210). When
        provided, the adapter collects previous sheet outputs and
        workspace files at dispatch time.
Source code in src/marianne/daemon/baton/adapter.py
def register_job(
    self,
    job_id: str,
    sheets: list[Sheet],
    dependencies: dict[int, list[int]],
    *,
    max_cost_usd: float | None = None,
    max_retries: int = 3,
    max_completion: int = 5,
    escalation_enabled: bool = False,
    self_healing_enabled: bool = False,
    prompt_config: PromptConfig | None = None,
    parallel_enabled: bool = False,
    cross_sheet: CrossSheetConfig | None = None,
    pacing_seconds: float = 0.0,
    live_sheets: dict[int, SheetExecutionState] | None = None,
) -> None:
    """Register a job with the baton for event-driven execution.

    Converts Sheet entities to SheetExecutionState and registers
    them with the baton's sheet registry.

    Phase 2: when ``live_sheets`` is provided, uses those SheetState
    objects directly instead of creating new ones. This ensures the
    baton writes to the same objects that live in ``_live_states``,
    eliminating the need for a sync layer.

    Args:
        job_id: Unique job identifier (conductor job_id).
        sheets: Sheet entities from build_sheets().
        dependencies: Dependency graph {sheet_num: [dep_nums]}.
        max_cost_usd: Optional per-job cost limit.
        max_retries: Max normal retry attempts per sheet.
        max_completion: Max completion mode attempts per sheet.
        escalation_enabled: Enter fermata on exhaustion.
        self_healing_enabled: Try self-healing on exhaustion.
        prompt_config: Optional PromptConfig for full prompt rendering.
            When provided, creates a PromptRenderer for this job that
            handles the complete 9-layer prompt assembly pipeline.
        parallel_enabled: Whether parallel execution is enabled
            (for preamble concurrency warning).
        cross_sheet: Optional CrossSheetConfig for cross-sheet context
            (F-210). When provided, the adapter collects previous sheet
            outputs and workspace files at dispatch time.
    """
    # Store sheets for prompt rendering at dispatch time
    self._job_sheets[job_id] = {s.num: s for s in sheets}

    # Store cross-sheet config (F-210)
    if cross_sheet is not None:
        self._job_cross_sheet[job_id] = cross_sheet

    # Create PromptRenderer if config is available (F-104)
    if prompt_config is not None:
        from marianne.daemon.baton.prompt import PromptRenderer

        total_sheets = len(sheets)
        total_stages = len({s.movement for s in sheets}) or 1
        self._job_renderers[job_id] = PromptRenderer(
            prompt_config=prompt_config,
            total_sheets=total_sheets,
            total_stages=total_stages,
            parallel_enabled=parallel_enabled,
        )

    # Create completion event for this job
    self._completion_events[job_id] = asyncio.Event()

    # Phase 2: use live_sheets (shared with _live_states) when provided.
    # This makes the baton write directly to the same SheetState objects
    # the manager serves via get_job_status — no sync layer needed.
    if live_sheets is not None:
        # Enrich existing SheetState objects with baton scheduling fields
        for sheet in sheets:
            s = live_sheets.get(sheet.num)
            if s is not None:
                s.instrument_name = sheet.instrument_name
                raw_m = sheet.instrument_config.get("model")
                if raw_m is not None:
                    s.model = str(raw_m)
                s.max_retries = max_retries
                s.max_completion = max_completion
                s.fallback_chain = list(sheet.instrument_fallbacks)
                s.sheet_timeout_seconds = sheet.timeout_seconds
        states = live_sheets
    else:
        states = sheets_to_execution_states(
            sheets,
            max_retries=max_retries,
            max_completion=max_completion,
        )

    # Register with baton
    self._baton.register_job(
        job_id,
        states,
        dependencies,
        escalation_enabled=escalation_enabled,
        self_healing_enabled=self_healing_enabled,
        pacing_seconds=pacing_seconds,
    )

    # Set cost limits if configured
    if max_cost_usd is not None:
        self._baton.set_job_cost_limit(job_id, max_cost_usd)

    _logger.info(
        "adapter.job_registered",
        extra={
            "job_id": job_id,
            "sheet_count": len(sheets),
            "dependency_count": len(dependencies),
            "max_retries": max_retries,
            "max_completion": max_completion,
        },
    )

    # Kick the event loop so dispatch_ready runs for the newly registered
    # sheets.  Without this the loop blocks on inbox.get() forever
    # because no musician or timer has produced an event yet.
    self._baton.inbox.put_nowait(DispatchRetry())
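The {sheet_num: [dep_nums]} dependency graph register_job accepts implies a simple readiness rule: a sheet may dispatch once every one of its dependencies has completed. A minimal sketch of that rule (not the real dispatch logic, which also enforces concurrency and rate limits):

```python
def ready_sheets(
    dependencies: dict[int, list[int]],
    completed: set[int],
    terminal: set[int],
) -> list[int]:
    """Sheets not yet terminal whose every dependency has completed."""
    return [
        num
        for num, deps in dependencies.items()
        if num not in terminal and all(d in completed for d in deps)
    ]


# Sheet 1 has no deps; 2 depends on 1; 3 depends on 1 and 2.
deps = {1: [], 2: [1], 3: [1, 2]}
print(ready_sheets(deps, completed=set(), terminal=set()))  # → [1]
print(ready_sheets(deps, completed={1}, terminal={1}))      # → [2]
```

This is why register_job ends by kicking the inbox with DispatchRetry: nothing else would cause the readiness check to run for a freshly registered graph.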
deregister_job
deregister_job(job_id)

Remove a job from the adapter and baton.

Cleans up all per-job state including active tasks.

Parameters:

    job_id (str, required)
        The job to remove.
Source code in src/marianne/daemon/baton/adapter.py
def deregister_job(self, job_id: str) -> None:
    """Remove a job from the adapter and baton.

    Cleans up all per-job state including active tasks.

    Args:
        job_id: The job to remove.
    """
    # Cancel active musician tasks for this job
    keys_to_cancel = [
        key for key in self._active_tasks if key[0] == job_id
    ]
    for key in keys_to_cancel:
        task = self._active_tasks.pop(key)
        task.cancel()

    # Remove from baton
    self._baton.deregister_job(job_id)

    # Remove sheet mapping, renderer, cross-sheet config, and completion tracking
    self._job_sheets.pop(job_id, None)
    self._job_renderers.pop(job_id, None)
    self._job_cross_sheet.pop(job_id, None)
    self._completion_events.pop(job_id, None)
    self._completion_results.pop(job_id, None)

    # Clean up vestigial _synced_status entries (Phase 2: sync layer
    # removed but dict retained for compatibility). Defensive cleanup
    # prevents memory leaks if anything accidentally populates it.
    if hasattr(self, "_synced_status") and self._synced_status:
        self._synced_status = {
            k: v for k, v in self._synced_status.items() if k[0] != job_id
        }

    _logger.info("adapter.job_deregistered", extra={"job_id": job_id})
recover_job
recover_job(job_id, sheets, dependencies, checkpoint, *, max_cost_usd=None, max_retries=3, max_completion=5, escalation_enabled=False, self_healing_enabled=False, prompt_config=None, parallel_enabled=False, cross_sheet=None, pacing_seconds=0.0, live_sheets=None)

Recover a job from a checkpoint after conductor restart.

Rebuilds baton state from the persisted CheckpointState. Terminal sheets (completed, failed, skipped) keep their status. In-progress sheets are reset to PENDING because their musicians died when the conductor restarted. Attempt counts are preserved to avoid infinite retries.

Design invariant: Checkpoint is the source of truth. The baton rebuilds from checkpoint, not the reverse.

Parameters:

    job_id (str, required)
        Unique job identifier.

    sheets (list[Sheet], required)
        Sheet entities from build_sheets() — same config that produced
        the original job.

    dependencies (dict[int, list[int]], required)
        Dependency graph {sheet_num: [dep_nums]}.

    checkpoint (CheckpointState, required)
        Persisted CheckpointState loaded from workspace.

    max_cost_usd (float | None, default: None)
        Optional per-job cost limit.

    max_retries (int, default: 3)
        Max normal retry attempts per sheet.

    max_completion (int, default: 5)
        Max completion mode attempts per sheet.

    escalation_enabled (bool, default: False)
        Enter fermata on exhaustion.

    self_healing_enabled (bool, default: False)
        Try self-healing on exhaustion.

    prompt_config (PromptConfig | None, default: None)
        Optional PromptConfig for prompt rendering.

    parallel_enabled (bool, default: False)
        Whether parallel execution is enabled.

    cross_sheet (CrossSheetConfig | None, default: None)
        Optional CrossSheetConfig for cross-sheet context (F-210).
Source code in src/marianne/daemon/baton/adapter.py
def recover_job(
    self,
    job_id: str,
    sheets: list[Sheet],
    dependencies: dict[int, list[int]],
    checkpoint: CheckpointState,
    *,
    max_cost_usd: float | None = None,
    max_retries: int = 3,
    max_completion: int = 5,
    escalation_enabled: bool = False,
    self_healing_enabled: bool = False,
    prompt_config: PromptConfig | None = None,
    parallel_enabled: bool = False,
    cross_sheet: CrossSheetConfig | None = None,
    pacing_seconds: float = 0.0,
    live_sheets: dict[int, SheetExecutionState] | None = None,
) -> None:
    """Recover a job from a checkpoint after conductor restart.

    Rebuilds baton state from the persisted CheckpointState. Terminal
    sheets (completed, failed, skipped) keep their status. In-progress
    sheets are reset to PENDING because their musicians died when the
    conductor restarted. Attempt counts are preserved to avoid infinite
    retries.

    Design invariant: Checkpoint is the source of truth. The baton
    rebuilds from checkpoint, not the reverse.

    Args:
        job_id: Unique job identifier.
        sheets: Sheet entities from build_sheets() — same config
            that produced the original job.
        dependencies: Dependency graph {sheet_num: [dep_nums]}.
        checkpoint: Persisted CheckpointState loaded from workspace.
        max_cost_usd: Optional per-job cost limit.
        max_retries: Max normal retry attempts per sheet.
        max_completion: Max completion mode attempts per sheet.
        escalation_enabled: Enter fermata on exhaustion.
        self_healing_enabled: Try self-healing on exhaustion.
        prompt_config: Optional PromptConfig for prompt rendering.
        parallel_enabled: Whether parallel execution is enabled.
        cross_sheet: Optional CrossSheetConfig for cross-sheet context (F-210).
    """
    # Store sheets for prompt rendering
    self._job_sheets[job_id] = {s.num: s for s in sheets}

    # Store cross-sheet config (F-210)
    if cross_sheet is not None:
        self._job_cross_sheet[job_id] = cross_sheet

    # Create PromptRenderer if config is available
    if prompt_config is not None:
        from marianne.daemon.baton.prompt import PromptRenderer

        total_sheets = len(sheets)
        total_stages = len({s.movement for s in sheets}) or 1
        self._job_renderers[job_id] = PromptRenderer(
            prompt_config=prompt_config,
            total_sheets=total_sheets,
            total_stages=total_stages,
            parallel_enabled=parallel_enabled,
        )

    # Create completion event
    self._completion_events[job_id] = asyncio.Event()

    # Build SheetExecutionState with recovered statuses and attempt counts
    states: dict[int, SheetExecutionState] = {}
    for sheet in sheets:
        cp_sheet = checkpoint.sheets.get(sheet.num)

        if cp_sheet is not None:
            # On restart, active/transient sheets reset to PENDING:
            # - in_progress/dispatched: musician was killed on restart
            # - waiting: rate limit timers lost on restart
            # - retry_scheduled: retry timers lost on restart
            # - fermata: re-evaluate escalation on restart
            # Terminal sheets (completed, failed, skipped, cancelled)
            # keep their status. Pending/ready stay as-is.
            _RESET_ON_RESTART = frozenset({
                BatonSheetStatus.IN_PROGRESS,
                BatonSheetStatus.DISPATCHED,
                BatonSheetStatus.WAITING,
                BatonSheetStatus.RETRY_SCHEDULED,
                BatonSheetStatus.FERMATA,
            })
            baton_status = (
                BatonSheetStatus.PENDING
                if cp_sheet.status in _RESET_ON_RESTART
                else cp_sheet.status
            )

            # Carry forward attempt counts to avoid infinite retries
            normal_attempts = cp_sheet.attempt_count
            completion_attempts = cp_sheet.completion_attempts
        else:
            # Sheet not in checkpoint — treat as fresh PENDING
            baton_status = BatonSheetStatus.PENDING
            normal_attempts = 0
            completion_attempts = 0

        # Phase 2: update the live SheetState in-place when available,
        # so the baton operates on the same objects as _live_states.
        if live_sheets is not None and sheet.num in live_sheets:
            state = live_sheets[sheet.num]
        else:
            raw_model = sheet.instrument_config.get("model")
            state = SheetExecutionState(
                sheet_num=sheet.num,
                instrument_name=sheet.instrument_name,
                model=str(raw_model) if raw_model is not None else None,
            )

        # Always populate instrument identity from the Sheet entity.
        # The checkpoint may have instrument_name=None for sheets that
        # were never dispatched (e.g., dependency-cascaded failures).
        state.instrument_name = sheet.instrument_name
        raw_model = sheet.instrument_config.get("model")
        if raw_model is not None:
            state.model = str(raw_model)
        state.max_retries = max_retries
        state.max_completion = max_completion
        state.fallback_chain = list(sheet.instrument_fallbacks)
        state.sheet_timeout_seconds = sheet.timeout_seconds
        state.status = baton_status
        state.normal_attempts = normal_attempts
        state.completion_attempts = completion_attempts
        # Reset instrument fallback position for non-terminal sheets.
        # A recovered/resumed sheet must start from the primary
        # instrument, not stay stuck on whatever fallback it died on.
        # Terminal sheets (COMPLETED, SKIPPED) keep their instrument
        # history for diagnostics.
        _TERMINAL = {
            BatonSheetStatus.COMPLETED,
            BatonSheetStatus.SKIPPED,
            BatonSheetStatus.CANCELLED,
        }
        if baton_status not in _TERMINAL:
            state.current_instrument_index = 0

        states[sheet.num] = state

    # Register with baton using the recovered states
    self._baton.register_job(
        job_id,
        states,
        dependencies,
        escalation_enabled=escalation_enabled,
        self_healing_enabled=self_healing_enabled,
        pacing_seconds=pacing_seconds,
    )

    # Set cost limits if configured
    if max_cost_usd is not None:
        self._baton.set_job_cost_limit(job_id, max_cost_usd)

    _logger.info(
        "adapter.job_recovered",
        extra={
            "job_id": job_id,
            "sheet_count": len(sheets),
            "recovered_terminal": sum(
                1 for s in states.values()
                if s.status in (
                    BatonSheetStatus.COMPLETED,
                    BatonSheetStatus.FAILED,
                    BatonSheetStatus.SKIPPED,
                )
            ),
            "recovered_pending": sum(
                1 for s in states.values()
                if s.status == BatonSheetStatus.PENDING
            ),
        },
    )

    # Kick the event loop so dispatch_ready runs for recovered sheets
    self._baton.inbox.put_nowait(DispatchRetry())
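The reset-on-restart rule recover_job applies can be distilled into a small status mapping: transient states (whose musicians and timers died with the old process) go back to PENDING, while terminal states keep their outcome. The Status enum below is a stand-in for BatonSheetStatus, covering only the statuses named above.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for BatonSheetStatus."""
    PENDING = "pending"
    DISPATCHED = "dispatched"
    IN_PROGRESS = "in_progress"
    WAITING = "waiting"
    RETRY_SCHEDULED = "retry_scheduled"
    FERMATA = "fermata"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


# Transient on restart: musicians killed, timers lost, escalation re-evaluated.
RESET_ON_RESTART = {
    Status.IN_PROGRESS, Status.DISPATCHED, Status.WAITING,
    Status.RETRY_SCHEDULED, Status.FERMATA,
}


def recovered_status(checkpoint_status: Status) -> Status:
    """Map a checkpointed status to its post-restart status."""
    if checkpoint_status in RESET_ON_RESTART:
        return Status.PENDING
    return checkpoint_status  # terminal (and pending) statuses survive


print(recovered_status(Status.IN_PROGRESS).value)  # → pending
print(recovered_status(Status.COMPLETED).value)    # → completed
```

Attempt counts are carried forward separately, which is what prevents a reset sheet from retrying forever after repeated restarts.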
get_sheet
get_sheet(job_id, sheet_num)

Get a Sheet entity for a registered job.

Parameters:

    job_id (str, required)
        The job identifier.

    sheet_num (int, required)
        The sheet number.

Returns:

    Sheet | None
        The Sheet entity, or None if not found.

Source code in src/marianne/daemon/baton/adapter.py
def get_sheet(self, job_id: str, sheet_num: int) -> Sheet | None:
    """Get a Sheet entity for a registered job.

    Args:
        job_id: The job identifier.
        sheet_num: The sheet number.

    Returns:
        The Sheet entity, or None if not found.
    """
    job_sheets = self._job_sheets.get(job_id)
    if job_sheets is None:
        return None
    return job_sheets.get(sheet_num)
wait_for_completion async
wait_for_completion(job_id)

Wait until a job reaches terminal state.

Blocks until all sheets in the job are completed, failed, skipped, or cancelled. Used by the manager's _run_job_task to await baton execution.

Parameters:

    job_id (str, required)
        The job to wait for.

Returns:

    bool
        True if all sheets completed successfully, False if any failed.

Raises:

    KeyError
        If the job is not registered.

Source code in src/marianne/daemon/baton/adapter.py
async def wait_for_completion(self, job_id: str) -> bool:
    """Wait until a job reaches terminal state.

    Blocks until all sheets in the job are completed, failed, skipped,
    or cancelled. Used by the manager's _run_job_task to await baton
    execution.

    Args:
        job_id: The job to wait for.

    Returns:
        True if all sheets completed successfully, False if any failed.

    Raises:
        KeyError: If the job is not registered.
    """
    event = self._completion_events.get(job_id)
    if event is None:
        raise KeyError(f"Job '{job_id}' is not registered with the adapter")
    await event.wait()
    return self._completion_results.get(job_id, False)
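The per-job completion-event pattern behind this method (one asyncio.Event per job, set when the last sheet reaches a terminal state, with the boolean outcome stored alongside) can be sketched as follows. Names are illustrative, not the adapter's internals verbatim.

```python
import asyncio


async def main() -> bool:
    events: dict[str, asyncio.Event] = {"j1": asyncio.Event()}
    results: dict[str, bool] = {}

    async def finish_job() -> None:
        await asyncio.sleep(0)  # simulate sheets running elsewhere
        results["j1"] = True    # record the outcome first...
        events["j1"].set()      # ...then wake any waiters

    async def wait_for_completion(job_id: str) -> bool:
        event = events.get(job_id)
        if event is None:
            raise KeyError(job_id)
        await event.wait()
        return results.get(job_id, False)

    task = asyncio.create_task(finish_job())
    ok = await wait_for_completion("j1")
    await task
    return ok


print(asyncio.run(main()))  # → True
```

Setting the result before the event guarantees a woken waiter never observes a set event with a missing outcome.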
has_completed_sheets
has_completed_sheets(job_id)

Check if any sheet in the job reached COMPLETED status.

F-145: Used by the manager to set completed_new_work after baton execution. The zero-work guard for concert chaining needs to know whether any sheet completed new work — not just whether all sheets succeeded.

Parameters:

    job_id (str, required)
        The job to check.

Returns:

    bool
        True if at least one sheet completed, False otherwise.

Source code in src/marianne/daemon/baton/adapter.py
def has_completed_sheets(self, job_id: str) -> bool:
    """Check if any sheet in the job reached COMPLETED status.

    F-145: Used by the manager to set completed_new_work after baton
    execution. The zero-work guard for concert chaining needs to know
    whether any sheet completed new work — not just whether all
    sheets succeeded.

    Args:
        job_id: The job to check.

    Returns:
        True if at least one sheet completed, False otherwise.
    """
    job = self._baton._jobs.get(job_id)
    if job is None:
        return False
    return any(
        s.status == BatonSheetStatus.COMPLETED
        for s in job.sheets.values()
    )
clear_instrument_rate_limit
clear_instrument_rate_limit(instrument=None)

Clear instrument rate limit state in the baton core.

Delegates to BatonCore.clear_instrument_rate_limit(). Also moves WAITING sheets back to PENDING so they can be re-dispatched.

Parameters:

    instrument (str | None, default: None)
        Instrument name to clear, or None for all.

Returns:

    int
        Number of instruments whose rate limit was cleared.

Source code in src/marianne/daemon/baton/adapter.py
def clear_instrument_rate_limit(
    self,
    instrument: str | None = None,
) -> int:
    """Clear instrument rate limit state in the baton core.

    Delegates to ``BatonCore.clear_instrument_rate_limit()``.  Also
    moves WAITING sheets back to PENDING so they can be re-dispatched.

    Args:
        instrument: Instrument name to clear, or ``None`` for all.

    Returns:
        Number of instruments whose rate limit was cleared.
    """
    return self._baton.clear_instrument_rate_limit(instrument)
publish_sheet_skipped async
publish_sheet_skipped(event)

Publish a sheet skip event to the EventBus.

Parameters:

    event (SheetSkipped, required)
        The sheet skip event.
Source code in src/marianne/daemon/baton/adapter.py
async def publish_sheet_skipped(self, event: SheetSkipped) -> None:
    """Publish a sheet skip event to the EventBus.

    Args:
        event: The sheet skip event.
    """
    if self._event_bus is None:
        return

    obs_event = skipped_to_observer_event(event)
    try:
        await self._event_bus.publish(obs_event)
    except Exception:
        _logger.warning(
            "adapter.event_publish_failed",
            extra={
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
            },
            exc_info=True,
        )
publish_job_event async
publish_job_event(job_id, event_name, data=None)

Publish a job-level event to the EventBus.

Parameters:

    job_id (str, required)
        The job identifier.

    event_name (str, required)
        Event name (e.g., "job.started", "job.completed").

    data (dict[str, Any] | None, default: None)
        Optional event data.
Source code in src/marianne/daemon/baton/adapter.py
async def publish_job_event(
    self,
    job_id: str,
    event_name: str,
    data: dict[str, Any] | None = None,
) -> None:
    """Publish a job-level event to the EventBus.

    Args:
        job_id: The job identifier.
        event_name: Event name (e.g., "job.started", "job.completed").
        data: Optional event data.
    """
    if self._event_bus is None:
        return

    event: ObserverEvent = {
        "job_id": job_id,
        "sheet_num": 0,
        "event": event_name,
        "data": data or {},
        "timestamp": time.time(),
    }
    try:
        await self._event_bus.publish(event)
    except Exception:
        _logger.warning(
            "adapter.job_event_publish_failed",
            extra={"job_id": job_id, "event": event_name},
            exc_info=True,
        )
run async
run()

Run the baton's event loop with dispatch integration.

Processes events from the inbox, updates state, dispatches ready sheets, and publishes events to the EventBus.

Runs until the baton receives a ShutdownRequested event.

Source code in src/marianne/daemon/baton/adapter.py
async def run(self) -> None:
    """Run the baton's event loop with dispatch integration.

    Processes events from the inbox, updates state, dispatches
    ready sheets, and publishes events to the EventBus.

    Runs until the baton receives a ShutdownRequested event.
    """
    from marianne.daemon.baton.dispatch import dispatch_ready

    self._running = True
    _logger.info("adapter.started")

    # Start the timer wheel drain task — fires scheduled events
    # (rate limit expiry, retry delays) into the baton's inbox.
    # Wrapped in a restart loop so a crash doesn't silently kill
    # all timer-based recovery (rate limit expiry, retry backoff).
    async def _timer_with_restart() -> None:
        while not self._baton._shutting_down:
            try:
                await self._timer_wheel.run()
            except asyncio.CancelledError:
                raise
            except Exception:
                _logger.error(
                    "adapter.timer_wheel_crashed",
                    exc_info=True,
                )
                await asyncio.sleep(1.0)  # Brief pause before restart

    timer_task = asyncio.create_task(
        _timer_with_restart(), name="baton-timer-wheel"
    )

    try:
        # Initial dispatch cycle — pick up PENDING sheets from resume/restart
        # without waiting for the first event. Without this, sheets mapped
        # from WAITING→PENDING on restart sit idle until an unrelated event
        # (e.g., a completion from another job) triggers dispatch.
        config = self._baton.build_dispatch_config(
            max_concurrent_sheets=self._max_concurrent_sheets,
        )
        initial_result = await dispatch_ready(
            self._baton, config, self._dispatch_callback
        )
        if initial_result.dispatched_sheets:
            _logger.info(
                "adapter.initial_dispatch",
                extra={
                    "count": len(initial_result.dispatched_sheets),
                    "sheets": [
                        f"{jid}:{sn}"
                        for jid, sn in initial_result.dispatched_sheets
                    ],
                },
            )
            for d_job_id, _d_sheet_num in initial_result.dispatched_sheets:
                if self._persist_callback:
                    self._persist_callback(d_job_id)

        while not self._baton._shutting_down:
            event = await self._baton.inbox.get()

        _logger.debug(
            "adapter.event_loop.received",
            extra={
                "event_type": type(event).__name__,
                "queue_size": self._baton.inbox.qsize(),
            },
        )

            # Intercept StaleCheck: only fail if the musician task
            # is actually dead. If the task is alive, the sheet isn't
            # stale — the backend is still executing. Reschedule.
            if isinstance(event, StaleCheck):
                task_key = (event.job_id, event.sheet_num)
                task = self._active_tasks.get(task_key)
                if task is not None and not task.done():
                    # Task alive — not stale, reschedule in 60s
                    self._timer_wheel.schedule(
                        60.0,
                        StaleCheck(
                            job_id=event.job_id,
                            sheet_num=event.sheet_num,
                        ),
                    )
                    # Still pass to baton for logging
                    await self._baton.handle_event(event)
                else:
                    # Task dead with no result — actually stale
                    state = self._baton.get_sheet_state(
                        event.job_id, event.sheet_num,
                    )
                    if (
                        state is not None
                        and state.status == BatonSheetStatus.DISPATCHED
                    ):
                        _logger.warning(
                            "adapter.stale_check.task_dead",
                            extra={
                                "job_id": event.job_id,
                                "sheet_num": event.sheet_num,
                            },
                        )
                        # Inject synthetic failure
                        from marianne.daemon.baton.events import (
                            SheetAttemptResult as SAR,
                        )
                        self._baton.inbox.put_nowait(SAR(
                            job_id=event.job_id,
                            sheet_num=event.sheet_num,
                            instrument_name=state.instrument_name or "",
                            attempt=state.normal_attempts + 1,
                            execution_success=False,
                            error_classification="STALE",
                            error_message=(
                                f"Sheet {event.sheet_num}: musician task "
                                f"dead with no result reported"
                            ),
                        ))
                    await self._baton.handle_event(event)
            else:
                await self._baton.handle_event(event)

            # Phase 2: persist to registry if state changed.
            # The baton writes directly to SheetState objects in
            # _live_states — no sync needed. Just persist.
            if self._baton._state_dirty and self._persist_callback:
                self._persist_dirty_jobs()
                self._baton._state_dirty = False

            # Dispatch ready sheets after every event
            config = self._baton.build_dispatch_config(
                max_concurrent_sheets=self._max_concurrent_sheets,
            )
            dispatch_result = await dispatch_ready(
                self._baton, config, self._dispatch_callback
            )

            # Sync dispatched sheets so status display shows in_progress
            if dispatch_result.dispatched_sheets:
                _logger.info(
                    "adapter.dispatch_sync",
                    extra={
                        "count": len(dispatch_result.dispatched_sheets),
                        "sheets": [
                            f"{jid}:{sn}"
                            for jid, sn in dispatch_result.dispatched_sheets
                        ],
                    },
                )
            for d_job_id, d_sheet_num in dispatch_result.dispatched_sheets:
                # Persist dispatch state (sheet moved to DISPATCHED)
                if self._persist_callback:
                    self._persist_callback(d_job_id)
                # Schedule stale detection using per-sheet timeout.
                # If the sheet completes normally, the StaleCheck handler
                # finds it non-DISPATCHED and is a no-op.
                d_state = self._baton.get_sheet_state(d_job_id, d_sheet_num)
                stale_delay = (
                    getattr(d_state, "sheet_timeout_seconds", 1800.0)
                    if d_state else 1800.0
                ) + 60.0  # buffer beyond timeout
                self._timer_wheel.schedule(
                    stale_delay,
                    StaleCheck(job_id=d_job_id, sheet_num=d_sheet_num),
                )

            # Publish any fallback events to EventBus
            await self._publish_fallback_events()

            # Check for job completions after dispatch
            self._check_completions()

    except asyncio.CancelledError:
        _logger.info("adapter.cancelled")
        raise
    finally:
        timer_task.cancel()
        await self._timer_wheel.shutdown()
        self._running = False
        _logger.info("adapter.stopped")
shutdown async
shutdown()

Gracefully shut down the adapter.

Cancels all active musician tasks and closes the backend pool.

Source code in src/marianne/daemon/baton/adapter.py
async def shutdown(self) -> None:
    """Gracefully shut down the adapter.

    Cancels all active musician tasks and closes the backend pool.
    """
    # Cancel all active tasks
    for task in self._active_tasks.values():
        task.cancel()
    self._active_tasks.clear()

    # Close backend pool
    if self._backend_pool is not None:
        try:
            await self._backend_pool.close_all()
        except Exception:
            _logger.warning("adapter.pool_close_failed", exc_info=True)

    _logger.info("adapter.shutdown_complete")

BackendPool

BackendPool(registry, pgroup=None, keyring=None)

Manages Backend instances for per-sheet execution.

The baton acquires a backend before dispatching a sheet and releases it after the sheet completes (or fails). The pool enforces per-instrument concurrency by tracking in-flight instances.

Usage::

pool = BackendPool(registry)

# Dispatch a sheet
backend = await pool.acquire("claude-code", working_directory=ws)
try:
    result = await backend.execute(prompt)
finally:
    await pool.release("claude-code", backend)

# Job done
await pool.close_all()
Source code in src/marianne/daemon/baton/backend_pool.py
def __init__(
    self,
    registry: InstrumentRegistry,
    pgroup: ProcessGroupManager | None = None,
    keyring: ApiKeyKeyring | None = None,
) -> None:
    self._registry = registry
    self._pgroup = pgroup
    self._keyring = keyring

    # CLI instruments: free list per instrument name
    self._cli_free: dict[str, list[Backend]] = {}

    # HTTP instruments: singleton per instrument name
    self._http_singletons: dict[str, Backend] = {}

    # Tracking: how many backends are currently in-flight (acquired
    # but not yet released) per instrument.
    self._in_flight: dict[str, int] = {}

    # All backends ever created (for close_all cleanup)
    self._all_backends: list[Backend] = []

    # Protect concurrent acquire/release to avoid race conditions
    # on the free lists
    self._lock = asyncio.Lock()

    self._closed = False
Functions
acquire async
acquire(instrument_name, *, model=None, working_directory=None)

Acquire a Backend instance for an instrument.

For CLI instruments: returns a free instance if available, otherwise creates a new one. For HTTP instruments: returns the shared singleton (creating it on first call).

Parameters:

Name Type Description Default
instrument_name str

Name of the instrument (from registry).

required
model str | None

Optional model override for this execution.

None
working_directory Path | None

Working directory for the backend.

None

Returns:

Type Description
Backend

A Backend instance ready for execution.

Raises:

Type Description
ValueError

If the instrument is not registered.

RuntimeError

If the pool has been closed.

Source code in src/marianne/daemon/baton/backend_pool.py
async def acquire(
    self,
    instrument_name: str,
    *,
    model: str | None = None,
    working_directory: Path | None = None,
) -> Backend:
    """Acquire a Backend instance for an instrument.

    For CLI instruments: returns a free instance if available,
    otherwise creates a new one. For HTTP instruments: returns
    the shared singleton (creating it on first call).

    Args:
        instrument_name: Name of the instrument (from registry).
        model: Optional model override for this execution.
        working_directory: Working directory for the backend.

    Returns:
        A Backend instance ready for execution.

    Raises:
        ValueError: If the instrument is not registered.
        RuntimeError: If the pool has been closed.
    """
    if self._closed:
        msg = "BackendPool is closed — cannot acquire new backends"
        raise RuntimeError(msg)

    profile = self._registry.get(instrument_name)
    if profile is None:
        msg = (
            f"Instrument '{instrument_name}' not found in registry. "
            f"Available: {', '.join(p.name for p in self._registry.list_all())}"
        )
        raise ValueError(msg)

    # Resolve API key from keyring for HTTP instruments before acquiring lock.
    # Key is loaded from disk, used to configure the backend, then not stored.
    api_key: str | None = None
    if profile.kind == "http" and self._keyring is not None:
        if self._keyring.has_keys(instrument_name):
            try:
                api_key = await self._keyring.select_key(instrument_name)
            except (KeyError, FileNotFoundError, ValueError):
                _logger.warning(
                    "backend_pool.keyring_select_failed",
                    extra={"instrument": instrument_name},
                    exc_info=True,
                )

    async with self._lock:
        backend = self._acquire_locked(
            profile,
            model=model,
            working_directory=working_directory,
            api_key=api_key,
        )

    _logger.debug(
        "backend_pool.acquired",
        extra={
            "instrument": instrument_name,
            "in_flight": self._in_flight.get(instrument_name, 0),
            "model": model,
        },
    )
    return backend
release async
release(instrument_name, backend)

Release a Backend instance back to the pool.

For CLI instruments: the backend goes back to the free list for reuse. For HTTP instruments: no-op (the singleton stays active).

Parameters:

Name Type Description Default
instrument_name str

The instrument name used in acquire().

required
backend Backend

The Backend instance to release.

required
Source code in src/marianne/daemon/baton/backend_pool.py
async def release(
    self,
    instrument_name: str,
    backend: Backend,
) -> None:
    """Release a Backend instance back to the pool.

    For CLI instruments: the backend goes back to the free list for
    reuse. For HTTP instruments: no-op (the singleton stays active).

    Args:
        instrument_name: The instrument name used in ``acquire()``.
        backend: The Backend instance to release.
    """
    # Clear any per-sheet overrides (model, etc.) before returning
    # the backend to the free list. Without this, a model override from
    # sheet N would silently carry over to sheet N+1 that reuses the
    # same backend instance. This was F-150's secondary bug.
    backend.clear_overrides()

    async with self._lock:
        count = self._in_flight.get(instrument_name, 0)
        self._in_flight[instrument_name] = max(0, count - 1)

        profile = self._registry.get(instrument_name)
        if profile is not None and profile.kind == "cli":
            # Return CLI backend to free list for reuse
            if instrument_name not in self._cli_free:
                self._cli_free[instrument_name] = []
            self._cli_free[instrument_name].append(backend)

        # HTTP singletons are never "released" — they stay active

    _logger.debug(
        "backend_pool.released",
        extra={
            "instrument": instrument_name,
            "in_flight": self._in_flight.get(instrument_name, 0),
        },
    )
in_flight_count
in_flight_count(instrument_name)

How many backends are currently acquired for this instrument.

Used by the baton's dispatch logic to enforce per-instrument concurrency limits.

Source code in src/marianne/daemon/baton/backend_pool.py
def in_flight_count(self, instrument_name: str) -> int:
    """How many backends are currently acquired for this instrument.

    Used by the baton's dispatch logic to enforce per-instrument
    concurrency limits.
    """
    return self._in_flight.get(instrument_name, 0)
total_in_flight
total_in_flight()

Total backends in-flight across all instruments.

Source code in src/marianne/daemon/baton/backend_pool.py
def total_in_flight(self) -> int:
    """Total backends in-flight across all instruments."""
    return sum(self._in_flight.values())
close_all async
close_all()

Close all Backend instances and mark the pool as closed.

Called at job completion, cancellation, or conductor shutdown. After this call, acquire() raises RuntimeError.

Source code in src/marianne/daemon/baton/backend_pool.py
async def close_all(self) -> None:
    """Close all Backend instances and mark the pool as closed.

    Called at job completion, cancellation, or conductor shutdown.
    After this call, ``acquire()`` raises RuntimeError.
    """
    self._closed = True

    async with self._lock:
        for backend in self._all_backends:
            try:
                await backend.close()
            except Exception:
                _logger.warning(
                    "backend_pool.close_failed",
                    extra={"backend": backend.name},
                    exc_info=True,
                )

        self._cli_free.clear()
        self._http_singletons.clear()
        self._in_flight.clear()

    _logger.debug(
        "backend_pool.closed",
        extra={"total_backends": len(self._all_backends)},
    )

BatonCore

BatonCore(*, timer=None, inbox=None)

The baton's event-driven execution core.

Manages the event inbox, processes events, tracks sheet state across all jobs, resolves ready sheets, and coordinates dispatch.

The baton does NOT own backend execution — it decides WHEN to dispatch, not HOW. Sheet execution is delegated to the musician (via dispatch callbacks registered by the conductor).

Usage::

baton = BatonCore()
baton.register_job("j1", sheets, deps)

# Run the main loop (blocks until shutdown)
await baton.run()

# Or process events manually (for testing)
await baton.handle_event(some_event)

Initialize the baton core.

Parameters:

Name Type Description Default
timer Any | None

Optional TimerWheel for scheduling retry delays. When None, retries are set to RETRY_SCHEDULED without actual timer events (tests or manual event injection).

None
inbox Queue[BatonEvent] | None

Optional pre-created event queue. When provided, allows the caller to share the queue with other components (e.g., TimerWheel) before BatonCore is constructed. When None, a new queue is created.

None
Source code in src/marianne/daemon/baton/core.py
def __init__(
    self,
    *,
    timer: Any | None = None,
    inbox: asyncio.Queue[BatonEvent] | None = None,
) -> None:
    """Initialize the baton core.

    Args:
        timer: Optional TimerWheel for scheduling retry delays.
            When None, retries are set to RETRY_SCHEDULED without
            actual timer events (tests or manual event injection).
        inbox: Optional pre-created event queue. When provided, allows
            the caller to share the queue with other components (e.g.,
            TimerWheel) before BatonCore is constructed. When None,
            a new queue is created.
    """
    self._inbox: asyncio.Queue[BatonEvent] = inbox or asyncio.Queue()
    self._jobs: dict[str, _JobRecord] = {}
    self._instruments: dict[str, InstrumentState] = {}
    self._job_cost_limits: dict[str, float] = {}
    self._sheet_cost_limits: dict[tuple[str, int], float] = {}
    self._shutting_down = False
    self._running = False
    self._state_dirty = False
    self._timer = timer

    # Active rate-limit timer handles per instrument. When a new
    # RateLimitHit arrives for an instrument that already has a pending
    # timer, the old timer is cancelled before scheduling the new one.
    # Without this, stale timers fire prematurely and cause wasted
    # dispatch→rate_limit→WAITING cycles.
    self._rate_limit_timers: dict[str, Any] = {}

    # Active circuit breaker recovery timer handles per instrument.
    # When a circuit breaker trips OPEN, a recovery timer is scheduled.
    # On fire, the instrument transitions OPEN→HALF_OPEN for a probe.
    # GH#169: Without this, sheets blocked by all-OPEN fallback chains
    # stay PENDING forever.
    self._circuit_breaker_timers: dict[str, Any] = {}

    # Fallback event collection — side effects of event processing.
    # The adapter drains these after each event cycle and publishes
    # them to the EventBus for observability (dashboard, learning hub).
    self._fallback_events: list[InstrumentFallback] = []

    # Per-model concurrency limits from instrument profiles.
    # Keys: "instrument:model", values: max_concurrent.
    # Populated by set_model_concurrency() from instrument profiles.
    self._model_concurrency: dict[str, int] = {}

    # Retry backoff configuration (from RetryConfig defaults)
    self._base_retry_delay: float = 10.0
    self._retry_exponential_base: float = 2.0
    self._max_retry_delay: float = 3600.0
Attributes
inbox property
inbox

The event inbox — put events here for the baton to process.

is_running property
is_running

Whether the main loop is currently running.

job_count property
job_count

Number of registered jobs.

running_sheet_count property
running_sheet_count

Number of sheets currently in 'dispatched' status.

Functions
drain_fallback_events
drain_fallback_events()

Return and clear collected InstrumentFallback events.

Called by the adapter after each event cycle to publish fallback events to the EventBus for observability.

Source code in src/marianne/daemon/baton/core.py
def drain_fallback_events(self) -> list[InstrumentFallback]:
    """Return and clear collected InstrumentFallback events.

    Called by the adapter after each event cycle to publish
    fallback events to the EventBus for observability.
    """
    events = list(self._fallback_events)
    self._fallback_events.clear()
    return events
register_instrument
register_instrument(name, *, max_concurrent=4)

Register an instrument for tracking.

If already registered, returns the existing state (idempotent).

Parameters:

Name Type Description Default
name str

Instrument name (matches InstrumentProfile.name).

required
max_concurrent int

Maximum concurrent sheets on this instrument.

4

Returns:

Type Description
InstrumentState

The InstrumentState for the instrument.

Source code in src/marianne/daemon/baton/core.py
def register_instrument(self, name: str, *, max_concurrent: int = 4) -> InstrumentState:
    """Register an instrument for tracking.

    If already registered, returns the existing state (idempotent).

    Args:
        name: Instrument name (matches InstrumentProfile.name).
        max_concurrent: Maximum concurrent sheets on this instrument.

    Returns:
        The InstrumentState for the instrument.
    """
    if name in self._instruments:
        return self._instruments[name]

    state = InstrumentState(name=name, max_concurrent=max_concurrent)
    self._instruments[name] = state
    _logger.debug(
        "baton.instrument_registered",
        extra={"instrument": name, "max_concurrent": max_concurrent},
    )
    return state
set_model_concurrency
set_model_concurrency(instrument, model, max_concurrent)

Set per-model concurrency limit from instrument profile data.

Called during adapter initialization from loaded InstrumentProfiles.

Source code in src/marianne/daemon/baton/core.py
def set_model_concurrency(
    self,
    instrument: str,
    model: str,
    max_concurrent: int,
) -> None:
    """Set per-model concurrency limit from instrument profile data.

    Called during adapter initialization from loaded InstrumentProfiles.
    """
    key = f"{instrument}:{model}"
    self._model_concurrency[key] = max_concurrent
get_instrument_state
get_instrument_state(name)

Get the tracking state for a specific instrument.

Source code in src/marianne/daemon/baton/core.py
def get_instrument_state(self, name: str) -> InstrumentState | None:
    """Get the tracking state for a specific instrument."""
    return self._instruments.get(name)
build_dispatch_config
build_dispatch_config(*, max_concurrent_sheets=10)

Build a DispatchConfig from the current instrument state.

This bridges the gap between the baton's instrument tracking and the dispatch logic's configuration needs. Called before each dispatch cycle.

Parameters:

Name Type Description Default
max_concurrent_sheets int

Global concurrency ceiling.

10

Returns:

Type Description
DispatchConfig

DispatchConfig with rate-limited instruments, open circuit breakers, and per-instrument concurrency limits derived from the current InstrumentState.

Source code in src/marianne/daemon/baton/core.py
def build_dispatch_config(self, *, max_concurrent_sheets: int = 10) -> DispatchConfig:
    """Build a DispatchConfig from the current instrument state.

    This bridges the gap between the baton's instrument tracking
    and the dispatch logic's configuration needs. Called before
    each dispatch cycle.

    Args:
        max_concurrent_sheets: Global concurrency ceiling.

    Returns:
        DispatchConfig with rate-limited instruments, open circuit
        breakers, and per-instrument concurrency limits derived
        from the current InstrumentState.
    """
    # Deferred import to break circular dependency
    # (dispatch.py imports BatonCore at runtime)
    from marianne.daemon.baton.dispatch import DispatchConfig  # noqa: N814

    rate_limited: set[str] = set()
    open_breakers: set[str] = set()
    concurrency: dict[str, int] = {}

    for name, inst in self._instruments.items():
        if inst.rate_limited:
            rate_limited.add(name)
        if inst.circuit_breaker == CircuitBreakerState.OPEN:
            open_breakers.add(name)
        concurrency[name] = inst.max_concurrent

    return DispatchConfig(
        max_concurrent_sheets=max_concurrent_sheets,
        instrument_concurrency=concurrency,
        model_concurrency=dict(self._model_concurrency),
        rate_limited_instruments=rate_limited,
        open_circuit_breakers=open_breakers,
    )
set_job_cost_limit
set_job_cost_limit(job_id, max_cost_usd)

Set a per-job cost limit. The baton pauses the job when exceeded.

Parameters:

Name Type Description Default
job_id str

The job to set the limit for.

required
max_cost_usd float

Maximum total cost in USD.

required
Source code in src/marianne/daemon/baton/core.py
def set_job_cost_limit(self, job_id: str, max_cost_usd: float) -> None:
    """Set a per-job cost limit. The baton pauses the job when exceeded.

    Args:
        job_id: The job to set the limit for.
        max_cost_usd: Maximum total cost in USD.
    """
    self._job_cost_limits[job_id] = max_cost_usd
get_rate_limited_instruments
get_rate_limited_instruments()

Get the set of currently rate-limited instrument names.

Used by dispatch logic to skip rate-limited instruments.

Source code in src/marianne/daemon/baton/core.py
def get_rate_limited_instruments(self) -> set[str]:
    """Get the set of currently rate-limited instrument names.

    Used by dispatch logic to skip rate-limited instruments.
    """
    return {name for name, inst in self._instruments.items() if inst.rate_limited}
clear_instrument_rate_limit
clear_instrument_rate_limit(instrument=None)

Clear rate limit state on one or all instruments.

Resets rate_limited to False and rate_limit_expires_at to None. Also moves any WAITING sheets on the cleared instrument(s) back to PENDING so they can be re-dispatched.

Parameters:

Name Type Description Default
instrument str | None

Instrument name to clear, or None for all.

None

Returns:

Type Description
int

Number of instruments whose rate limit was cleared.

Source code in src/marianne/daemon/baton/core.py
def clear_instrument_rate_limit(
    self,
    instrument: str | None = None,
) -> int:
    """Clear rate limit state on one or all instruments.

    Resets ``rate_limited`` to ``False`` and ``rate_limit_expires_at``
    to ``None``.  Also moves any WAITING sheets on the cleared
    instrument(s) back to PENDING so they can be re-dispatched.

    Args:
        instrument: Instrument name to clear, or ``None`` for all.

    Returns:
        Number of instruments whose rate limit was cleared.
    """
    cleared = 0
    if instrument is not None:
        # Specific instrument — look it up. If not found, targets is
        # empty (return 0). Previously, missing instruments fell through
        # to clear-all — F-200 bug found by Breakpoint M3.
        # Uses `is not None` (not truthiness) to prevent empty string
        # from falling through to clear-all — F-201 (same bug class).
        inst = self._instruments.get(instrument)
        targets = [inst] if inst is not None else []
    else:
        # None → clear all instruments
        targets = list(self._instruments.values())
    for inst in targets:
        if inst.rate_limited:
            inst.rate_limited = False
            inst.rate_limit_expires_at = None
            cleared += 1
            # Move WAITING sheets on this instrument back to PENDING
            for job in self._jobs.values():
                for sheet in job.sheets.values():
                    if (
                        sheet.status == BatonSheetStatus.WAITING
                        and sheet.instrument_name == inst.name
                    ):
                        sheet.status = BatonSheetStatus.PENDING
    if cleared > 0:
        self._state_dirty = True
    return cleared
get_open_circuit_breakers
get_open_circuit_breakers()

Get the set of instruments with open circuit breakers.

Used by dispatch logic to skip unhealthy instruments.

Source code in src/marianne/daemon/baton/core.py
def get_open_circuit_breakers(self) -> set[str]:
    """Get the set of instruments with open circuit breakers.

    Used by dispatch logic to skip unhealthy instruments.
    """
    return {
        name
        for name, inst in self._instruments.items()
        if inst.circuit_breaker == CircuitBreakerState.OPEN
    }
set_sheet_cost_limit
set_sheet_cost_limit(job_id, sheet_num, max_cost_usd)

Set a per-sheet cost limit. The baton fails the sheet when exceeded.

Parameters:

Name Type Description Default
job_id str

The job containing the sheet.

required
sheet_num int

The sheet number.

required
max_cost_usd float

Maximum cost in USD for this sheet.

required
Source code in src/marianne/daemon/baton/core.py
def set_sheet_cost_limit(self, job_id: str, sheet_num: int, max_cost_usd: float) -> None:
    """Set a per-sheet cost limit. The baton fails the sheet when exceeded.

    Args:
        job_id: The job containing the sheet.
        sheet_num: The sheet number.
        max_cost_usd: Maximum cost in USD for this sheet.
    """
    self._sheet_cost_limits[(job_id, sheet_num)] = max_cost_usd
calculate_retry_delay
calculate_retry_delay(attempt)

Calculate retry delay using exponential backoff.

Parameters:

Name Type Description Default
attempt int

0-based attempt index (0 = first retry).

required

Returns:

Type Description
float

Delay in seconds, clamped to _max_retry_delay.

Source code in src/marianne/daemon/baton/core.py
def calculate_retry_delay(self, attempt: int) -> float:
    """Calculate retry delay using exponential backoff.

    Args:
        attempt: 0-based attempt index (0 = first retry).

    Returns:
        Delay in seconds, clamped to ``_max_retry_delay``.
    """
    delay = self._base_retry_delay * (self._retry_exponential_base**attempt)
    return min(delay, self._max_retry_delay)
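The backoff formula above can be worked through with the defaults set in `BatonCore.__init__` (10 s base, exponential base 2, 3600 s cap); `retry_delay` here is a standalone sketch of the same arithmetic, not the method itself.

```python
# Mirrors calculate_retry_delay(): base * (exp_base ** attempt),
# clamped to the ceiling. Defaults match BatonCore.__init__.
def retry_delay(attempt: int,
                base: float = 10.0,
                exp_base: float = 2.0,
                max_delay: float = 3600.0) -> float:
    delay = base * (exp_base ** attempt)
    return min(delay, max_delay)

# attempts 0..4 → 10, 20, 40, 80, 160 seconds
delays = [retry_delay(a) for a in range(5)]
# attempt 9 would be 5120 s; the 3600 s ceiling clamps it
capped = retry_delay(9)
```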
register_job
register_job(job_id, sheets, dependencies, *, escalation_enabled=False, self_healing_enabled=False, pacing_seconds=0.0)

Register a job's sheets with the baton for scheduling.

Parameters:

Name Type Description Default
job_id str

Unique job identifier.

required
sheets dict[int, SheetExecutionState]

Map of sheet_num → SheetExecutionState.

required
dependencies dict[int, list[int]]

Map of sheet_num → list of dependency sheet_nums. Sheets not in this map have no dependencies.

required
escalation_enabled bool

Whether to enter fermata on exhaustion.

False
self_healing_enabled bool

Whether to try healing on exhaustion.

False
pacing_seconds float

Inter-sheet delay after each completion (from pause_between_sheets_seconds). 0 = no delay.

0.0
Source code in src/marianne/daemon/baton/core.py
def register_job(
    self,
    job_id: str,
    sheets: dict[int, SheetExecutionState],
    dependencies: dict[int, list[int]],
    *,
    escalation_enabled: bool = False,
    self_healing_enabled: bool = False,
    pacing_seconds: float = 0.0,
) -> None:
    """Register a job's sheets with the baton for scheduling.

    Args:
        job_id: Unique job identifier.
        sheets: Map of sheet_num → SheetExecutionState.
        dependencies: Map of sheet_num → list of dependency sheet_nums.
            Sheets not in this map have no dependencies.
        escalation_enabled: Whether to enter fermata on exhaustion.
        self_healing_enabled: Whether to try healing on exhaustion.
        pacing_seconds: Inter-sheet delay after each completion (from
            ``pause_between_sheets_seconds``). 0 = no delay.
    """
    if job_id in self._jobs:
        _logger.warning(
            "baton.register_job.duplicate",
            extra={"job_id": job_id},
        )
        return

    # Auto-register any instruments used by the job's sheets
    self._auto_register_instruments(sheets)

    self._jobs[job_id] = _JobRecord(
        job_id=job_id,
        sheets=sheets,
        dependencies=dependencies,
        escalation_enabled=escalation_enabled,
        self_healing_enabled=self_healing_enabled,
        pacing_seconds=pacing_seconds,
    )
    self._state_dirty = True

    # F-440: Re-propagate failure for any sheet that's already FAILED.
    # During normal execution, _propagate_failure_to_dependents() cascades
    # failure to downstream sheets. But _sync_sheet_status() only fires
    # for SheetAttemptResult/SheetSkipped events, so cascaded failures
    # are NOT synced to the checkpoint. On restart recovery, dependents
    # revert to PENDING while their upstream is FAILED → zombie job.
    # Re-running propagation here is idempotent (only touches non-terminal
    # sheets) and fixes the sync gap for both fresh registration and
    # recovery.
    for sheet_num, sheet in sheets.items():
        if sheet.status == BatonSheetStatus.FAILED:
            self._propagate_failure_to_dependents(job_id, sheet_num)

    _logger.info(
        "baton.job_registered",
        extra={
            "job_id": job_id,
            "sheet_count": len(sheets),
            "dependency_count": len(dependencies),
        },
    )
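The shape of the `dependencies` argument, and the readiness rule it feeds (see `get_ready_sheets` below), can be illustrated with plain dicts. Sheet numbers and statuses here are invented for the sketch; real callers pass `SheetExecutionState` objects for `sheets`.

```python
# Hypothetical dependency map for a 3-sheet job: sheet 2 depends on
# sheet 1, sheet 3 on both. Sheets absent from the map (sheet 1)
# have no dependencies.
dependencies = {2: [1], 3: [1, 2]}

# Simplified status view; "completed"/"skipped" count as satisfied,
# matching the readiness rule described for get_ready_sheets().
statuses = {1: "completed", 2: "pending", 3: "pending"}

def deps_satisfied(sheet_num: int) -> bool:
    return all(
        statuses[d] in ("completed", "skipped")
        for d in dependencies.get(sheet_num, [])
    )

ready = [n for n, s in statuses.items()
         if s == "pending" and deps_satisfied(n)]
# sheet 2 is dispatchable; sheet 3 still waits on sheet 2
```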
deregister_job
deregister_job(job_id)

Remove a job from the baton's tracking.

Cleans up all per-job state including cost limit entries to prevent memory leaks in long-running conductors (F-062).

Source code in src/marianne/daemon/baton/core.py
def deregister_job(self, job_id: str) -> None:
    """Remove a job from the baton's tracking.

    Cleans up all per-job state including cost limit entries to
    prevent memory leaks in long-running conductors (F-062).
    """
    if job_id in self._jobs:
        del self._jobs[job_id]
        # F-062: Clean up cost limit dicts to prevent memory leaks
        self._job_cost_limits.pop(job_id, None)
        # Remove sheet cost limits for this job
        sheet_keys_to_remove = [key for key in self._sheet_cost_limits if key[0] == job_id]
        for key in sheet_keys_to_remove:
            del self._sheet_cost_limits[key]
        self._state_dirty = True
        _logger.info("baton.job_deregistered", extra={"job_id": job_id})
get_sheet_state
get_sheet_state(job_id, sheet_num)

Get the scheduling state for a specific sheet.

Source code in src/marianne/daemon/baton/core.py
def get_sheet_state(self, job_id: str, sheet_num: int) -> SheetExecutionState | None:
    """Get the scheduling state for a specific sheet."""
    job = self._jobs.get(job_id)
    if job is None:
        return None
    return job.sheets.get(sheet_num)
is_job_paused
is_job_paused(job_id)

Check if a job's dispatch is paused.

Source code in src/marianne/daemon/baton/core.py
def is_job_paused(self, job_id: str) -> bool:
    """Check if a job's dispatch is paused."""
    job = self._jobs.get(job_id)
    return job.paused if job is not None else False
is_job_complete
is_job_complete(job_id)

Check if all sheets in a job are in terminal state.

Source code in src/marianne/daemon/baton/core.py
def is_job_complete(self, job_id: str) -> bool:
    """Check if all sheets in a job are in terminal state."""
    job = self._jobs.get(job_id)
    if job is None:
        return False
    return all(sheet.status in _TERMINAL_BATON_STATUSES for sheet in job.sheets.values())
get_ready_sheets
get_ready_sheets(job_id)

Find sheets that are ready to dispatch.

A sheet is ready when:

1. Status is 'pending' or 'ready'
2. All dependencies are satisfied (completed or skipped)
3. The job is not paused

Source code in src/marianne/daemon/baton/core.py
def get_ready_sheets(self, job_id: str) -> list[SheetExecutionState]:
    """Find sheets that are ready to dispatch.

    A sheet is ready when:
    1. Status is 'pending' or 'ready'
    2. All dependencies are satisfied (completed or skipped)
    3. The job is not paused
    """
    job = self._jobs.get(job_id)
    if job is None or job.paused or job.pacing_active:
        return []

    ready: list[SheetExecutionState] = []
    for sheet_num, sheet in job.sheets.items():
        if sheet.status not in _DISPATCHABLE_BATON_STATUSES:
            continue

        # Check dependencies
        deps = job.dependencies.get(sheet_num, [])
        deps_satisfied = all(self._is_dependency_satisfied(job, dep) for dep in deps)
        if deps_satisfied:
            ready.append(sheet)

    return ready
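
The readiness rule above can be sketched independently of the baton's state classes. This is a minimal stand-in using plain string statuses and dicts; the real code uses `BatonSheetStatus` enums and `SheetExecutionState` objects:

```python
# Minimal sketch of the readiness rule, with dicts standing in for
# BatonJobState / SheetExecutionState.
TERMINAL_OK = {"completed", "skipped"}   # statuses that satisfy a dependency
DISPATCHABLE = {"pending", "ready"}      # statuses eligible for dispatch

def ready_sheets(
    statuses: dict[int, str],
    dependencies: dict[int, list[int]],
    paused: bool = False,
    pacing_active: bool = False,
) -> list[int]:
    """Return sheet numbers eligible for dispatch."""
    if paused or pacing_active:
        return []
    return [
        num
        for num, status in statuses.items()
        if status in DISPATCHABLE
        and all(
            statuses.get(dep) in TERMINAL_OK
            for dep in dependencies.get(num, [])
        )
    ]

statuses = {1: "completed", 2: "pending", 3: "pending"}
deps = {2: [1], 3: [2]}   # 3 waits on 2, which waits on 1
print(ready_sheets(statuses, deps))               # → [2]
print(ready_sheets(statuses, deps, paused=True))  # → []
```

Sheet 3 stays pending until sheet 2 reaches a terminal-OK status, and pausing the job empties the ready set without touching sheet state.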
handle_event async
handle_event(event)

Process a single event. Updates state but does NOT dispatch.

This is the core decision-making method. Each event type has a specific handler that updates sheet state.

Per the baton spec: handler exceptions are logged, not re-raised. The baton continues processing subsequent events.

Source code in src/marianne/daemon/baton/core.py
async def handle_event(self, event: BatonEvent) -> None:
    """Process a single event. Updates state but does NOT dispatch.

    This is the core decision-making method. Each event type has
    a specific handler that updates sheet state.

    Per the baton spec: handler exceptions are logged, not re-raised.
    The baton continues processing subsequent events.
    """
    try:
        match event:
            # === Musician events ===
            case SheetAttemptResult():
                self._handle_attempt_result(event)

            case SheetSkipped():
                self._handle_sheet_skipped(event)

            case SheetDispatched():
                self._handle_sheet_dispatched(event)

            # === Rate limit events ===
            case RateLimitHit():
                self._handle_rate_limit_hit(event)

            case RateLimitExpired():
                self._handle_rate_limit_expired(event)

            # === Timer events ===
            case RetryDue():
                self._handle_retry_due(event)

            case StaleCheck():
                self._handle_stale_check(event)

            case CronTick():
                _logger.warning(
                    "baton.event.unimplemented",
                    extra={"event_type": "CronTick"},
                )

            case JobTimeout():
                self._handle_job_timeout(event)

            case PacingComplete():
                self._handle_pacing_complete(event)

            # === Escalation events ===
            case EscalationNeeded():
                self._handle_escalation_needed(event)

            case EscalationResolved():
                self._handle_escalation_resolved(event)

            case EscalationTimeout():
                self._handle_escalation_timeout(event)

            # === External command events ===
            case PauseJob():
                self._handle_pause_job(event)

            case ResumeJob():
                self._handle_resume_job(event)

            case CancelJob():
                self._handle_cancel_job(event)

            case ConfigReloaded():
                _logger.warning(
                    "baton.event.unimplemented",
                    extra={"event_type": "ConfigReloaded"},
                )

            case ShutdownRequested():
                self._handle_shutdown(event)

            # === Observer events ===
            case ProcessExited():
                self._handle_process_exited(event)

            case ResourceAnomaly():
                self._handle_resource_anomaly(event)

            # === Instrument fallback events ===
            case InstrumentFallback():
                # InstrumentFallback events are emitted by the baton
                # itself, not received from external sources. They pass
                # through the event bus for observability (dashboard,
                # learning hub, notifications). No handler needed.
                pass

            # === Internal events ===
            case DispatchRetry():
                pass  # Dispatch retry — _dispatch_ready handles this

            case CircuitBreakerRecovery():
                self._handle_circuit_breaker_recovery(event)

            case _:
                _logger.warning(
                    "baton.unknown_event",
                    extra={"event_type": type(event).__name__},
                )

    except Exception:
        _logger.error(
            "baton.event_handler_failed",
            extra={"event_type": type(event).__name__},
            exc_info=True,
        )
run async
run()

The baton's main event loop.

Processes events from the inbox, updates state, and dispatches ready sheets. Runs until a ShutdownRequested event is received.

Source code in src/marianne/daemon/baton/core.py
async def run(self) -> None:
    """The baton's main event loop.

    Processes events from the inbox, updates state, and dispatches
    ready sheets. Runs until a ShutdownRequested event is received.
    """
    self._running = True
    _logger.info("baton.started")

    try:
        while not self._shutting_down:
            event = await self._inbox.get()
            await self.handle_event(event)
            # dispatch_ready() would be called here when wired to
            # conductor
    except asyncio.CancelledError:
        _logger.info("baton.cancelled")
        raise
    finally:
        self._running = False
        _logger.info("baton.stopped")
get_diagnostics
get_diagnostics(job_id)

Get diagnostic information for a job.

Returns a dict with sheet counts by status, instrument state, and other debugging information. Returns None if job not found.

Source code in src/marianne/daemon/baton/core.py
def get_diagnostics(self, job_id: str) -> dict[str, Any] | None:
    """Get diagnostic information for a job.

    Returns a dict with sheet counts by status, instrument state,
    and other debugging information. Returns None if job not found.
    """
    job = self._jobs.get(job_id)
    if job is None:
        return None

    status_counts: dict[str, int] = {}
    instruments_used: set[str] = set()
    for sheet in job.sheets.values():
        status_key = sheet.status.value
        status_counts[status_key] = status_counts.get(status_key, 0) + 1
        if sheet.instrument_name:
            instruments_used.add(sheet.instrument_name)

    return {
        "job_id": job_id,
        "paused": job.paused,
        "sheets": {
            "total": len(job.sheets),
            **status_counts,
        },
        "instruments_used": sorted(instruments_used),
    }

CancelJob dataclass

CancelJob(job_id, timestamp=time())

Cancel all sheets for a job and deregister it from the baton.

In-flight sheet tasks are cancelled. The job is marked as cancelled in CheckpointState.

CircuitBreakerRecovery dataclass

CircuitBreakerRecovery(instrument, timestamp=time())

Timer fired — check if a circuit-broken instrument can accept a probe.

When a circuit breaker trips OPEN, the baton schedules this event via the timer wheel. On fire, the instrument transitions from OPEN to HALF_OPEN, allowing one probe request through. The next dispatch cycle picks up any PENDING sheets blocked by the dead-ended fallback chain.

If the probe succeeds, the breaker closes. If it fails, the breaker reopens and a new recovery timer is scheduled with increased backoff.

GH#169: Without this, sheets whose entire fallback chain is circuit-broken stay PENDING forever — the score appears RUNNING but is dead.
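
The "increased backoff" between reopen attempts can be sketched as a capped exponential. The base and cap values here are illustrative assumptions, not the baton's configured defaults:

```python
# Hypothetical backoff schedule for CircuitBreakerRecovery timers.
# base/cap are assumptions; real values would come from baton config.
def next_recovery_delay(reopen_count: int, base: float = 30.0, cap: float = 600.0) -> float:
    """Delay before the next HALF_OPEN probe after `reopen_count` reopens."""
    return min(cap, base * (2 ** reopen_count))

print([next_recovery_delay(n) for n in range(6)])
# → [30.0, 60.0, 120.0, 240.0, 480.0, 600.0]
```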

ConfigReloaded dataclass

ConfigReloaded(job_id, new_config, timestamp=time())

Config has changed for a job (SIGHUP, mzt modify, resume -c).

The baton rebuilds pending sheets from the new config. Completed sheets are preserved. Cost limits may be reset if they changed.

CronTick dataclass

CronTick(entry_name, score_path, timestamp=time())

Timer fired — a cron-scheduled job should be submitted.

The baton submits the configured score as a new job and schedules the next tick. If a previous run is still active, this tick is skipped.

DispatchRetry dataclass

DispatchRetry(timestamp=time())

Internal signal to retry dispatch after a backpressure delay.

When the baton encounters backpressure during dispatch, it schedules this event via the timer wheel rather than blocking.

EscalationNeeded dataclass

EscalationNeeded(job_id, sheet_num, reason, options=list(), timestamp=time())

A sheet execution requires composer judgment — enter fermata.

The baton pauses the job's dispatch and notifies the composer (human or AI) via configured channels. A timeout timer is scheduled.

EscalationResolved dataclass

EscalationResolved(job_id, sheet_num, decision, timestamp=time())

The composer has made a decision on a fermata.

The baton applies the decision and resumes dispatching for the job. Arrives via IPC (job.resolve_escalation method).

EscalationTimeout dataclass

EscalationTimeout(job_id, sheet_num, timestamp=time())

Timer fired — no escalation response received within the deadline.

The baton defaults to the safe action: fail the sheet (not the job) and resume dispatching for other sheets.

JobTimeout dataclass

JobTimeout(job_id, timestamp=time())

Timer fired — a job has exceeded its wall-clock time limit.

The baton cancels all remaining sheets for this job.

PacingComplete dataclass

PacingComplete(job_id, timestamp=time())

Timer fired — the inter-sheet pacing delay for a job has elapsed.

The baton clears the pacing flag, allowing the next sheet to dispatch. Implements pause_between_sheets_seconds from score config.

PauseJob dataclass

PauseJob(job_id, timestamp=time())

Pause dispatching for a job. In-flight sheets continue to completion.

No new sheets are dispatched until ResumeJob is received. Retry timers are preserved — when resumed, scheduled retries fire normally.

ProcessExited dataclass

ProcessExited(job_id, sheet_num, pid, exit_code=None, timestamp=time())

Observer detected that a backend process died unexpectedly.

The baton checks if this was a sheet's backend process and, if so, marks the sheet as crashed — faster than waiting for timeout.

RateLimitExpired dataclass

RateLimitExpired(instrument, timestamp=time())

Timer fired — check if the rate-limited instrument is available again.

If the instrument is still unavailable, the baton schedules another timer. If available, sheets waiting on this instrument become ready.

RateLimitHit dataclass

RateLimitHit(instrument, wait_seconds, job_id, sheet_num, model=None, timestamp=time())

An instrument hit a rate limit. NOT a failure — a tempo change.

The baton marks the instrument as rate-limited and schedules a timer for recovery. Sheets targeting this instrument move to waiting state. Other instruments are completely unaffected.

ResourceAnomaly dataclass

ResourceAnomaly(severity, metric, value, timestamp=time())

Observer/monitor detected a resource pressure event.

Critical severity triggers backpressure — the baton stops dispatching new sheets and lets running sheets drain.

ResumeJob dataclass

ResumeJob(job_id, new_config=None, timestamp=time())

Resume dispatching for a paused job, optionally with new config.

When new_config is provided, pending sheets are rebuilt from the new config. Completed sheets are preserved. Failed sheets being retried use the new config for the retry.

RetryDue dataclass

RetryDue(job_id, sheet_num, timestamp=time())

Timer fired — a previously failed sheet is ready for retry.

The baton moves the sheet from retry-scheduled to ready state. The next dispatch cycle will pick it up.

SheetAttemptResult dataclass

SheetAttemptResult(job_id, sheet_num, instrument_name, attempt, execution_success=True, exit_code=None, duration_seconds=0.0, validations_passed=0, validations_total=0, validation_pass_rate=0.0, validation_details=None, error_classification=None, error_message=None, rate_limited=False, rate_limit_wait_seconds=None, cost_usd=0.0, input_tokens=0, output_tokens=0, model_used=None, stdout_tail='', stderr_tail='', timestamp=time())

A musician reports the result of a single sheet attempt.

This is the central event in the baton's event loop. The musician plays once and reports in full detail. The conductor (baton) decides what happens next — retry, completion mode, healing, escalation, or accept.

Rate limits are NOT failures. When rate_limited is True, the baton re-queues the sheet for when the instrument recovers. No retry budget is consumed.

Attributes
validation_pass_rate class-attribute instance-attribute
validation_pass_rate = 0.0

Percentage of validations that passed (0.0-100.0).

CRITICAL CONTRACT (F-018): Set to 100.0 when execution succeeds with no validation rules OR when all validations pass. The baton treats the default (0.0) as "all validations failed" and will retry.

A musician that reports execution_success=True with validations_total=0 but leaves this at 0.0 will trigger unnecessary retries until max_retries is exhausted.
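
The contract can be captured in a small helper. This function is a hypothetical sketch mirroring the field names above, not part of the musician API:

```python
# Hypothetical helper encoding the F-018 contract for musicians.
def compute_pass_rate(
    execution_success: bool,
    validations_passed: int,
    validations_total: int,
) -> float:
    """The validation_pass_rate a musician should report (0.0-100.0)."""
    if validations_total == 0:
        # No validation rules: success is a perfect score, per F-018.
        # Leaving this at the 0.0 default would trigger needless retries.
        return 100.0 if execution_success else 0.0
    return 100.0 * validations_passed / validations_total

print(compute_pass_rate(True, 0, 0))   # → 100.0
print(compute_pass_rate(True, 3, 4))   # → 75.0
```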

rate_limit_wait_seconds class-attribute instance-attribute
rate_limit_wait_seconds = None

Parsed wait duration from the API's rate limit error message.

When set, the baton uses this instead of the default 60s for scheduling the recovery timer. This is the actual duration the API told us to wait.

SheetSkipped dataclass

SheetSkipped(job_id, sheet_num, reason, timestamp=time())

A sheet was skipped due to skip_when condition or start_sheet override.

The baton propagates skip state to dependents — downstream sheets that depend on a skipped sheet receive a skip sentinel, not an empty string.

ShutdownRequested dataclass

ShutdownRequested(graceful=True, timestamp=time())

The conductor is shutting down (SIGTERM, mzt stop).

When graceful is True, the baton waits for in-flight sheets to complete (up to the configured drain timeout) before stopping. When False, sheets are cancelled immediately.

StaleCheck dataclass

StaleCheck(job_id, sheet_num, timestamp=time())

Timer fired — check if a running sheet has gone stale.

If no output progress has been received within the configured idle timeout, the baton kills the stale sheet and reschedules or fails it.

PromptRenderer

PromptRenderer(prompt_config, total_sheets, total_stages, parallel_enabled)

Renders full prompts for baton musicians.

Bridges the PromptBuilder pipeline with the baton's Sheet-based execution model. Handles template rendering, injection resolution, preamble assembly, and validation requirements — everything the old runner's context mixin did, but without the runner dependency.

Create one per job (or share across jobs with the same config). Call render() per sheet dispatch.

Parameters:

Name Type Description Default
prompt_config PromptConfig

The job's PromptConfig (template, variables, etc.).

required
total_sheets int

Total sheets in the job (for preamble).

required
total_stages int

Total stages before fan-out expansion (for template aliases).

required
parallel_enabled bool

Whether parallel execution is enabled (for preamble).

required
Source code in src/marianne/daemon/baton/prompt.py
def __init__(
    self,
    prompt_config: PromptConfig,
    total_sheets: int,
    total_stages: int,
    parallel_enabled: bool,
) -> None:
    self._prompt_config = prompt_config
    self._total_sheets = total_sheets
    self._total_stages = total_stages
    self._parallel_enabled = parallel_enabled
Functions
render
render(sheet, attempt_context, *, patterns=None, failure_history=None, spec_fragments=None)

Render a full prompt for a sheet execution.

Performs the complete 9-layer prompt assembly: template rendering -> injection resolution -> optional layers -> validation requirements -> completion suffix.

Parameters:

Name Type Description Default
sheet Sheet

The Sheet entity to render for.

required
attempt_context AttemptContext

Context from the baton (attempt number, mode).

required
patterns list[str] | None

Optional learned pattern descriptions to inject.

None
failure_history list[HistoricalFailure] | None

Optional historical failures from previous sheets.

None
spec_fragments list[SpecFragment] | None

Optional spec corpus fragments to inject.

None

Returns:

Type Description
RenderedPrompt

RenderedPrompt with fully rendered prompt and preamble.

Source code in src/marianne/daemon/baton/prompt.py
def render(
    self,
    sheet: Sheet,
    attempt_context: AttemptContext,
    *,
    patterns: list[str] | None = None,
    failure_history: list[HistoricalFailure] | None = None,
    spec_fragments: list[SpecFragment] | None = None,
) -> RenderedPrompt:
    """Render a full prompt for a sheet execution.

    Performs the complete 9-layer prompt assembly:
    template rendering -> injection resolution -> optional layers ->
    validation requirements -> completion suffix.

    Args:
        sheet: The Sheet entity to render for.
        attempt_context: Context from the baton (attempt number, mode).
        patterns: Optional learned pattern descriptions to inject.
        failure_history: Optional historical failures from previous sheets.
        spec_fragments: Optional spec corpus fragments to inject.

    Returns:
        RenderedPrompt with fully rendered prompt and preamble.
    """
    # Layer 1: Build SheetContext from Sheet entity (F-210: includes cross-sheet)
    context = self._build_context(sheet, attempt_context)

    # Layer 2-3: Resolve prelude/cadenza injections into the context
    self._resolve_injections(context, sheet)

    # Layer 4-8: Build prompt through PromptBuilder
    prompt = self._build_prompt(
        sheet, context, patterns, failure_history, spec_fragments
    )

    # Layer 9: Completion mode suffix
    if attempt_context.completion_prompt_suffix:
        prompt = f"{prompt}\n\n{attempt_context.completion_prompt_suffix}"

    # Preamble: positional identity + retry status
    retry_count = max(0, attempt_context.attempt_number - 1)
    preamble = build_preamble(
        sheet_num=sheet.num,
        total_sheets=self._total_sheets,
        workspace=sheet.workspace,
        retry_count=retry_count,
        is_parallel=self._parallel_enabled,
    )

    return RenderedPrompt(prompt=prompt, preamble=preamble)

RenderedPrompt dataclass

RenderedPrompt(prompt, preamble)

Output of the prompt rendering pipeline.

Contains both the rendered prompt (for backend execution) and the preamble (set on the backend before execution). Separating them allows the musician to configure the backend correctly.

Attributes
prompt instance-attribute
prompt

Fully rendered prompt with all injections and context layers.

preamble instance-attribute
preamble

Dynamic preamble with positional identity and retry status.

AttemptContext dataclass

AttemptContext(attempt_number, mode, completion_prompt_suffix=None, healing_context=None, previous_results=None, learned_patterns=None, total_sheets=1, total_movements=1, previous_outputs=dict(), previous_files=dict())

Context provided by the conductor to the musician for a single attempt.

Each dispatch carries this context so the musician knows:

  • Which attempt this is (1 = first try, 2+ = retry)
  • What mode to operate in (normal/completion/healing)
  • Any extra context for non-normal modes
  • Learned patterns from the learning store (instrument-scoped)
  • Previous attempt results (for failure history injection)

Attributes
attempt_number instance-attribute
attempt_number

1-based attempt number. First try = 1, first retry = 2, etc.

mode instance-attribute
mode

The execution mode for this attempt.

completion_prompt_suffix class-attribute instance-attribute
completion_prompt_suffix = None

For completion mode: appended to the prompt to fix partial failures.

healing_context class-attribute instance-attribute
healing_context = None

For healing mode: diagnostic context from self-healing analysis.

previous_results class-attribute instance-attribute
previous_results = None

Previous attempt results for failure history injection into prompts.

learned_patterns class-attribute instance-attribute
learned_patterns = None

Patterns from the learning store, scoped to this instrument.

total_sheets class-attribute instance-attribute
total_sheets = 1

Total concrete sheet count in the job (for preamble and template vars).

total_movements class-attribute instance-attribute
total_movements = 1

Total movement count in the job (for template vars).

previous_outputs class-attribute instance-attribute
previous_outputs = field(default_factory=dict)

Stdout outputs from completed sheets. Keys are sheet numbers (1-indexed). Populated by the adapter from completed sheet attempt results.

previous_files class-attribute instance-attribute
previous_files = field(default_factory=dict)

File contents captured via capture_files patterns. Keys are file paths. Populated by the adapter from workspace files matching CrossSheetConfig patterns.

AttemptMode

Bases: str, Enum

The mode a sheet attempt runs in.

  • NORMAL — standard execution (first try or retry)
  • COMPLETION — partial validation passed, trying to complete
  • HEALING — self-healing after retry exhaustion
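
An illustrative mode-selection helper following the enum semantics above. The actual decision logic lives in the baton; this function and its threshold checks are hypothetical:

```python
# Illustrative only: mode selection per the AttemptMode descriptions.
def select_mode(validation_pass_rate: float, retries_exhausted: bool) -> str:
    if retries_exhausted:
        return "HEALING"      # retry budget spent → self-healing attempt
    if 0.0 < validation_pass_rate < 100.0:
        return "COMPLETION"   # partial validation pass → complete the rest
    return "NORMAL"           # first try or full retry

print(select_mode(75.0, False))   # → COMPLETION
print(select_mode(0.0, True))     # → HEALING
print(select_mode(0.0, False))    # → NORMAL
```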

BatonJobState dataclass

BatonJobState(job_id, total_sheets, paused=False, pacing_active=False, sheets=dict())

The baton's per-job tracking during a performance.

Contains all sheet states for a job, plus job-level flags (paused, pacing, cost tracking).

Attributes
job_id instance-attribute
job_id

The unique job identifier.

total_sheets instance-attribute
total_sheets

Total number of sheets in this job.

paused class-attribute instance-attribute
paused = False

Whether dispatching is paused for this job.

pacing_active class-attribute instance-attribute
pacing_active = False

Whether inter-sheet pacing delay is currently active.

sheets class-attribute instance-attribute
sheets = field(default_factory=dict)

Map of sheet_num → SheetExecutionState.

total_cost_usd property
total_cost_usd

Total cost across all sheets in this job.

completed_count property
completed_count

Number of sheets in COMPLETED status.

terminal_count property
terminal_count

Number of sheets in a terminal status (completed, failed, skipped).

is_complete property
is_complete

Whether all registered sheets have reached a terminal status.

running_sheets property
running_sheets

Sheets currently in RUNNING status.

has_any_failed property
has_any_failed

Whether any sheet has reached FAILED status.

Functions
register_sheet
register_sheet(sheet)

Register a sheet's execution state with this job.

Source code in src/marianne/daemon/baton/state.py
def register_sheet(self, sheet: SheetExecutionState) -> None:
    """Register a sheet's execution state with this job."""
    self.sheets[sheet.sheet_num] = sheet
get_sheet
get_sheet(sheet_num)

Get a sheet's execution state, or None if not registered.

Source code in src/marianne/daemon/baton/state.py
def get_sheet(self, sheet_num: int) -> SheetExecutionState | None:
    """Get a sheet's execution state, or None if not registered."""
    return self.sheets.get(sheet_num)

CircuitBreakerState

Bases: str, Enum

Three-state circuit breaker for per-instrument health tracking.

  • CLOSED — healthy, accepting requests
  • OPEN — unhealthy, rejecting requests, waiting for recovery timer
  • HALF_OPEN — probing, allowing one request to test recovery

InstrumentState dataclass

InstrumentState(name, max_concurrent, running_count=0, rate_limited=False, rate_limit_expires_at=None, circuit_breaker=CLOSED, consecutive_failures=0, circuit_breaker_threshold=5, circuit_breaker_recovery_at=None)

Per-instrument state tracking for rate limits, circuit breakers, and concurrency.

The baton tracks each instrument's health independently. Rate limits on claude-code don't affect gemini-cli. Circuit breaker thresholds are per-instrument. Concurrency limits come from the InstrumentProfile.

Attributes
name instance-attribute
name

Instrument name (matches InstrumentProfile.name).

max_concurrent instance-attribute
max_concurrent

Maximum concurrent sheets on this instrument (from InstrumentProfile).

running_count class-attribute instance-attribute
running_count = 0

Number of currently running sheets on this instrument.

rate_limited class-attribute instance-attribute
rate_limited = False

Whether this instrument is currently rate-limited.

rate_limit_expires_at class-attribute instance-attribute
rate_limit_expires_at = None

Monotonic time when the rate limit is expected to clear.

circuit_breaker class-attribute instance-attribute
circuit_breaker = CLOSED

Current circuit breaker state.

consecutive_failures class-attribute instance-attribute
consecutive_failures = 0

Consecutive failures across all jobs for this instrument.

circuit_breaker_threshold class-attribute instance-attribute
circuit_breaker_threshold = 5

Number of consecutive failures to trip the circuit breaker.

circuit_breaker_recovery_at class-attribute instance-attribute
circuit_breaker_recovery_at = None

Monotonic time for circuit breaker recovery check.

is_available property
is_available

Whether this instrument can accept new sheets.

Available when not rate-limited and circuit breaker is not open. Half-open allows one probe request through.

at_capacity property
at_capacity

Whether all concurrent slots are in use.

Functions
record_success
record_success()

Record a successful execution on this instrument.

Resets consecutive failures. If circuit breaker is half-open, closes it (the probe succeeded).

Source code in src/marianne/daemon/baton/state.py
def record_success(self) -> None:
    """Record a successful execution on this instrument.

    Resets consecutive failures. If circuit breaker is half-open,
    closes it (the probe succeeded).
    """
    self.consecutive_failures = 0
    if self.circuit_breaker == CircuitBreakerState.HALF_OPEN:
        self.circuit_breaker = CircuitBreakerState.CLOSED
record_failure
record_failure()

Record a failed execution on this instrument.

Increments consecutive failures. If threshold reached, opens the circuit breaker. If already half-open, reopens it.

Source code in src/marianne/daemon/baton/state.py
def record_failure(self) -> None:
    """Record a failed execution on this instrument.

    Increments consecutive failures. If threshold reached, opens
    the circuit breaker. If already half-open, reopens it.
    """
    self.consecutive_failures += 1

    if self.circuit_breaker == CircuitBreakerState.HALF_OPEN:
        # Probe failed — back to open
        self.circuit_breaker = CircuitBreakerState.OPEN
    elif self.consecutive_failures >= self.circuit_breaker_threshold:
        self.circuit_breaker = CircuitBreakerState.OPEN
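
Taken together, `record_success()` and `record_failure()` implement the state machine from `CircuitBreakerState`. A condensed, self-contained walk-through (HALF_OPEN is normally entered by the `CircuitBreakerRecovery` timer; here it is set directly for illustration):

```python
from enum import Enum

class CB(str, Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class MiniInstrument:
    """Stand-in mirroring InstrumentState's breaker logic."""

    def __init__(self, threshold: int = 2) -> None:
        self.circuit_breaker = CB.CLOSED
        self.consecutive_failures = 0
        self.threshold = threshold

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        if self.circuit_breaker == CB.HALF_OPEN:
            self.circuit_breaker = CB.OPEN    # probe failed → back to open
        elif self.consecutive_failures >= self.threshold:
            self.circuit_breaker = CB.OPEN    # threshold reached → trip

    def record_success(self) -> None:
        self.consecutive_failures = 0
        if self.circuit_breaker == CB.HALF_OPEN:
            self.circuit_breaker = CB.CLOSED  # probe succeeded → close

inst = MiniInstrument(threshold=2)
inst.record_failure()
inst.record_failure()
print(inst.circuit_breaker.value)    # → open
inst.circuit_breaker = CB.HALF_OPEN  # recovery timer fired
inst.record_success()
print(inst.circuit_breaker.value)    # → closed
```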
to_dict
to_dict()

Serialize to a dict for SQLite persistence.

Source code in src/marianne/daemon/baton/state.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a dict for SQLite persistence."""
    return {
        "name": self.name,
        "max_concurrent": self.max_concurrent,
        "rate_limited": self.rate_limited,
        "rate_limit_expires_at": self.rate_limit_expires_at,
        "circuit_breaker": self.circuit_breaker.value,
        "consecutive_failures": self.consecutive_failures,
        "circuit_breaker_threshold": self.circuit_breaker_threshold,
        "circuit_breaker_recovery_at": self.circuit_breaker_recovery_at,
    }
from_dict classmethod
from_dict(data)

Restore from a serialized dict.

Source code in src/marianne/daemon/baton/state.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> InstrumentState:
    """Restore from a serialized dict."""
    state = cls(
        name=data["name"],
        max_concurrent=data.get("max_concurrent", 4),
    )
    state.rate_limited = data.get("rate_limited", False)
    state.rate_limit_expires_at = data.get("rate_limit_expires_at")
    state.circuit_breaker = CircuitBreakerState(
        data.get("circuit_breaker", "closed")
    )
    state.consecutive_failures = data.get("consecutive_failures", 0)
    state.circuit_breaker_threshold = data.get("circuit_breaker_threshold", 5)
    state.circuit_breaker_recovery_at = data.get("circuit_breaker_recovery_at")
    return state

TimerHandle dataclass

TimerHandle(fire_at, event, _seq=_next_seq())

Opaque handle returned by schedule(). Used for cancellation.

Attributes:

Name Type Description
fire_at float

Monotonic time when the event should fire.

event BatonEvent

The BatonEvent to deliver to the inbox.

TimerWheel

TimerWheel(inbox)

Priority queue of future events with a background drain task.

Usage::

inbox = asyncio.Queue()
wheel = TimerWheel(inbox)

# Schedule a retry in 30 seconds
handle = wheel.schedule(30.0, RetryDue(job_id="j1", sheet_num=5))

# Cancel if no longer needed
wheel.cancel(handle)

# Run the drain task (usually as asyncio.create_task(wheel.run()))
await wheel.run()  # blocks forever, fires events into inbox

Parameters:

Name Type Description Default
inbox Queue[Any]

The asyncio.Queue that receives fired events. This is the baton's event inbox — the same queue that receives musician results, external commands, etc.

required
Source code in src/marianne/daemon/baton/timer.py
def __init__(self, inbox: asyncio.Queue[Any]) -> None:
    self._inbox = inbox
    self._heap: list[_TimerEntry] = []
    self._cancelled: set[int] = set()  # set of cancelled _seq values
    self._wake = asyncio.Event()
    self._shutting_down = False
Attributes
pending_count property
pending_count

Number of non-cancelled timers in the wheel.

Functions
schedule
schedule(delay_seconds, event)

Schedule an event to fire after delay_seconds.

Parameters:

Name Type Description Default
delay_seconds float

Seconds from now. Clamped to >= 0.

required
event BatonEvent

The BatonEvent to deliver when the timer fires.

required

Returns:

Type Description
TimerHandle

A TimerHandle that can be passed to cancel().

Source code in src/marianne/daemon/baton/timer.py
def schedule(self, delay_seconds: float, event: BatonEvent) -> TimerHandle:
    """Schedule an event to fire after ``delay_seconds``.

    Args:
        delay_seconds: Seconds from now. Clamped to >= 0.
        event: The BatonEvent to deliver when the timer fires.

    Returns:
        A TimerHandle that can be passed to ``cancel()``.
    """
    delay = max(0.0, delay_seconds)
    fire_at = time.monotonic() + delay
    handle = TimerHandle(fire_at=fire_at, event=event)
    entry = _TimerEntry(fire_at=fire_at, seq=handle._seq, handle=handle)
    heapq.heappush(self._heap, entry)

    _logger.debug(
        "timer.scheduled",
        extra={
            "event_type": type(event).__name__,
            "delay_seconds": delay,
            "fire_at": fire_at,
            "pending": self.pending_count,
        },
    )

    # Wake the drain task — it may need to recalculate its sleep
    self._wake.set()
    return handle
cancel
cancel(handle)

Cancel a scheduled timer.

Parameters:

Name Type Description Default
handle TimerHandle

The handle returned by schedule().

required

Returns:

Type Description
bool

True if the timer was pending and is now cancelled.

bool

False if it was already cancelled or already fired.

Source code in src/marianne/daemon/baton/timer.py
def cancel(self, handle: TimerHandle) -> bool:
    """Cancel a scheduled timer.

    Args:
        handle: The handle returned by ``schedule()``.

    Returns:
        True if the timer was pending and is now cancelled.
        False if it was already cancelled or already fired.
    """
    seq = handle._seq
    if seq in self._cancelled:
        return False

    # Check if it's still in the heap (not yet fired)
    for entry in self._heap:
        if entry.handle._seq == seq and not entry.cancelled:
            entry.cancelled = True
            self._cancelled.add(seq)
            _logger.debug(
                "timer.cancelled",
                extra={
                    "event_type": type(handle.event).__name__,
                    "fire_at": handle.fire_at,
                },
            )
            return True

    return False
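The heap-plus-cancelled-set design above is lazy cancellation: cancelled timers stay in the heap and are skipped when they reach the front, so cancel() never has to restructure the heap. A minimal self-contained sketch of the same pattern (the names here are illustrative, not the module's real API):

```python
import heapq
import itertools

class LazyCancelHeap:
    """Sketch of lazy cancellation: cancelled entries remain in the
    heap and are discarded when popped, instead of being removed
    eagerly. (Illustrative only — not the module's real API.)"""

    def __init__(self) -> None:
        self._heap: list[tuple[float, int, object]] = []
        self._cancelled: set[int] = set()
        self._seq = itertools.count()  # tie-breaker for equal fire_at

    def schedule(self, fire_at: float, event: object) -> int:
        seq = next(self._seq)
        heapq.heappush(self._heap, (fire_at, seq, event))
        return seq

    def cancel(self, seq: int) -> None:
        # O(1): just remember the seq; the heap entry is skipped later.
        self._cancelled.add(seq)

    def pop_due(self, now: float) -> list[object]:
        # Pop all events with fire_at <= now, skipping cancelled entries.
        due = []
        while self._heap and self._heap[0][0] <= now:
            fire_at, seq, event = heapq.heappop(self._heap)
            if seq in self._cancelled:
                self._cancelled.discard(seq)  # forget consumed seqs
                continue
            due.append(event)
        return due
```

The trade-off: cancellation is constant-time, but cancelled entries occupy heap slots until their fire time passes — acceptable when timer volume is modest, as here.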
snapshot
snapshot()

Export pending (non-cancelled) timers for persistence.

Returns a list of (fire_at, event) tuples — the data needed to reconstruct the timer wheel after a restart.

Source code in src/marianne/daemon/baton/timer.py
def snapshot(self) -> list[tuple[float, BatonEvent]]:
    """Export pending (non-cancelled) timers for persistence.

    Returns a list of (fire_at, event) tuples — the data needed to
    reconstruct the timer wheel after a restart.
    """
    return [
        (entry.fire_at, entry.handle.event)
        for entry in self._heap
        if entry.handle._seq not in self._cancelled
    ]
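Note that fire_at values come from time.monotonic(), which is only meaningful within the current process — a restore path must persist remaining delays rather than raw fire_at values. A minimal sketch of the conversion (the helper name is an assumption, not part of the module):

```python
import time

def snapshot_as_delays(snapshot, now=None):
    # Hypothetical helper: convert monotonic fire_at values into
    # remaining delays at snapshot time, clamped to >= 0, so the pairs
    # can be persisted and re-scheduled after a restart.
    now = time.monotonic() if now is None else now
    return [(max(0.0, fire_at - now), event) for fire_at, event in snapshot]
```

On restart, each (delay, event) pair could then be re-registered via wheel.schedule(delay, event).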
run async
run()

Background drain task — fire timers into the inbox.

This coroutine runs indefinitely. Cancel the task to stop it. It sleeps until the next timer is due (or a wake signal arrives), fires all due timers, and repeats.

Source code in src/marianne/daemon/baton/timer.py
async def run(self) -> None:
    """Background drain task — fire timers into the inbox.

    This coroutine runs indefinitely. Cancel the task to stop it.
    It sleeps until the next timer is due (or a wake signal arrives),
    fires all due timers, and repeats.
    """
    while True:
        # Skip cancelled entries at the front of the heap
        while self._heap and self._heap[0].handle._seq in self._cancelled:
            heapq.heappop(self._heap)

        if not self._heap:
            # No timers — sleep until a timer is added
            self._wake.clear()
            await self._wake.wait()
            continue

        next_fire = self._heap[0].fire_at
        now = time.monotonic()

        if next_fire <= now:
            # Timer is due — pop and fire
            entry = heapq.heappop(self._heap)
            if entry.handle._seq in self._cancelled:
                # Was cancelled between check and pop — skip
                self._cancelled.discard(entry.handle._seq)
                continue

            self._cancelled.discard(entry.handle._seq)
            await self._inbox.put(entry.handle.event)

            _logger.debug(
                "timer.fired",
                extra={
                    "event_type": type(entry.handle.event).__name__,
                    "fire_at": entry.fire_at,
                    "latency_ms": (time.monotonic() - entry.fire_at) * 1000,
                },
            )
        else:
            # Sleep until next timer or wake signal
            sleep_for = next_fire - now
            self._wake.clear()
            try:
                await asyncio.wait_for(self._wake.wait(), timeout=sleep_for)
            except TimeoutError:
                pass  # Timer is due, loop will fire it
shutdown async
shutdown()

Fire all pending timers immediately and stop.

Used during graceful shutdown — ensures no events are lost. Events are placed into the inbox in fire_at order.

Source code in src/marianne/daemon/baton/timer.py
async def shutdown(self) -> None:
    """Fire all pending timers immediately and stop.

    Used during graceful shutdown — ensures no events are lost.
    Events are placed into the inbox in fire_at order.
    """
    self._shutting_down = True

    # Drain heap in order, skipping cancelled
    while self._heap:
        entry = heapq.heappop(self._heap)
        if entry.handle._seq not in self._cancelled:
            await self._inbox.put(entry.handle.event)

    _logger.debug("timer.shutdown_complete")

Functions

sheet_task async

sheet_task(*, job_id, sheet, backend, attempt_context, inbox, total_sheets=1, total_movements=1, rendered_prompt=None, preamble=None, cost_per_1k_input=None, cost_per_1k_output=None, instrument_override=None)

Execute a single sheet attempt and report the result.

This is the musician's entire job. Play once, report in full detail. The conductor (baton) decides what happens next.

Parameters:

    job_id (str, required):
        The job this sheet belongs to.
    sheet (Sheet, required):
        The sheet to execute (prompt, validations, timeout, etc.).
    backend (Backend, required):
        The backend to execute through.
    attempt_context (AttemptContext, required):
        Context from the conductor (attempt number, mode, etc.).
    inbox (Queue[SheetAttemptResult], required):
        The baton's event inbox to report results to.
    total_sheets (int, default 1):
        Total concrete sheets in the job (for template variables).
    total_movements (int, default 1):
        Total movements in the job (for template variables).
    rendered_prompt (str | None, default None):
        Optional pre-rendered prompt from PromptRenderer. When provided,
        the musician uses this directly instead of calling _build_prompt().
        This enables the full 9-layer prompt assembly pipeline including
        spec fragments, learned patterns, and failure history.
    preamble (str | None, default None):
        Optional pre-built preamble. Set on the backend via set_preamble()
        before execution. Only used when rendered_prompt is also provided
        (the PromptRenderer separates them).
    cost_per_1k_input (float | None, default None):
        Cost per 1000 input tokens (USD) from the instrument profile's
        ModelCapacity. None uses hardcoded fallback.
    cost_per_1k_output (float | None, default None):
        Cost per 1000 output tokens (USD) from the instrument profile's
        ModelCapacity. None uses hardcoded fallback.
    instrument_override (str | None, default None):
        When set, used as the instrument_name in the SheetAttemptResult
        instead of sheet.instrument_name. Required after instrument
        fallback — the Sheet entity keeps the original instrument but the
        baton's SheetExecutionState tracks the fallback instrument.
        Without this, attempt results credit the wrong instrument for
        success/failure tracking.

Never raises — all exceptions are caught and reported via the inbox.
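The never-raises contract amounts to: every path, including the exception path, ends in inbox.put(). A stripped-down sketch of that shape (the result dict is illustrative, not the real SheetAttemptResult):

```python
import asyncio

async def attempt_and_report(work, inbox: asyncio.Queue) -> None:
    # Illustrative version of the contract: run one attempt, catch every
    # exception, and always report to the inbox so the event loop can
    # decide what happens next.
    try:
        value = await work()
        result = {"success": True, "value": value, "error": None}
    except Exception as exc:
        result = {"success": False, "value": None,
                  "error": f"{type(exc).__name__}: {exc}"}
    await inbox.put(result)
```

Because the task communicates only through the queue, a crashed attempt looks to the baton like any other failed result — no task supervision or exception propagation is needed.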

Source code in src/marianne/daemon/baton/musician.py
async def sheet_task(
    *,
    job_id: str,
    sheet: Sheet,
    backend: Backend,
    attempt_context: AttemptContext,
    inbox: asyncio.Queue[SheetAttemptResult],
    total_sheets: int = 1,
    total_movements: int = 1,
    rendered_prompt: str | None = None,
    preamble: str | None = None,
    cost_per_1k_input: float | None = None,
    cost_per_1k_output: float | None = None,
    instrument_override: str | None = None,
) -> None:
    """Execute a single sheet attempt and report the result.

    This is the musician's entire job. Play once, report in full detail.
    The conductor (baton) decides what happens next.

    Args:
        job_id: The job this sheet belongs to.
        sheet: The sheet to execute (prompt, validations, timeout, etc.).
        backend: The backend to execute through.
        attempt_context: Context from the conductor (attempt number, mode, etc.).
        inbox: The baton's event inbox to report results to.
        total_sheets: Total concrete sheets in the job (for template variables).
        total_movements: Total movements in the job (for template variables).
        rendered_prompt: Optional pre-rendered prompt from PromptRenderer.
            When provided, the musician uses this directly instead of
            calling _build_prompt(). This enables the full 9-layer
            prompt assembly pipeline including spec fragments, learned
            patterns, and failure history.
        preamble: Optional pre-built preamble. Set on the backend via
            set_preamble() before execution. Only used when rendered_prompt
            is also provided (the PromptRenderer separates them).
        cost_per_1k_input: Cost per 1000 input tokens (USD) from the
            instrument profile's ModelCapacity. None uses hardcoded fallback.
        cost_per_1k_output: Cost per 1000 output tokens (USD) from the
            instrument profile's ModelCapacity. None uses hardcoded fallback.
        instrument_override: When set, used as the instrument_name in the
            SheetAttemptResult instead of sheet.instrument_name. Required
            after instrument fallback — the Sheet entity keeps the original
            instrument but the baton's SheetExecutionState tracks the
            fallback instrument. Without this, attempt results credit
            the wrong instrument for success/failure tracking.

    Never raises — all exceptions are caught and reported via the inbox.
    """
    start_time = time.monotonic()
    # Resolve the effective instrument name — fallback may have changed it
    effective_instrument = instrument_override or sheet.instrument_name

    try:
        # Step 1: Build prompt
        if rendered_prompt is not None:
            # F-104 via PromptRenderer — pre-rendered with all 9 layers.
            # Preamble is separated and set on the backend directly.
            prompt = rendered_prompt
            if preamble is not None:
                backend.set_preamble(preamble)
        else:
            # Fallback: inline rendering (covers basic cases)
            prompt = _build_prompt(
                sheet, attempt_context,
                total_sheets=total_sheets,
                total_movements=total_movements,
            )

        # Step 2-3: Execute through backend
        exec_result = await _execute(backend, prompt, sheet.timeout_seconds)

        # Step 4: Run validations (only if execution succeeded)
        # F-118: pass total_sheets/total_movements for rich context
        val_passed, val_total, val_rate, val_details = await _validate(
            sheet, exec_result,
            total_sheets=total_sheets,
            total_movements=total_movements,
        )

        # Step 5: Record output with credential redaction
        stdout_tail, stderr_tail = _capture_output(exec_result)

        # Step 6: Classify errors (redact credentials from error messages —
        # backend error_message can contain API keys from auth failures,
        # config errors, or URL parameters)
        error_class, raw_error_msg = _classify_error(exec_result)
        error_msg = redact_credentials(raw_error_msg) if raw_error_msg else raw_error_msg

        # Build and report result
        duration = exec_result.duration_seconds
        result = SheetAttemptResult(
            job_id=job_id,
            sheet_num=sheet.num,
            instrument_name=effective_instrument,
            attempt=attempt_context.attempt_number,
            execution_success=exec_result.success,
            exit_code=exec_result.exit_code,
            duration_seconds=duration,
            validations_passed=val_passed,
            validations_total=val_total,
            validation_pass_rate=val_rate,
            validation_details=val_details,
            error_classification=error_class,
            error_message=error_msg,
            rate_limited=exec_result.rate_limited,
            rate_limit_wait_seconds=exec_result.rate_limit_wait_seconds,
            cost_usd=_estimate_cost(
                exec_result,
                cost_per_1k_input=cost_per_1k_input,
                cost_per_1k_output=cost_per_1k_output,
            ),
            input_tokens=exec_result.input_tokens or 0,
            output_tokens=exec_result.output_tokens or 0,
            model_used=exec_result.model,
            stdout_tail=stdout_tail,
            stderr_tail=stderr_tail,
        )

    except Exception as exc:
        # Step 6 (exception path): Never crash the baton
        duration = time.monotonic() - start_time
        raw_error_msg = f"{type(exc).__name__}: {exc}"
        # Redact credentials from exception messages before logging/storing.
        # Exception text can contain API keys (e.g., auth failures that echo
        # the key, config loading errors with key values in paths). Without
        # redaction, credentials propagate to logs, state DB, dashboard,
        # learning store, and diagnostic output — 6+ storage locations.
        error_msg = redact_credentials(raw_error_msg) or raw_error_msg
        _logger.error(
            "musician.sheet_task.exception",
            extra={
                "job_id": job_id,
                SHEET_NUM_KEY: sheet.num,
                "error": error_msg,
            },
            exc_info=True,
        )

        result = SheetAttemptResult(
            job_id=job_id,
            sheet_num=sheet.num,
            instrument_name=effective_instrument,
            attempt=attempt_context.attempt_number,
            execution_success=False,
            exit_code=None,
            duration_seconds=duration,
            error_classification="TRANSIENT",
            error_message=error_msg,
            rate_limited=False,
        )

    # Always report — the baton must know what happened
    await inbox.put(result)
    _logger.info(
        "musician.sheet_task.reported",
        extra={
            "job_id": job_id,
            SHEET_NUM_KEY: sheet.num,
            "success": result.execution_success,
            "pass_rate": result.validation_pass_rate,
            "duration": result.duration_seconds,
        },
    )
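The _estimate_cost call above presumably reduces to a per-1k-token formula; a minimal sketch under the assumption that prices are quoted per 1000 tokens (the real helper also handles the None fallbacks described in the parameter docs, which this sketch omits):

```python
def estimate_cost(input_tokens: int, output_tokens: int,
                  cost_per_1k_input: float, cost_per_1k_output: float) -> float:
    # USD cost of one attempt, given per-1000-token prices.
    # Assumed shape — not the module's actual _estimate_cost.
    return (input_tokens / 1000.0) * cost_per_1k_input \
        + (output_tokens / 1000.0) * cost_per_1k_output
```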