
adapter

BatonAdapter — wires the baton into the conductor.

Step 28: Replace monolithic execution with the baton's event-driven model.

The adapter is the bridge between the conductor (JobManager) and the baton (BatonCore). It handles:

  1. Job submission → baton registration (Surface 1):
     JobConfig → Sheet[] → SheetExecutionState[] → BatonCore.register_job()

  2. Dispatch callback → backend acquisition (Surface 2):
     BatonCore dispatches → adapter acquires backend → spawns musician task

  3. Prompt assembly (Surface 3):
     Creates SheetContext, calls PromptBuilder.build_sheet_prompt()

  4. State synchronization (Surface 4):
     BatonSheetStatus ↔ CheckpointState.SheetStatus mapping

  5. EventBus integration (Surface 5):
     Baton events → ObserverEvent format → EventBus.publish()

  6. Rate limit callback bridge (Surface 6):
     Musician extracts wait time → SheetAttemptResult → baton handles it

  7. Lifecycle (Surface 8):
     The adapter is always initialized by the conductor at startup

Design decisions:

- Checkpoint is the source of truth. The baton rebuilds from the checkpoint on restart.
- Save the checkpoint FIRST, then update baton state (prevents re-execution).
- The adapter does NOT own the baton's main loop — the manager runs it.
- Concert support: sequential score submission (option 1 from the wiring analysis).
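The "save checkpoint first" rule can be illustrated with a minimal, self-contained sketch (hypothetical names and plain dicts standing in for CheckpointState and the baton's registry, not the adapter's actual persistence code): if the process dies between the two writes, the checkpoint already records the terminal status, so recovery will not re-dispatch the sheet.

```python
# Hypothetical sketch of the "checkpoint first" invariant.
# A crash after the checkpoint write but before the in-memory update is
# safe: recovery rebuilds from the checkpoint, which already says
# "completed", so the sheet is not executed twice.

checkpoint: dict[int, str] = {}   # persisted store (stands in for CheckpointState)
baton_state: dict[int, str] = {}  # in-memory baton view

def mark_completed(sheet_num: int) -> None:
    checkpoint[sheet_num] = "completed"   # 1. persist FIRST
    baton_state[sheet_num] = "completed"  # 2. then update baton state

def recover() -> dict[int, str]:
    # Checkpoint is the source of truth: rebuild the baton view from it.
    return dict(checkpoint)

mark_completed(1)
baton_state.clear()                   # simulate a restart losing memory
assert recover() == {1: "completed"}  # sheet 1 is not re-executed
```

The reverse ordering would open a window where the baton believes the sheet is done but the checkpoint still says in-progress, which is exactly the re-execution hazard the invariant prevents.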

See: workspaces/v1-beta-v3/movement-2/step-28-wiring-analysis.md

Attributes

Classes

BatonAdapter

BatonAdapter(*, event_bus=None, max_concurrent_sheets=10, state_sync_callback=None, persist_callback=None)

Bridges the conductor (JobManager) and the baton (BatonCore).

The adapter owns:

- A BatonCore instance (event loop, sheet registry, state machine)
- A mapping of job_id → Sheet[] (for prompt rendering at dispatch time)
- Active musician tasks (one asyncio.Task per dispatched sheet)

The adapter does NOT own:

- The BackendPool (injected by the manager)
- The EventBus (injected by the manager)
- The CheckpointState (managed by the manager's state backend)

Usage::

adapter = BatonAdapter(event_bus=bus)
adapter.set_backend_pool(pool)

# Register a job
adapter.register_job("j1", sheets, deps)

# Run the baton (blocks until shutdown)
await adapter.run()

Initialize the BatonAdapter.

Parameters:

  event_bus (EventBus | None, default None):
      Optional EventBus for publishing events to subscribers.

  max_concurrent_sheets (int, default 10):
      Global concurrency ceiling for dispatch.

  state_sync_callback (StateSyncCallback | None, default None):
      Deprecated — kept for backward compat. Phase 2 uses
      persist_callback instead.

  persist_callback (PersistCallback | None, default None):
      Called with job_id after significant state transitions (terminal,
      dispatch) to persist CheckpointState to the registry. Replaces the
      sync layer.
Source code in src/marianne/daemon/baton/adapter.py
def __init__(
    self,
    *,
    event_bus: EventBus | None = None,
    max_concurrent_sheets: int = 10,
    state_sync_callback: StateSyncCallback | None = None,
    persist_callback: PersistCallback | None = None,
) -> None:
    """Initialize the BatonAdapter.

    Args:
        event_bus: Optional EventBus for publishing events to subscribers.
        max_concurrent_sheets: Global concurrency ceiling for dispatch.
        state_sync_callback: Deprecated — kept for backward compat.
            Phase 2 uses persist_callback instead.
        persist_callback: Called with job_id after significant state
            transitions (terminal, dispatch) to persist CheckpointState
            to the registry. Replaces the sync layer.
    """
    from marianne.daemon.baton.timer import TimerWheel

    # Shared inbox breaks the circular dependency: TimerWheel needs
    # the inbox to deliver fired events, BatonCore needs the timer
    # to schedule them. Create the inbox first, pass to both.
    inbox: asyncio.Queue[Any] = asyncio.Queue()
    self._timer_wheel = TimerWheel(inbox)
    self._baton = BatonCore(timer=self._timer_wheel, inbox=inbox)
    self._event_bus = event_bus
    self._max_concurrent_sheets = max_concurrent_sheets
    self._persist_callback = persist_callback
    # Deprecated compat attributes — tests set/read these directly
    self._state_sync_callback = state_sync_callback
    self._synced_status: dict[tuple[str, int], str] = {}

    # Job → Sheet mapping for prompt rendering
    self._job_sheets: dict[str, dict[int, Sheet]] = {}

    # Active musician tasks: (job_id, sheet_num) → Task
    self._active_tasks: dict[tuple[str, int], asyncio.Task[Any]] = {}

    # BackendPool — injected via set_backend_pool()
    self._backend_pool: BackendPool | None = None

    # Per-job PromptRenderer — created when prompt_config is provided
    self._job_renderers: dict[str, PromptRenderer] = {}

    # Per-job CrossSheetConfig — enables cross-sheet context (F-210)
    self._job_cross_sheet: dict[str, CrossSheetConfig] = {}

    # Per-job completion events — set when all sheets reach terminal state
    self._completion_events: dict[str, asyncio.Event] = {}

    # Per-job completion status: True = all sheets completed, False = has failures
    self._completion_results: dict[str, bool] = {}

    # Running state
    self._running = False
    self._baton_task: asyncio.Task[Any] | None = None
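The shared-inbox construction at the top of __init__ can be sketched in isolation (simplified stand-ins for TimerWheel and BatonCore; the real classes carry far more state). Because the queue is created first and handed to both sides, the timer can deliver fired events and the core can consume them without either object holding a reference to the other:

```python
import asyncio

# Simplified stand-ins: the shared queue is created first and passed to
# both objects, breaking the Timer <-> Core circular dependency.
class Timer:
    def __init__(self, inbox: asyncio.Queue) -> None:
        self.inbox = inbox

    async def fire(self, event: str) -> None:
        await self.inbox.put(event)  # deliver a fired event into the inbox

class Core:
    def __init__(self, inbox: asyncio.Queue) -> None:
        self.inbox = inbox

    async def next_event(self) -> str:
        return await self.inbox.get()  # consume from the same inbox

async def main() -> str:
    inbox: asyncio.Queue[str] = asyncio.Queue()
    timer, core = Timer(inbox), Core(inbox)
    await timer.fire("RetryDue")
    return await core.next_event()

assert asyncio.run(main()) == "RetryDue"
```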
Attributes

baton (property)

The underlying BatonCore instance.

is_running (property)

Whether the baton's event loop is running.

Functions
set_backend_pool
set_backend_pool(pool)

Inject the BackendPool for backend acquisition.

Must be called before dispatching any sheets.

Parameters:

  pool (BackendPool, required):
      The backend pool from the manager.
Source code in src/marianne/daemon/baton/adapter.py
def set_backend_pool(self, pool: BackendPool) -> None:
    """Inject the BackendPool for backend acquisition.

    Must be called before dispatching any sheets.

    Args:
        pool: The backend pool from the manager.
    """
    self._backend_pool = pool
register_job
register_job(job_id, sheets, dependencies, *, max_cost_usd=None, max_retries=3, max_completion=5, escalation_enabled=False, self_healing_enabled=False, prompt_config=None, parallel_enabled=False, cross_sheet=None, pacing_seconds=0.0, live_sheets=None)

Register a job with the baton for event-driven execution.

Converts Sheet entities to SheetExecutionState and registers them with the baton's sheet registry.

Phase 2: when live_sheets is provided, uses those SheetState objects directly instead of creating new ones. This ensures the baton writes to the same objects that live in _live_states, eliminating the need for a sync layer.
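The effect of sharing the live_sheets objects can be seen in a small standalone sketch (a plain dataclass standing in for SheetState): both the "manager" dict and the "baton" dict hold the same instances, so a write through one side is immediately visible through the other, with no copy-back step.

```python
from dataclasses import dataclass

@dataclass
class State:  # hypothetical stand-in for a SheetState entry
    status: str = "pending"

live_states = {1: State(), 2: State()}  # what the manager serves from
baton_view = dict(live_states)          # a new dict, but the SAME State objects

baton_view[1].status = "completed"           # the baton writes to its view...
assert live_states[1].status == "completed"  # ...the manager sees it: no sync layer
assert live_states[2].status == "pending"    # untouched sheets are unchanged
```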

Parameters:

  job_id (str, required):
      Unique job identifier (conductor job_id).

  sheets (list[Sheet], required):
      Sheet entities from build_sheets().

  dependencies (dict[int, list[int]], required):
      Dependency graph {sheet_num: [dep_nums]}.

  max_cost_usd (float | None, default None):
      Optional per-job cost limit.

  max_retries (int, default 3):
      Max normal retry attempts per sheet.

  max_completion (int, default 5):
      Max completion mode attempts per sheet.

  escalation_enabled (bool, default False):
      Enter fermata on exhaustion.

  self_healing_enabled (bool, default False):
      Try self-healing on exhaustion.

  prompt_config (PromptConfig | None, default None):
      Optional PromptConfig for full prompt rendering. When provided,
      creates a PromptRenderer for this job that handles the complete
      9-layer prompt assembly pipeline.

  parallel_enabled (bool, default False):
      Whether parallel execution is enabled (for preamble concurrency
      warning).

  cross_sheet (CrossSheetConfig | None, default None):
      Optional CrossSheetConfig for cross-sheet context (F-210). When
      provided, the adapter collects previous sheet outputs and
      workspace files at dispatch time.

  pacing_seconds (float, default 0.0):
      Pacing interval in seconds forwarded to the baton's scheduler.

  live_sheets (dict[int, SheetExecutionState] | None, default None):
      Phase 2: existing SheetState objects to enrich and register
      directly, shared with the manager's _live_states.
Source code in src/marianne/daemon/baton/adapter.py
def register_job(
    self,
    job_id: str,
    sheets: list[Sheet],
    dependencies: dict[int, list[int]],
    *,
    max_cost_usd: float | None = None,
    max_retries: int = 3,
    max_completion: int = 5,
    escalation_enabled: bool = False,
    self_healing_enabled: bool = False,
    prompt_config: PromptConfig | None = None,
    parallel_enabled: bool = False,
    cross_sheet: CrossSheetConfig | None = None,
    pacing_seconds: float = 0.0,
    live_sheets: dict[int, SheetExecutionState] | None = None,
) -> None:
    """Register a job with the baton for event-driven execution.

    Converts Sheet entities to SheetExecutionState and registers
    them with the baton's sheet registry.

    Phase 2: when ``live_sheets`` is provided, uses those SheetState
    objects directly instead of creating new ones. This ensures the
    baton writes to the same objects that live in ``_live_states``,
    eliminating the need for a sync layer.

    Args:
        job_id: Unique job identifier (conductor job_id).
        sheets: Sheet entities from build_sheets().
        dependencies: Dependency graph {sheet_num: [dep_nums]}.
        max_cost_usd: Optional per-job cost limit.
        max_retries: Max normal retry attempts per sheet.
        max_completion: Max completion mode attempts per sheet.
        escalation_enabled: Enter fermata on exhaustion.
        self_healing_enabled: Try self-healing on exhaustion.
        prompt_config: Optional PromptConfig for full prompt rendering.
            When provided, creates a PromptRenderer for this job that
            handles the complete 9-layer prompt assembly pipeline.
        parallel_enabled: Whether parallel execution is enabled
            (for preamble concurrency warning).
        cross_sheet: Optional CrossSheetConfig for cross-sheet context
            (F-210). When provided, the adapter collects previous sheet
            outputs and workspace files at dispatch time.
        pacing_seconds: Pacing interval in seconds forwarded to the
            baton's scheduler.
        live_sheets: Phase 2 — existing SheetState objects to enrich
            and register directly, shared with the manager's
            _live_states.
    """
    # Store sheets for prompt rendering at dispatch time
    self._job_sheets[job_id] = {s.num: s for s in sheets}

    # Store cross-sheet config (F-210)
    if cross_sheet is not None:
        self._job_cross_sheet[job_id] = cross_sheet

    # Create PromptRenderer if config is available (F-104)
    if prompt_config is not None:
        from marianne.daemon.baton.prompt import PromptRenderer

        total_sheets = len(sheets)
        total_stages = len({s.movement for s in sheets}) or 1
        self._job_renderers[job_id] = PromptRenderer(
            prompt_config=prompt_config,
            total_sheets=total_sheets,
            total_stages=total_stages,
            parallel_enabled=parallel_enabled,
        )

    # Create completion event for this job
    self._completion_events[job_id] = asyncio.Event()

    # Phase 2: use live_sheets (shared with _live_states) when provided.
    # This makes the baton write directly to the same SheetState objects
    # the manager serves via get_job_status — no sync layer needed.
    if live_sheets is not None:
        # Enrich existing SheetState objects with baton scheduling fields
        for sheet in sheets:
            s = live_sheets.get(sheet.num)
            if s is not None:
                s.instrument_name = sheet.instrument_name
                raw_m = sheet.instrument_config.get("model")
                if raw_m is not None:
                    s.model = str(raw_m)
                s.max_retries = max_retries
                s.max_completion = max_completion
                s.fallback_chain = list(sheet.instrument_fallbacks)
                s.sheet_timeout_seconds = sheet.timeout_seconds
        states = live_sheets
    else:
        states = sheets_to_execution_states(
            sheets,
            max_retries=max_retries,
            max_completion=max_completion,
        )

    # Register with baton
    self._baton.register_job(
        job_id,
        states,
        dependencies,
        escalation_enabled=escalation_enabled,
        self_healing_enabled=self_healing_enabled,
        pacing_seconds=pacing_seconds,
    )

    # Set cost limits if configured
    if max_cost_usd is not None:
        self._baton.set_job_cost_limit(job_id, max_cost_usd)

    _logger.info(
        "adapter.job_registered",
        extra={
            "job_id": job_id,
            "sheet_count": len(sheets),
            "dependency_count": len(dependencies),
            "max_retries": max_retries,
            "max_completion": max_completion,
        },
    )

    # Kick the event loop so dispatch_ready runs for the newly registered
    # sheets.  Without this the loop blocks on inbox.get() forever
    # because no musician or timer has produced an event yet.
    self._baton.inbox.put_nowait(DispatchRetry())
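The dependency graph passed to register_job has the shape {sheet_num: [dep_nums]}. A minimal sketch of deriving dispatchable sheets from such a graph (hypothetical helper, not the baton's actual scheduler, which also enforces concurrency ceilings and pacing):

```python
def ready_sheets(
    dependencies: dict[int, list[int]],
    completed: set[int],
    terminal_or_active: set[int],
) -> list[int]:
    # A sheet is ready when every dependency has completed and the
    # sheet itself is neither finished nor already running.
    return [
        num
        for num, deps in dependencies.items()
        if num not in terminal_or_active and all(d in completed for d in deps)
    ]

deps = {1: [], 2: [1], 3: [1], 4: [2, 3]}
assert ready_sheets(deps, completed=set(), terminal_or_active=set()) == [1]
assert ready_sheets(deps, completed={1}, terminal_or_active={1}) == [2, 3]
```

This also shows why the adapter kicks the inbox after registration: readiness is only recomputed when an event arrives, so a freshly registered graph needs one synthetic event to trigger the first dispatch cycle.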
deregister_job
deregister_job(job_id)

Remove a job from the adapter and baton.

Cleans up all per-job state including active tasks.

Parameters:

  job_id (str, required):
      The job to remove.
Source code in src/marianne/daemon/baton/adapter.py
def deregister_job(self, job_id: str) -> None:
    """Remove a job from the adapter and baton.

    Cleans up all per-job state including active tasks.

    Args:
        job_id: The job to remove.
    """
    # Cancel active musician tasks for this job
    keys_to_cancel = [
        key for key in self._active_tasks if key[0] == job_id
    ]
    for key in keys_to_cancel:
        task = self._active_tasks.pop(key)
        task.cancel()

    # Remove from baton
    self._baton.deregister_job(job_id)

    # Remove sheet mapping, renderer, cross-sheet config, and completion tracking
    self._job_sheets.pop(job_id, None)
    self._job_renderers.pop(job_id, None)
    self._job_cross_sheet.pop(job_id, None)
    self._completion_events.pop(job_id, None)
    self._completion_results.pop(job_id, None)

    # Clean up vestigial _synced_status entries (Phase 2: sync layer
    # removed but dict retained for compatibility). Defensive cleanup
    # prevents memory leaks if anything accidentally populates it.
    if hasattr(self, "_synced_status") and self._synced_status:
        self._synced_status = {
            k: v for k, v in self._synced_status.items() if k[0] != job_id
        }

    _logger.info("adapter.job_deregistered", extra={"job_id": job_id})
recover_job
recover_job(job_id, sheets, dependencies, checkpoint, *, max_cost_usd=None, max_retries=3, max_completion=5, escalation_enabled=False, self_healing_enabled=False, prompt_config=None, parallel_enabled=False, cross_sheet=None, pacing_seconds=0.0, live_sheets=None)

Recover a job from a checkpoint after conductor restart.

Rebuilds baton state from the persisted CheckpointState. Terminal sheets (completed, failed, skipped) keep their status. In-progress sheets are reset to PENDING because their musicians died when the conductor restarted. Attempt counts are preserved to avoid infinite retries.

Design invariant: Checkpoint is the source of truth. The baton rebuilds from checkpoint, not the reverse.
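The restart mapping is a pure function of the checkpointed status. A standalone sketch (lowercase string statuses standing in for the BatonSheetStatus enum):

```python
# Statuses whose musicians or timers died with the old process are
# reset to pending; terminal and not-yet-started statuses survive
# the restart unchanged.
_RESET_ON_RESTART = frozenset(
    {"in_progress", "dispatched", "waiting", "retry_scheduled", "fermata"}
)

def status_after_restart(checkpointed: str) -> str:
    return "pending" if checkpointed in _RESET_ON_RESTART else checkpointed

assert status_after_restart("in_progress") == "pending"  # musician died
assert status_after_restart("waiting") == "pending"      # timer lost
assert status_after_restart("completed") == "completed"  # terminal survives
```

Attempt counts are carried forward separately, so a sheet reset to pending this way resumes with its prior retry budget rather than a fresh one.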

Parameters:

  job_id (str, required):
      Unique job identifier.

  sheets (list[Sheet], required):
      Sheet entities from build_sheets() — same config that produced the
      original job.

  dependencies (dict[int, list[int]], required):
      Dependency graph {sheet_num: [dep_nums]}.

  checkpoint (CheckpointState, required):
      Persisted CheckpointState loaded from workspace.

  max_cost_usd (float | None, default None):
      Optional per-job cost limit.

  max_retries (int, default 3):
      Max normal retry attempts per sheet.

  max_completion (int, default 5):
      Max completion mode attempts per sheet.

  escalation_enabled (bool, default False):
      Enter fermata on exhaustion.

  self_healing_enabled (bool, default False):
      Try self-healing on exhaustion.

  prompt_config (PromptConfig | None, default None):
      Optional PromptConfig for prompt rendering.

  parallel_enabled (bool, default False):
      Whether parallel execution is enabled.

  cross_sheet (CrossSheetConfig | None, default None):
      Optional CrossSheetConfig for cross-sheet context (F-210).

  pacing_seconds (float, default 0.0):
      Pacing interval in seconds forwarded to the baton's scheduler.

  live_sheets (dict[int, SheetExecutionState] | None, default None):
      Phase 2: existing SheetState objects to update in place, shared
      with the manager's _live_states.
Source code in src/marianne/daemon/baton/adapter.py
def recover_job(
    self,
    job_id: str,
    sheets: list[Sheet],
    dependencies: dict[int, list[int]],
    checkpoint: CheckpointState,
    *,
    max_cost_usd: float | None = None,
    max_retries: int = 3,
    max_completion: int = 5,
    escalation_enabled: bool = False,
    self_healing_enabled: bool = False,
    prompt_config: PromptConfig | None = None,
    parallel_enabled: bool = False,
    cross_sheet: CrossSheetConfig | None = None,
    pacing_seconds: float = 0.0,
    live_sheets: dict[int, SheetExecutionState] | None = None,
) -> None:
    """Recover a job from a checkpoint after conductor restart.

    Rebuilds baton state from the persisted CheckpointState. Terminal
    sheets (completed, failed, skipped) keep their status. In-progress
    sheets are reset to PENDING because their musicians died when the
    conductor restarted. Attempt counts are preserved to avoid infinite
    retries.

    Design invariant: Checkpoint is the source of truth. The baton
    rebuilds from checkpoint, not the reverse.

    Args:
        job_id: Unique job identifier.
        sheets: Sheet entities from build_sheets() — same config
            that produced the original job.
        dependencies: Dependency graph {sheet_num: [dep_nums]}.
        checkpoint: Persisted CheckpointState loaded from workspace.
        max_cost_usd: Optional per-job cost limit.
        max_retries: Max normal retry attempts per sheet.
        max_completion: Max completion mode attempts per sheet.
        escalation_enabled: Enter fermata on exhaustion.
        self_healing_enabled: Try self-healing on exhaustion.
        prompt_config: Optional PromptConfig for prompt rendering.
        parallel_enabled: Whether parallel execution is enabled.
        cross_sheet: Optional CrossSheetConfig for cross-sheet context (F-210).
        pacing_seconds: Pacing interval in seconds forwarded to the
            baton's scheduler.
        live_sheets: Phase 2 — existing SheetState objects to update
            in place, shared with the manager's _live_states.
    """
    # Store sheets for prompt rendering
    self._job_sheets[job_id] = {s.num: s for s in sheets}

    # Store cross-sheet config (F-210)
    if cross_sheet is not None:
        self._job_cross_sheet[job_id] = cross_sheet

    # Create PromptRenderer if config is available
    if prompt_config is not None:
        from marianne.daemon.baton.prompt import PromptRenderer

        total_sheets = len(sheets)
        total_stages = len({s.movement for s in sheets}) or 1
        self._job_renderers[job_id] = PromptRenderer(
            prompt_config=prompt_config,
            total_sheets=total_sheets,
            total_stages=total_stages,
            parallel_enabled=parallel_enabled,
        )

    # Create completion event
    self._completion_events[job_id] = asyncio.Event()

    # Build SheetExecutionState with recovered statuses and attempt counts
    states: dict[int, SheetExecutionState] = {}
    for sheet in sheets:
        cp_sheet = checkpoint.sheets.get(sheet.num)

        if cp_sheet is not None:
            # On restart, active/transient sheets reset to PENDING:
            # - in_progress/dispatched: musician was killed on restart
            # - waiting: rate limit timers lost on restart
            # - retry_scheduled: retry timers lost on restart
            # - fermata: re-evaluate escalation on restart
            # Terminal sheets (completed, failed, skipped, cancelled)
            # keep their status. Pending/ready stay as-is.
            _RESET_ON_RESTART = frozenset({
                BatonSheetStatus.IN_PROGRESS,
                BatonSheetStatus.DISPATCHED,
                BatonSheetStatus.WAITING,
                BatonSheetStatus.RETRY_SCHEDULED,
                BatonSheetStatus.FERMATA,
            })
            baton_status = (
                BatonSheetStatus.PENDING
                if cp_sheet.status in _RESET_ON_RESTART
                else cp_sheet.status
            )

            # Carry forward attempt counts to avoid infinite retries
            normal_attempts = cp_sheet.attempt_count
            completion_attempts = cp_sheet.completion_attempts
        else:
            # Sheet not in checkpoint — treat as fresh PENDING
            baton_status = BatonSheetStatus.PENDING
            normal_attempts = 0
            completion_attempts = 0

        # Phase 2: update the live SheetState in-place when available,
        # so the baton operates on the same objects as _live_states.
        if live_sheets is not None and sheet.num in live_sheets:
            state = live_sheets[sheet.num]
        else:
            raw_model = sheet.instrument_config.get("model")
            state = SheetExecutionState(
                sheet_num=sheet.num,
                instrument_name=sheet.instrument_name,
                model=str(raw_model) if raw_model is not None else None,
            )

        # Always populate instrument identity from the Sheet entity.
        # The checkpoint may have instrument_name=None for sheets that
        # were never dispatched (e.g., dependency-cascaded failures).
        state.instrument_name = sheet.instrument_name
        raw_model = sheet.instrument_config.get("model")
        if raw_model is not None:
            state.model = str(raw_model)
        state.max_retries = max_retries
        state.max_completion = max_completion
        state.fallback_chain = list(sheet.instrument_fallbacks)
        state.sheet_timeout_seconds = sheet.timeout_seconds
        state.status = baton_status
        state.normal_attempts = normal_attempts
        state.completion_attempts = completion_attempts
        # Reset instrument fallback position for non-terminal sheets.
        # A recovered/resumed sheet must start from the primary
        # instrument, not stay stuck on whatever fallback it died on.
        # Terminal sheets (COMPLETED, SKIPPED) keep their instrument
        # history for diagnostics.
        _TERMINAL = {
            BatonSheetStatus.COMPLETED,
            BatonSheetStatus.SKIPPED,
            BatonSheetStatus.CANCELLED,
        }
        if baton_status not in _TERMINAL:
            state.current_instrument_index = 0

        states[sheet.num] = state

    # Register with baton using the recovered states
    self._baton.register_job(
        job_id,
        states,
        dependencies,
        escalation_enabled=escalation_enabled,
        self_healing_enabled=self_healing_enabled,
        pacing_seconds=pacing_seconds,
    )

    # Set cost limits if configured
    if max_cost_usd is not None:
        self._baton.set_job_cost_limit(job_id, max_cost_usd)

    _logger.info(
        "adapter.job_recovered",
        extra={
            "job_id": job_id,
            "sheet_count": len(sheets),
            "recovered_terminal": sum(
                1 for s in states.values()
                if s.status in (
                    BatonSheetStatus.COMPLETED,
                    BatonSheetStatus.FAILED,
                    BatonSheetStatus.SKIPPED,
                )
            ),
            "recovered_pending": sum(
                1 for s in states.values()
                if s.status == BatonSheetStatus.PENDING
            ),
        },
    )

    # Kick the event loop so dispatch_ready runs for recovered sheets
    self._baton.inbox.put_nowait(DispatchRetry())
get_sheet
get_sheet(job_id, sheet_num)

Get a Sheet entity for a registered job.

Parameters:

  job_id (str, required):
      The job identifier.

  sheet_num (int, required):
      The sheet number.

Returns:

  Sheet | None:
      The Sheet entity, or None if not found.

Source code in src/marianne/daemon/baton/adapter.py
def get_sheet(self, job_id: str, sheet_num: int) -> Sheet | None:
    """Get a Sheet entity for a registered job.

    Args:
        job_id: The job identifier.
        sheet_num: The sheet number.

    Returns:
        The Sheet entity, or None if not found.
    """
    job_sheets = self._job_sheets.get(job_id)
    if job_sheets is None:
        return None
    return job_sheets.get(sheet_num)
wait_for_completion async
wait_for_completion(job_id)

Wait until a job reaches terminal state.

Blocks until all sheets in the job are completed, failed, skipped, or cancelled. Used by the manager's _run_job_task to await baton execution.

Parameters:

  job_id (str, required):
      The job to wait for.

Returns:

  bool:
      True if all sheets completed successfully, False if any failed.

Raises:

  KeyError:
      If the job is not registered.

Source code in src/marianne/daemon/baton/adapter.py
async def wait_for_completion(self, job_id: str) -> bool:
    """Wait until a job reaches terminal state.

    Blocks until all sheets in the job are completed, failed, skipped,
    or cancelled. Used by the manager's _run_job_task to await baton
    execution.

    Args:
        job_id: The job to wait for.

    Returns:
        True if all sheets completed successfully, False if any failed.

    Raises:
        KeyError: If the job is not registered.
    """
    event = self._completion_events.get(job_id)
    if event is None:
        raise KeyError(f"Job '{job_id}' is not registered with the adapter")
    await event.wait()
    return self._completion_results.get(job_id, False)
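The completion handshake is the standard asyncio.Event pattern: waiters block on event.wait() while a separate results dict carries the boolean outcome, which must be recorded before the event is set. A self-contained sketch (hypothetical names, not the adapter's internals):

```python
import asyncio

completion_events: dict[str, asyncio.Event] = {}
completion_results: dict[str, bool] = {}

async def wait_for(job_id: str) -> bool:
    event = completion_events[job_id]  # KeyError if never registered
    await event.wait()
    return completion_results.get(job_id, False)

async def main() -> bool:
    completion_events["j1"] = asyncio.Event()

    async def finish() -> None:
        completion_results["j1"] = True  # record the outcome first...
        completion_events["j1"].set()    # ...then wake all waiters

    waiter = asyncio.create_task(wait_for("j1"))
    await finish()
    return await waiter

assert asyncio.run(main()) is True
```

Setting the event before writing the result would let a waiter wake up and read a stale (missing) outcome, which is why the result write comes first.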
has_completed_sheets
has_completed_sheets(job_id)

Check if any sheet in the job reached COMPLETED status.

F-145: Used by the manager to set completed_new_work after baton execution. The zero-work guard for concert chaining needs to know whether any sheet completed new work — not just whether all sheets succeeded.

Parameters:

  job_id (str, required):
      The job to check.

Returns:

  bool:
      True if at least one sheet completed, False otherwise.

Source code in src/marianne/daemon/baton/adapter.py
def has_completed_sheets(self, job_id: str) -> bool:
    """Check if any sheet in the job reached COMPLETED status.

    F-145: Used by the manager to set completed_new_work after baton
    execution. The zero-work guard for concert chaining needs to know
    whether any sheet completed new work — not just whether all
    sheets succeeded.

    Args:
        job_id: The job to check.

    Returns:
        True if at least one sheet completed, False otherwise.
    """
    job = self._baton._jobs.get(job_id)
    if job is None:
        return False
    return any(
        s.status == BatonSheetStatus.COMPLETED
        for s in job.sheets.values()
    )
clear_instrument_rate_limit
clear_instrument_rate_limit(instrument=None)

Clear instrument rate limit state in the baton core.

Delegates to BatonCore.clear_instrument_rate_limit(). Also moves WAITING sheets back to PENDING so they can be re-dispatched.

Parameters:

  instrument (str | None, default None):
      Instrument name to clear, or None for all.

Returns:

  int:
      Number of instruments whose rate limit was cleared.

Source code in src/marianne/daemon/baton/adapter.py
def clear_instrument_rate_limit(
    self,
    instrument: str | None = None,
) -> int:
    """Clear instrument rate limit state in the baton core.

    Delegates to ``BatonCore.clear_instrument_rate_limit()``.  Also
    moves WAITING sheets back to PENDING so they can be re-dispatched.

    Args:
        instrument: Instrument name to clear, or ``None`` for all.

    Returns:
        Number of instruments whose rate limit was cleared.
    """
    return self._baton.clear_instrument_rate_limit(instrument)
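Moving WAITING sheets back to PENDING is a status remap over the affected sheets. A sketch of the shape of that operation (string statuses as stand-ins; the real remap lives inside BatonCore):

```python
def clear_waiting(sheets: dict[int, str]) -> int:
    # Remap WAITING -> PENDING so the next dispatch cycle picks the
    # sheets up; return how many were released.
    released = [num for num, status in sheets.items() if status == "waiting"]
    for num in released:
        sheets[num] = "pending"
    return len(released)

sheets = {1: "waiting", 2: "completed", 3: "waiting"}
assert clear_waiting(sheets) == 2
assert sheets == {1: "pending", 2: "completed", 3: "pending"}
```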
publish_sheet_skipped async
publish_sheet_skipped(event)

Publish a sheet skip event to the EventBus.

Parameters:

  event (SheetSkipped, required):
      The sheet skip event.
Source code in src/marianne/daemon/baton/adapter.py
async def publish_sheet_skipped(self, event: SheetSkipped) -> None:
    """Publish a sheet skip event to the EventBus.

    Args:
        event: The sheet skip event.
    """
    if self._event_bus is None:
        return

    obs_event = skipped_to_observer_event(event)
    try:
        await self._event_bus.publish(obs_event)
    except Exception:
        _logger.warning(
            "adapter.event_publish_failed",
            extra={
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
            },
            exc_info=True,
        )
publish_job_event async
publish_job_event(job_id, event_name, data=None)

Publish a job-level event to the EventBus.

Parameters:

  job_id (str, required):
      The job identifier.

  event_name (str, required):
      Event name (e.g., "job.started", "job.completed").

  data (dict[str, Any] | None, default None):
      Optional event data.
Source code in src/marianne/daemon/baton/adapter.py
async def publish_job_event(
    self,
    job_id: str,
    event_name: str,
    data: dict[str, Any] | None = None,
) -> None:
    """Publish a job-level event to the EventBus.

    Args:
        job_id: The job identifier.
        event_name: Event name (e.g., "job.started", "job.completed").
        data: Optional event data.
    """
    if self._event_bus is None:
        return

    event: ObserverEvent = {
        "job_id": job_id,
        "sheet_num": 0,
        "event": event_name,
        "data": data or {},
        "timestamp": time.time(),
    }
    try:
        await self._event_bus.publish(event)
    except Exception:
        _logger.warning(
            "adapter.job_event_publish_failed",
            extra={"job_id": job_id, "event": event_name},
            exc_info=True,
        )
run async
run()

Run the baton's event loop with dispatch integration.

Processes events from the inbox, updates state, dispatches ready sheets, and publishes events to the EventBus.

Runs until the baton receives a ShutdownRequested event.

Source code in src/marianne/daemon/baton/adapter.py
async def run(self) -> None:
    """Run the baton's event loop with dispatch integration.

    Processes events from the inbox, updates state, dispatches
    ready sheets, and publishes events to the EventBus.

    Runs until the baton receives a ShutdownRequested event.
    """
    from marianne.daemon.baton.dispatch import dispatch_ready

    self._running = True
    _logger.info("adapter.started")

    # Start the timer wheel drain task — fires scheduled events
    # (rate limit expiry, retry delays) into the baton's inbox.
    # Wrapped in a restart loop so a crash doesn't silently kill
    # all timer-based recovery (rate limit expiry, retry backoff).
    async def _timer_with_restart() -> None:
        while not self._baton._shutting_down:
            try:
                await self._timer_wheel.run()
            except asyncio.CancelledError:
                raise
            except Exception:
                _logger.error(
                    "adapter.timer_wheel_crashed",
                    exc_info=True,
                )
                await asyncio.sleep(1.0)  # Brief pause before restart

    timer_task = asyncio.create_task(
        _timer_with_restart(), name="baton-timer-wheel"
    )

    try:
        # Initial dispatch cycle — pick up PENDING sheets from resume/restart
        # without waiting for the first event. Without this, sheets mapped
        # from WAITING→PENDING on restart sit idle until an unrelated event
        # (e.g., a completion from another job) triggers dispatch.
        config = self._baton.build_dispatch_config(
            max_concurrent_sheets=self._max_concurrent_sheets,
        )
        initial_result = await dispatch_ready(
            self._baton, config, self._dispatch_callback
        )
        if initial_result.dispatched_sheets:
            _logger.info(
                "adapter.initial_dispatch",
                extra={
                    "count": len(initial_result.dispatched_sheets),
                    "sheets": [
                        f"{jid}:{sn}"
                        for jid, sn in initial_result.dispatched_sheets
                    ],
                },
            )
            for d_job_id, _d_sheet_num in initial_result.dispatched_sheets:
                if self._persist_callback:
                    self._persist_callback(d_job_id)

        while not self._baton._shutting_down:
            event = await self._baton.inbox.get()

        _logger.debug(
            "adapter.event_loop.received",
            extra={
                "event_type": type(event).__name__,
                "queue_size": self._baton.inbox.qsize(),
            },
        )

            # Intercept StaleCheck: only fail if the musician task
            # is actually dead. If the task is alive, the sheet isn't
            # stale — the backend is still executing. Reschedule.
            if isinstance(event, StaleCheck):
                task_key = (event.job_id, event.sheet_num)
                task = self._active_tasks.get(task_key)
                if task is not None and not task.done():
                    # Task alive — not stale, reschedule in 60s
                    self._timer_wheel.schedule(
                        60.0,
                        StaleCheck(
                            job_id=event.job_id,
                            sheet_num=event.sheet_num,
                        ),
                    )
                    # Still pass to baton for logging
                    await self._baton.handle_event(event)
                else:
                    # Task dead with no result — actually stale
                    state = self._baton.get_sheet_state(
                        event.job_id, event.sheet_num,
                    )
                    if (
                        state is not None
                        and state.status == BatonSheetStatus.DISPATCHED
                    ):
                        _logger.warning(
                            "adapter.stale_check.task_dead",
                            extra={
                                "job_id": event.job_id,
                                "sheet_num": event.sheet_num,
                            },
                        )
                        # Inject synthetic failure
                        from marianne.daemon.baton.events import (
                            SheetAttemptResult as SAR,
                        )
                        self._baton.inbox.put_nowait(SAR(
                            job_id=event.job_id,
                            sheet_num=event.sheet_num,
                            instrument_name=state.instrument_name or "",
                            attempt=state.normal_attempts + 1,
                            execution_success=False,
                            error_classification="STALE",
                            error_message=(
                                f"Sheet {event.sheet_num}: musician task "
                                f"dead with no result reported"
                            ),
                        ))
                    await self._baton.handle_event(event)
            else:
                await self._baton.handle_event(event)

            # Phase 2: persist to registry if state changed.
            # The baton writes directly to SheetState objects in
            # _live_states — no sync needed. Just persist.
            if self._baton._state_dirty and self._persist_callback:
                self._persist_dirty_jobs()
                self._baton._state_dirty = False

            # Dispatch ready sheets after every event
            config = self._baton.build_dispatch_config(
                max_concurrent_sheets=self._max_concurrent_sheets,
            )
            dispatch_result = await dispatch_ready(
                self._baton, config, self._dispatch_callback
            )

            # Sync dispatched sheets so status display shows in_progress
            if dispatch_result.dispatched_sheets:
                _logger.info(
                    "adapter.dispatch_sync",
                    extra={
                        "count": len(dispatch_result.dispatched_sheets),
                        "sheets": [
                            f"{jid}:{sn}"
                            for jid, sn in dispatch_result.dispatched_sheets
                        ],
                    },
                )
            for d_job_id, d_sheet_num in dispatch_result.dispatched_sheets:
                # Persist dispatch state (sheet moved to DISPATCHED)
                if self._persist_callback:
                    self._persist_callback(d_job_id)
                # Schedule stale detection using per-sheet timeout.
                # If the sheet completes normally, the StaleCheck handler
                # finds it non-DISPATCHED and is a no-op.
                d_state = self._baton.get_sheet_state(d_job_id, d_sheet_num)
                stale_delay = (
                    getattr(d_state, "sheet_timeout_seconds", 1800.0)
                    if d_state else 1800.0
                ) + 60.0  # buffer beyond timeout
                self._timer_wheel.schedule(
                    stale_delay,
                    StaleCheck(job_id=d_job_id, sheet_num=d_sheet_num),
                )

            # Publish any fallback events to EventBus
            await self._publish_fallback_events()

            # Check for job completions after dispatch
            self._check_completions()

    except asyncio.CancelledError:
        _logger.info("adapter.cancelled")
        raise
    finally:
        timer_task.cancel()
        await self._timer_wheel.shutdown()
        self._running = False
        _logger.info("adapter.stopped")
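The StaleCheck interception in the loop above reduces to a three-way decision. A minimal sketch of that rule with stand-in objects (the `Status` enum, `FakeTask`, and `stale_check_action` are illustrative names, not the adapter's API):

```python
from enum import Enum


class Status(Enum):
    # Stand-in for BatonSheetStatus; only the states the check cares about
    DISPATCHED = "dispatched"
    COMPLETED = "completed"


class FakeTask:
    """Minimal stand-in for asyncio.Task, exposing only done()."""

    def __init__(self, done: bool) -> None:
        self._done = done

    def done(self) -> bool:
        return self._done


def stale_check_action(task, status) -> str:
    """Mirror of the interception logic in the event loop:

    - task alive  -> backend still executing, reschedule the check
    - task dead while DISPATCHED -> no result ever reported, inject a
      synthetic failure
    - anything else -> sheet already reached a terminal state, no-op
    """
    if task is not None and not task.done():
        return "reschedule"
    if status is Status.DISPATCHED:
        return "synthetic_failure"
    return "noop"


print(stale_check_action(FakeTask(done=False), Status.DISPATCHED))  # reschedule
print(stale_check_action(None, Status.DISPATCHED))  # synthetic_failure
print(stale_check_action(FakeTask(done=True), Status.COMPLETED))  # noop
```

The ordering matters: liveness is checked before status, so a slow-but-healthy backend is never failed just because its timeout elapsed.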
shutdown async
shutdown()

Gracefully shut down the adapter.

Cancels all active musician tasks and closes the backend pool.

Source code in src/marianne/daemon/baton/adapter.py
async def shutdown(self) -> None:
    """Gracefully shut down the adapter.

    Cancels all active musician tasks and closes the backend pool.
    """
    # Cancel all active tasks
    for task in self._active_tasks.values():
        task.cancel()
    self._active_tasks.clear()

    # Close backend pool
    if self._backend_pool is not None:
        try:
            await self._backend_pool.close_all()
        except Exception:
            _logger.warning("adapter.pool_close_failed", exc_info=True)

    _logger.info("adapter.shutdown_complete")

Functions

baton_to_checkpoint_status

baton_to_checkpoint_status(status)

Identity mapping — Phase 2 unified the enums.

Kept for backward compatibility with tests that import this function. Since BatonSheetStatus IS SheetStatus, this is just status.value.

Source code in src/marianne/daemon/baton/adapter.py
def baton_to_checkpoint_status(status: BatonSheetStatus) -> str:
    """Identity mapping — Phase 2 unified the enums.

    Kept for backward compatibility with tests that import this function.
    Since BatonSheetStatus IS SheetStatus, this is just status.value.
    """
    return status.value

checkpoint_to_baton_status

checkpoint_to_baton_status(status)

Reconstruct SheetStatus from string — Phase 2 unified the enums.

Kept for backward compatibility with tests that import this function.

Source code in src/marianne/daemon/baton/adapter.py
def checkpoint_to_baton_status(status: str) -> BatonSheetStatus:
    """Reconstruct SheetStatus from string — Phase 2 unified the enums.

    Kept for backward compatibility with tests that import this function.
    """
    return BatonSheetStatus(status)
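Because the Phase 2 unification made these two functions exact inverses, any status survives a checkpoint round-trip unchanged. A self-contained sketch with a stand-in enum (the members listed here are assumptions, not the real enum's full value set):

```python
from enum import Enum


class SheetStatus(Enum):
    # Stand-in for the unified BatonSheetStatus / SheetStatus enum
    PENDING = "pending"
    DISPATCHED = "dispatched"
    COMPLETED = "completed"


def baton_to_checkpoint_status(status: SheetStatus) -> str:
    # Serialize for the checkpoint: just the string value
    return status.value


def checkpoint_to_baton_status(status: str) -> SheetStatus:
    # Rehydrate on restart: value lookup on the enum
    return SheetStatus(status)


# Round-trip property: serialize then rehydrate yields the same member
for s in SheetStatus:
    assert checkpoint_to_baton_status(baton_to_checkpoint_status(s)) is s
print("round-trip ok")
```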

attempt_result_to_observer_event

attempt_result_to_observer_event(result)

Convert a SheetAttemptResult to the ObserverEvent format.

Maps baton musician results to the event names the EventBus subscribers expect (dashboard, learning hub, notifications).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `result` | `SheetAttemptResult` | The musician's execution report. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ObserverEvent` | Dict matching the ObserverEvent TypedDict shape. |

Source code in src/marianne/daemon/baton/adapter.py
def attempt_result_to_observer_event(
    result: SheetAttemptResult,
) -> ObserverEvent:
    """Convert a SheetAttemptResult to the ObserverEvent format.

    Maps baton musician results to the event names the EventBus
    subscribers expect (dashboard, learning hub, notifications).

    Args:
        result: The musician's execution report.

    Returns:
        Dict matching the ObserverEvent TypedDict shape.
    """
    if result.rate_limited:
        event_name = "rate_limit.active"
    elif result.execution_success and result.validation_pass_rate >= 100.0:
        event_name = "sheet.completed"
    elif result.execution_success:
        event_name = "sheet.partial"
    else:
        event_name = "sheet.failed"

    return {
        "job_id": result.job_id,
        "sheet_num": result.sheet_num,
        "event": event_name,
        "data": {
            "instrument": result.instrument_name,
            "attempt": result.attempt,
            "success": result.execution_success,
            VALIDATION_PASS_RATE_KEY: result.validation_pass_rate,
            "cost_usd": result.cost_usd,
            "duration_seconds": result.duration_seconds,
            "rate_limited": result.rate_limited,
            "error_classification": result.error_classification,
            "model_used": result.model_used,
        },
        "timestamp": result.timestamp,
    }
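The event-name selection is a strict precedence chain: a rate limit outranks everything, then full success, partial success, and finally failure. A sketch of just that chain, with a stand-in dataclass carrying only the fields the classification reads (illustrative, not the real `SheetAttemptResult`):

```python
from dataclasses import dataclass


@dataclass
class Result:
    # Only the three fields the event-name selection inspects
    rate_limited: bool
    execution_success: bool
    validation_pass_rate: float


def classify(result: Result) -> str:
    """Event-name selection, in precedence order."""
    if result.rate_limited:
        return "rate_limit.active"
    if result.execution_success and result.validation_pass_rate >= 100.0:
        return "sheet.completed"
    if result.execution_success:
        return "sheet.partial"
    return "sheet.failed"


# A rate-limited attempt is reported as such even if execution succeeded:
print(classify(Result(True, True, 100.0)))   # rate_limit.active
print(classify(Result(False, True, 100.0)))  # sheet.completed
print(classify(Result(False, True, 80.0)))   # sheet.partial
print(classify(Result(False, False, 0.0)))   # sheet.failed
```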

skipped_to_observer_event

skipped_to_observer_event(event)

Convert a SheetSkipped event to ObserverEvent format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `event` | `SheetSkipped` | The sheet skip event. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ObserverEvent` | Dict matching the ObserverEvent TypedDict shape. |

Source code in src/marianne/daemon/baton/adapter.py
def skipped_to_observer_event(event: SheetSkipped) -> ObserverEvent:
    """Convert a SheetSkipped event to ObserverEvent format.

    Args:
        event: The sheet skip event.

    Returns:
        Dict matching the ObserverEvent TypedDict shape.
    """
    return {
        "job_id": event.job_id,
        "sheet_num": event.sheet_num,
        "event": "sheet.skipped",
        "data": {"reason": event.reason},
        "timestamp": event.timestamp,
    }

sheets_to_execution_states

sheets_to_execution_states(sheets, *, max_retries=3, max_completion=5)

Convert Sheet entities to SheetExecutionState dict for baton registration.

Each Sheet becomes a SheetExecutionState with the baton's extended tracking fields. The sheet_num is the dict key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sheets` | `list[Sheet]` | List of Sheet entities from build_sheets(). | *required* |
| `max_retries` | `int` | Maximum normal retry attempts per sheet. | `3` |
| `max_completion` | `int` | Maximum completion mode attempts per sheet. | `5` |

Returns:

| Type | Description |
| --- | --- |
| `dict[int, SheetExecutionState]` | Dict of sheet_num → SheetExecutionState. |

Source code in src/marianne/daemon/baton/adapter.py
def sheets_to_execution_states(
    sheets: list[Sheet],
    *,
    max_retries: int = 3,
    max_completion: int = 5,
) -> dict[int, SheetExecutionState]:
    """Convert Sheet entities to SheetExecutionState dict for baton registration.

    Each Sheet becomes a SheetExecutionState with the baton's extended
    tracking fields. The sheet_num is the dict key.

    Args:
        sheets: List of Sheet entities from build_sheets().
        max_retries: Maximum normal retry attempts per sheet.
        max_completion: Maximum completion mode attempts per sheet.

    Returns:
        Dict of sheet_num → SheetExecutionState.
    """
    states: dict[int, SheetExecutionState] = {}
    for sheet in sheets:
        raw_model = sheet.instrument_config.get("model")
        states[sheet.num] = SheetExecutionState(
            sheet_num=sheet.num,
            instrument_name=sheet.instrument_name,
            model=str(raw_model) if raw_model is not None else None,
            max_retries=max_retries,
            max_completion=max_completion,
            fallback_chain=list(sheet.instrument_fallbacks),
            sheet_timeout_seconds=sheet.timeout_seconds,
        )
    return states
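One detail worth calling out is the model handling: the raw config value is stringified only when present, so a sheet without a model override carries `None` rather than `"None"`. A sketch with a stand-in `Sheet` dataclass (field names follow the attributes the converter reads, but the class itself is an assumption here):

```python
from dataclasses import dataclass, field


@dataclass
class Sheet:
    # Stand-in for the real Sheet entity, not its actual definition
    num: int
    instrument_name: str
    instrument_config: dict = field(default_factory=dict)
    instrument_fallbacks: list = field(default_factory=list)
    timeout_seconds: float = 1800.0


def model_of(sheet: Sheet) -> "str | None":
    """Same guard as the converter: stringify only when a model is set."""
    raw = sheet.instrument_config.get("model")
    return str(raw) if raw is not None else None


sheets = [
    Sheet(num=1, instrument_name="analyzer",
          instrument_config={"model": "sonnet"}),
    Sheet(num=2, instrument_name="writer"),  # no model override
]
print({s.num: model_of(s) for s in sheets})  # {1: 'sonnet', 2: None}
```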

extract_dependencies

extract_dependencies(config)

Extract baton-compatible dependency graph from a JobConfig.

The baton expects: {sheet_num: [dep_sheet_num, ...]}

When config.sheet.dependencies is set (non-empty), it is used as the authoritative DAG. Stage-level dependencies are expanded to sheet-level: if stage S has a fan-out of 3 (sheets 4,5,6) and depends on stage T (sheets 1,2,3), each of 4/5/6 depends on all of 1/2/3. Stages not listed in the dependencies map are treated as having no dependencies (independent).

When config.sheet.dependencies is empty or absent, falls back to the legacy linear chain: all sheets in stage N+1 depend on all sheets in stage N.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `Any` | Parsed JobConfig with sheet.get_fan_out_metadata(). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `dict[int, list[int]]` | Dict of sheet_num → list of dependency sheet_nums. |

Source code in src/marianne/daemon/baton/adapter.py
def extract_dependencies(config: Any) -> dict[int, list[int]]:
    """Extract baton-compatible dependency graph from a JobConfig.

    The baton expects: ``{sheet_num: [dep_sheet_num, ...]}``

    When ``config.sheet.dependencies`` is set (non-empty), it is used
    as the authoritative DAG.  Stage-level dependencies are expanded to
    sheet-level: if stage S has a fan-out of 3 (sheets 4,5,6) and
    depends on stage T (sheets 1,2,3), each of 4/5/6 depends on all of
    1/2/3.  Stages not listed in the dependencies map are treated as
    having no dependencies (independent).

    When ``config.sheet.dependencies`` is empty or absent, falls back to
    the legacy linear chain: all sheets in stage N+1 depend on all sheets
    in stage N.

    Args:
        config: Parsed JobConfig with sheet.get_fan_out_metadata().

    Returns:
        Dict of sheet_num → list of dependency sheet_nums.
    """
    total = config.sheet.total_sheets

    # Group sheets by stage
    stage_sheets: dict[int, list[int]] = {}
    for num in range(1, total + 1):
        meta = config.sheet.get_fan_out_metadata(num)
        stage = meta.stage
        stage_sheets.setdefault(stage, []).append(num)

    raw_deps = getattr(config.sheet, "dependencies", None)
    yaml_deps: dict[int, list[int]] = (
        raw_deps if isinstance(raw_deps, dict) else {}
    )

    if yaml_deps:
        # Check if dependencies are already expanded to sheet-level.
        # The config model expands stage-level deps to sheet-level at parse
        # time (SheetConfig._expand_fan_out) and clears fan_out={} to prevent
        # re-expansion.  When that has happened, the dep keys are sheet nums
        # (potentially > total_stages) and the values reference sheet nums.
        # We must NOT re-expand already-expanded deps — that produces wrong
        # results (GH#167 variant: double expansion).
        fan_out = getattr(config.sheet, "fan_out", None)
        already_expanded = not fan_out  # fan_out cleared → deps are sheet-level

        if already_expanded:
            _logger.debug(
                "extract_dependencies.using_pre_expanded",
                extra={
                    "sheet_count": total,
                    "dep_entries": len(yaml_deps),
                },
            )
            # Deps are already sheet-level.  Ensure every sheet has an entry
            # (sheets not in the map have no dependencies).
            deps: dict[int, list[int]] = {}
            for num in range(1, total + 1):
                deps[num] = list(yaml_deps.get(num, []))
            return deps

        # Dependencies are still stage-level (fan_out not yet applied, or
        # no fan-out declared).  Expand each stage-level dep to sheet level.
        _logger.debug(
            "extract_dependencies.expanding_stage_deps",
            extra={"stage_count": len(stage_sheets), "dep_edges": len(yaml_deps)},
        )
        deps = {}
        for stage, sheet_nums in stage_sheets.items():
            stage_deps: list[int] = yaml_deps.get(stage, [])
            # Expand: replace each dep-stage with ALL sheets in that stage
            expanded: list[int] = []
            for dep_stage in stage_deps:
                expanded.extend(stage_sheets.get(dep_stage, [dep_stage]))
            for sn in sheet_nums:
                deps[sn] = list(expanded)
        return deps

    # Fallback: linear chain (legacy behavior for scores without
    # explicit dependencies).
    _logger.debug(
        "extract_dependencies.linear_fallback",
        extra={"stage_count": len(stage_sheets)},
    )
    sorted_stages = sorted(stage_sheets.keys())
    deps = {}

    for i, stage in enumerate(sorted_stages):
        if i == 0:
            for num in stage_sheets[stage]:
                deps[num] = []
        else:
            prev_stage = sorted_stages[i - 1]
            prev_sheets = stage_sheets[prev_stage]
            for num in stage_sheets[stage]:
                deps[num] = list(prev_sheets)

    return deps
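The stage-to-sheet expansion in the middle branch can be shown in isolation. A self-contained sketch using plain dicts (`expand_stage_deps` is an illustrative name; the real function derives `stage_sheets` from the config rather than taking it as an argument):

```python
def expand_stage_deps(
    stage_sheets: "dict[int, list[int]]",
    stage_deps: "dict[int, list[int]]",
) -> "dict[int, list[int]]":
    """Expand stage-level dependencies to sheet-level.

    Every sheet in a stage inherits ALL sheets of each stage its
    stage depends on; stages absent from stage_deps get no deps.
    """
    deps: dict[int, list[int]] = {}
    for stage, sheet_nums in stage_sheets.items():
        expanded: list[int] = []
        for dep_stage in stage_deps.get(stage, []):
            # Replace each dep-stage with all of its sheets
            expanded.extend(stage_sheets.get(dep_stage, [dep_stage]))
        for sn in sheet_nums:
            deps[sn] = list(expanded)
    return deps


# Stage 2 fans out to sheets 4-6 and depends on stage 1 (sheets 1-3):
print(expand_stage_deps({1: [1, 2, 3], 2: [4, 5, 6]}, {2: [1]}))
# {1: [], 2: [], 3: [], 4: [1, 2, 3], 5: [1, 2, 3], 6: [1, 2, 3]}
```

This is exactly the "fan-out of 3 depends on a fan-out of 3" case from the docstring: each of sheets 4/5/6 ends up depending on all of 1/2/3.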