
core

Baton core — event inbox, main loop, and sheet registry.

The baton core is the event-driven execution heart of the conductor. It processes events from five sources (musicians, timers, external commands, observer, internal dispatch), maintains per-sheet execution state, resolves ready sheets, and coordinates dispatch.

The main loop is simple by design::

while not shutting_down:
    event = await inbox.get()
    handle(event)        # updates state
    dispatch_ready()     # dispatches sheets if state changed
    persist()            # save if dirty

Everything else — retry logic, rate limit management, cost enforcement — is done inside event handlers, not in separate subsystems.

See: docs/plans/2026-03-26-baton-design.md for the full architecture.

Classes

BatonCore

BatonCore(*, timer=None, inbox=None)

The baton's event-driven execution core.

Manages the event inbox, processes events, tracks sheet state across all jobs, resolves ready sheets, and coordinates dispatch.

The baton does NOT own backend execution — it decides WHEN to dispatch, not HOW. Sheet execution is delegated to the musician (via dispatch callbacks registered by the conductor).

Usage::

baton = BatonCore()
baton.register_job("j1", sheets, deps)

# Run the main loop (blocks until shutdown)
await baton.run()

# Or process events manually (for testing)
await baton.handle_event(some_event)

Initialize the baton core.

Parameters:

- timer (Any | None, default None): Optional TimerWheel for scheduling retry delays. When None, retries are set to RETRY_SCHEDULED without actual timer events (tests or manual event injection).
- inbox (Queue[BatonEvent] | None, default None): Optional pre-created event queue. When provided, allows the caller to share the queue with other components (e.g., TimerWheel) before BatonCore is constructed. When None, a new queue is created.
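Passing a pre-created queue lets a timer post events into the same inbox the baton reads. A minimal sketch, assuming a hypothetical `ToyTimer` built on `loop.call_later` and a hypothetical `RetryDue` stand-in; the real TimerWheel interface is not documented on this page.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class RetryDue:
    """Hypothetical stand-in for a timer-fired event."""
    job_id: str
    sheet_num: int


class ToyTimer:
    """Hypothetical timer that posts events into a shared queue."""

    def __init__(self, inbox: asyncio.Queue) -> None:
        self._inbox = inbox

    def schedule(self, delay: float, event: object) -> asyncio.TimerHandle:
        loop = asyncio.get_running_loop()
        # call_later runs the callback on the loop thread, so put_nowait is safe.
        return loop.call_later(delay, self._inbox.put_nowait, event)


async def main() -> object:
    inbox: asyncio.Queue = asyncio.Queue()  # created before either component
    timer = ToyTimer(inbox)                 # the timer writes to it...
    # ...and the baton would read it: BatonCore(timer=timer, inbox=inbox)
    timer.schedule(0.01, RetryDue("j1", 3))
    return await asyncio.wait_for(inbox.get(), timeout=1.0)


event = asyncio.run(main())
```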
Source code in src/marianne/daemon/baton/core.py
def __init__(
    self,
    *,
    timer: Any | None = None,
    inbox: asyncio.Queue[BatonEvent] | None = None,
) -> None:
    """Initialize the baton core.

    Args:
        timer: Optional TimerWheel for scheduling retry delays.
            When None, retries are set to RETRY_SCHEDULED without
            actual timer events (tests or manual event injection).
        inbox: Optional pre-created event queue. When provided, allows
            the caller to share the queue with other components (e.g.,
            TimerWheel) before BatonCore is constructed. When None,
            a new queue is created.
    """
    self._inbox: asyncio.Queue[BatonEvent] = inbox or asyncio.Queue()
    self._jobs: dict[str, _JobRecord] = {}
    self._instruments: dict[str, InstrumentState] = {}
    self._job_cost_limits: dict[str, float] = {}
    self._sheet_cost_limits: dict[tuple[str, int], float] = {}
    self._shutting_down = False
    self._running = False
    self._state_dirty = False
    self._timer = timer

    # Active rate-limit timer handles per instrument. When a new
    # RateLimitHit arrives for an instrument that already has a pending
    # timer, the old timer is cancelled before scheduling the new one.
    # Without this, stale timers fire prematurely and cause wasted
    # dispatch→rate_limit→WAITING cycles.
    self._rate_limit_timers: dict[str, Any] = {}

    # Active circuit breaker recovery timer handles per instrument.
    # When a circuit breaker trips OPEN, a recovery timer is scheduled.
    # On fire, the instrument transitions OPEN→HALF_OPEN for a probe.
    # GH#169: Without this, sheets blocked by all-OPEN fallback chains
    # stay PENDING forever.
    self._circuit_breaker_timers: dict[str, Any] = {}

    # Fallback event collection — side effects of event processing.
    # The adapter drains these after each event cycle and publishes
    # them to the EventBus for observability (dashboard, learning hub).
    self._fallback_events: list[InstrumentFallback] = []

    # Per-model concurrency limits from instrument profiles.
    # Keys: "instrument:model", values: max_concurrent.
    # Populated by set_model_concurrency() from instrument profiles.
    self._model_concurrency: dict[str, int] = {}

    # Retry backoff configuration (from RetryConfig defaults)
    self._base_retry_delay: float = 10.0
    self._retry_exponential_base: float = 2.0
    self._max_retry_delay: float = 3600.0
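The `_rate_limit_timers` comment describes replacing a pending timer instead of letting a stale one fire. A minimal sketch of that pattern with asyncio timer handles; `RateLimitTimers`, `schedule`, and the instrument name are hypothetical, not part of BatonCore.

```python
import asyncio


class RateLimitTimers:
    """Hypothetical helper: at most one pending expiry timer per instrument."""

    def __init__(self) -> None:
        self._handles: dict[str, asyncio.TimerHandle] = {}
        self.fired: list[str] = []

    def schedule(self, instrument: str, delay: float) -> None:
        loop = asyncio.get_running_loop()
        stale = self._handles.pop(instrument, None)
        if stale is not None:
            stale.cancel()  # a cancelled handle never runs its callback
        self._handles[instrument] = loop.call_later(delay, self._expire, instrument)

    def _expire(self, instrument: str) -> None:
        # Stands in for posting a RateLimitExpired-style event.
        self._handles.pop(instrument, None)
        self.fired.append(instrument)


async def main() -> tuple[list[str], list[str]]:
    timers = RateLimitTimers()
    timers.schedule("inst-a", 0.02)  # first rate-limit hit
    timers.schedule("inst-a", 0.05)  # second hit replaces the first timer
    await asyncio.sleep(0.03)        # the stale timer would have fired by now
    fired_early = list(timers.fired)
    await asyncio.sleep(0.05)        # past the replacement timer's deadline
    return fired_early, list(timers.fired)


result = asyncio.run(main())
```

Without the cancel() call, fired_early would be ["inst-a"], which is the premature expiry the comment warns about.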
Attributes
inbox property
inbox

The event inbox — put events here for the baton to process.

is_running property
is_running

Whether the main loop is currently running.

job_count property
job_count

Number of registered jobs.

running_sheet_count property
running_sheet_count

Number of sheets currently in 'dispatched' status.

Functions
drain_fallback_events
drain_fallback_events()

Return and clear collected InstrumentFallback events.

Called by the adapter after each event cycle to publish fallback events to the EventBus for observability.

Source code in src/marianne/daemon/baton/core.py
def drain_fallback_events(self) -> list[InstrumentFallback]:
    """Return and clear collected InstrumentFallback events.

    Called by the adapter after each event cycle to publish
    fallback events to the EventBus for observability.
    """
    events = list(self._fallback_events)
    self._fallback_events.clear()
    return events
register_instrument
register_instrument(name, *, max_concurrent=4)

Register an instrument for tracking.

If already registered, returns the existing state unchanged (idempotent); a different max_concurrent on a repeat call is ignored.

Parameters:

- name (str, required): Instrument name (matches InstrumentProfile.name).
- max_concurrent (int, default 4): Maximum concurrent sheets on this instrument.

Returns:

- InstrumentState: The InstrumentState for the instrument.

Source code in src/marianne/daemon/baton/core.py
def register_instrument(self, name: str, *, max_concurrent: int = 4) -> InstrumentState:
    """Register an instrument for tracking.

    If already registered, returns the existing state (idempotent).

    Args:
        name: Instrument name (matches InstrumentProfile.name).
        max_concurrent: Maximum concurrent sheets on this instrument.

    Returns:
        The InstrumentState for the instrument.
    """
    if name in self._instruments:
        return self._instruments[name]

    state = InstrumentState(name=name, max_concurrent=max_concurrent)
    self._instruments[name] = state
    _logger.debug(
        "baton.instrument_registered",
        extra={"instrument": name, "max_concurrent": max_concurrent},
    )
    return state
set_model_concurrency
set_model_concurrency(instrument, model, max_concurrent)

Set per-model concurrency limit from instrument profile data.

Called during adapter initialization from loaded InstrumentProfiles.

Source code in src/marianne/daemon/baton/core.py
def set_model_concurrency(
    self,
    instrument: str,
    model: str,
    max_concurrent: int,
) -> None:
    """Set per-model concurrency limit from instrument profile data.

    Called during adapter initialization from loaded InstrumentProfiles.
    """
    key = f"{instrument}:{model}"
    self._model_concurrency[key] = max_concurrent
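Limits are stored under the string key "instrument:model". A sketch of how a dispatcher might consult that map; `model_has_capacity` and the running-count dictionary are hypothetical, and treating a missing key as "no per-model limit" is an assumption.

```python
def model_key(instrument: str, model: str) -> str:
    # Same key format as set_model_concurrency().
    return f"{instrument}:{model}"


def model_has_capacity(
    limits: dict[str, int],
    running: dict[str, int],
    instrument: str,
    model: str,
) -> bool:
    """Hypothetical check: True if another sheet may start on this model."""
    key = model_key(instrument, model)
    limit = limits.get(key)
    if limit is None:
        return True  # assumption: no per-model limit configured
    return running.get(key, 0) < limit


limits = {model_key("inst-a", "large"): 2}
running = {model_key("inst-a", "large"): 2}
```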
get_instrument_state
get_instrument_state(name)

Get the tracking state for a specific instrument.

Source code in src/marianne/daemon/baton/core.py
def get_instrument_state(self, name: str) -> InstrumentState | None:
    """Get the tracking state for a specific instrument."""
    return self._instruments.get(name)
build_dispatch_config
build_dispatch_config(*, max_concurrent_sheets=10)

Build a DispatchConfig from the current instrument state.

This bridges the gap between the baton's instrument tracking and the dispatch logic's configuration needs. Called before each dispatch cycle.

Parameters:

- max_concurrent_sheets (int, default 10): Global concurrency ceiling.

Returns:

- DispatchConfig: DispatchConfig with rate-limited instruments, open circuit breakers, and per-instrument concurrency limits derived from the current InstrumentState.

Source code in src/marianne/daemon/baton/core.py
def build_dispatch_config(self, *, max_concurrent_sheets: int = 10) -> DispatchConfig:
    """Build a DispatchConfig from the current instrument state.

    This bridges the gap between the baton's instrument tracking
    and the dispatch logic's configuration needs. Called before
    each dispatch cycle.

    Args:
        max_concurrent_sheets: Global concurrency ceiling.

    Returns:
        DispatchConfig with rate-limited instruments, open circuit
        breakers, and per-instrument concurrency limits derived
        from the current InstrumentState.
    """
    # Deferred import to break circular dependency
    # (dispatch.py imports BatonCore at runtime)
    from marianne.daemon.baton.dispatch import DispatchConfig  # noqa: N814

    rate_limited: set[str] = set()
    open_breakers: set[str] = set()
    concurrency: dict[str, int] = {}

    for name, inst in self._instruments.items():
        if inst.rate_limited:
            rate_limited.add(name)
        if inst.circuit_breaker == CircuitBreakerState.OPEN:
            open_breakers.add(name)
        concurrency[name] = inst.max_concurrent

    return DispatchConfig(
        max_concurrent_sheets=max_concurrent_sheets,
        instrument_concurrency=concurrency,
        model_concurrency=dict(self._model_concurrency),
        rate_limited_instruments=rate_limited,
        open_circuit_breakers=open_breakers,
    )
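A sketch of how dispatch logic could apply these fields to candidate sheets. `ToyDispatchConfig` copies the field names passed to DispatchConfig above but is a hypothetical stand-in; the actual selection lives in marianne.daemon.baton.dispatch and may differ, and this sketch ignores model_concurrency.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDispatchConfig:
    # Field names copied from the DispatchConfig(...) call above.
    max_concurrent_sheets: int = 10
    instrument_concurrency: dict[str, int] = field(default_factory=dict)
    rate_limited_instruments: set[str] = field(default_factory=set)
    open_circuit_breakers: set[str] = field(default_factory=set)


def select_dispatchable(
    candidates: list[tuple[int, str]],  # (sheet_num, instrument_name)
    running_per_instrument: dict[str, int],
    config: ToyDispatchConfig,
) -> list[int]:
    """Hypothetical filter: skip unhealthy instruments, respect both ceilings."""
    running = dict(running_per_instrument)
    total = sum(running.values())
    chosen: list[int] = []
    for sheet_num, inst in candidates:
        if total >= config.max_concurrent_sheets:
            break  # global ceiling reached
        if inst in config.rate_limited_instruments or inst in config.open_circuit_breakers:
            continue
        # Unknown instruments get no slots in this sketch.
        if running.get(inst, 0) >= config.instrument_concurrency.get(inst, 0):
            continue
        chosen.append(sheet_num)
        running[inst] = running.get(inst, 0) + 1
        total += 1
    return chosen


config = ToyDispatchConfig(
    max_concurrent_sheets=3,
    instrument_concurrency={"a": 2, "b": 4, "c": 4},
    rate_limited_instruments={"b"},
    open_circuit_breakers={"c"},
)
candidates = [(1, "a"), (2, "a"), (3, "a"), (4, "b"), (5, "c")]
chosen = select_dispatchable(candidates, {}, config)
```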
set_job_cost_limit
set_job_cost_limit(job_id, max_cost_usd)

Set a per-job cost limit. The baton pauses the job when exceeded.

Parameters:

- job_id (str, required): The job to set the limit for.
- max_cost_usd (float, required): Maximum total cost in USD.
Source code in src/marianne/daemon/baton/core.py
def set_job_cost_limit(self, job_id: str, max_cost_usd: float) -> None:
    """Set a per-job cost limit. The baton pauses the job when exceeded.

    Args:
        job_id: The job to set the limit for.
        max_cost_usd: Maximum total cost in USD.
    """
    self._job_cost_limits[job_id] = max_cost_usd
get_rate_limited_instruments
get_rate_limited_instruments()

Get the set of currently rate-limited instrument names.

Used by dispatch logic to skip rate-limited instruments.

Source code in src/marianne/daemon/baton/core.py
def get_rate_limited_instruments(self) -> set[str]:
    """Get the set of currently rate-limited instrument names.

    Used by dispatch logic to skip rate-limited instruments.
    """
    return {name for name, inst in self._instruments.items() if inst.rate_limited}
clear_instrument_rate_limit
clear_instrument_rate_limit(instrument=None)

Clear rate limit state on one or all instruments.

Resets rate_limited to False and rate_limit_expires_at to None. Also moves any WAITING sheets on the cleared instrument(s) back to PENDING so they can be re-dispatched.

Parameters:

- instrument (str | None, default None): Instrument name to clear, or None for all.

Returns:

- int: Number of instruments whose rate limit was cleared.

Source code in src/marianne/daemon/baton/core.py
def clear_instrument_rate_limit(
    self,
    instrument: str | None = None,
) -> int:
    """Clear rate limit state on one or all instruments.

    Resets ``rate_limited`` to ``False`` and ``rate_limit_expires_at``
    to ``None``.  Also moves any WAITING sheets on the cleared
    instrument(s) back to PENDING so they can be re-dispatched.

    Args:
        instrument: Instrument name to clear, or ``None`` for all.

    Returns:
        Number of instruments whose rate limit was cleared.
    """
    cleared = 0
    if instrument is not None:
        # Specific instrument — look it up. If not found, targets is
        # empty (return 0). Previously, missing instruments fell through
        # to clear-all — F-200 bug found by Breakpoint M3.
        # Uses `is not None` (not truthiness) to prevent empty string
        # from falling through to clear-all — F-201 (same bug class).
        inst = self._instruments.get(instrument)
        targets = [inst] if inst is not None else []
    else:
        # None → clear all instruments
        targets = list(self._instruments.values())
    for inst in targets:
        if inst.rate_limited:
            inst.rate_limited = False
            inst.rate_limit_expires_at = None
            cleared += 1
            # Move WAITING sheets on this instrument back to PENDING
            for job in self._jobs.values():
                for sheet in job.sheets.values():
                    if (
                        sheet.status == BatonSheetStatus.WAITING
                        and sheet.instrument_name == inst.name
                    ):
                        sheet.status = BatonSheetStatus.PENDING
    if cleared > 0:
        self._state_dirty = True
    return cleared
get_open_circuit_breakers
get_open_circuit_breakers()

Get the set of instruments with open circuit breakers.

Used by dispatch logic to skip unhealthy instruments.

Source code in src/marianne/daemon/baton/core.py
def get_open_circuit_breakers(self) -> set[str]:
    """Get the set of instruments with open circuit breakers.

    Used by dispatch logic to skip unhealthy instruments.
    """
    return {
        name
        for name, inst in self._instruments.items()
        if inst.circuit_breaker == CircuitBreakerState.OPEN
    }
set_sheet_cost_limit
set_sheet_cost_limit(job_id, sheet_num, max_cost_usd)

Set a per-sheet cost limit. The baton fails the sheet when exceeded.

Parameters:

- job_id (str, required): The job containing the sheet.
- sheet_num (int, required): The sheet number.
- max_cost_usd (float, required): Maximum cost in USD for this sheet.
Source code in src/marianne/daemon/baton/core.py
def set_sheet_cost_limit(self, job_id: str, sheet_num: int, max_cost_usd: float) -> None:
    """Set a per-sheet cost limit. The baton fails the sheet when exceeded.

    Args:
        job_id: The job containing the sheet.
        sheet_num: The sheet number.
        max_cost_usd: Maximum cost in USD for this sheet.
    """
    self._sheet_cost_limits[(job_id, sheet_num)] = max_cost_usd
calculate_retry_delay
calculate_retry_delay(attempt)

Calculate retry delay using exponential backoff.

Parameters:

- attempt (int, required): 0-based attempt index (0 = first retry).

Returns:

- float: Delay in seconds, clamped to _max_retry_delay.

Source code in src/marianne/daemon/baton/core.py
def calculate_retry_delay(self, attempt: int) -> float:
    """Calculate retry delay using exponential backoff.

    Args:
        attempt: 0-based attempt index (0 = first retry).

    Returns:
        Delay in seconds, clamped to ``_max_retry_delay``.
    """
    delay = self._base_retry_delay * (self._retry_exponential_base**attempt)
    return min(delay, self._max_retry_delay)
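With the defaults set in __init__ (base delay 10.0 s, exponential base 2.0, cap 3600.0 s), the delay is min(10 · 2^attempt, 3600). A standalone copy of the formula tabulates the schedule; the cap is first reached at attempt 9.

```python
def retry_delay(
    attempt: int,
    base: float = 10.0,
    factor: float = 2.0,
    cap: float = 3600.0,
) -> float:
    # Same formula as calculate_retry_delay(), with the __init__ defaults.
    return min(base * (factor ** attempt), cap)


schedule = [retry_delay(a) for a in range(10)]
# [10.0, 20.0, 40.0, 80.0, 160.0, 320.0, 640.0, 1280.0, 2560.0, 3600.0]
```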
register_job
register_job(job_id, sheets, dependencies, *, escalation_enabled=False, self_healing_enabled=False, pacing_seconds=0.0)

Register a job's sheets with the baton for scheduling.

Parameters:

- job_id (str, required): Unique job identifier.
- sheets (dict[int, SheetExecutionState], required): Map of sheet_num → SheetExecutionState.
- dependencies (dict[int, list[int]], required): Map of sheet_num → list of dependency sheet_nums. Sheets not in this map have no dependencies.
- escalation_enabled (bool, default False): Whether to enter fermata on exhaustion.
- self_healing_enabled (bool, default False): Whether to try healing on exhaustion.
- pacing_seconds (float, default 0.0): Inter-sheet delay after each completion (from pause_between_sheets_seconds). 0 = no delay.
Source code in src/marianne/daemon/baton/core.py
def register_job(
    self,
    job_id: str,
    sheets: dict[int, SheetExecutionState],
    dependencies: dict[int, list[int]],
    *,
    escalation_enabled: bool = False,
    self_healing_enabled: bool = False,
    pacing_seconds: float = 0.0,
) -> None:
    """Register a job's sheets with the baton for scheduling.

    Args:
        job_id: Unique job identifier.
        sheets: Map of sheet_num → SheetExecutionState.
        dependencies: Map of sheet_num → list of dependency sheet_nums.
            Sheets not in this map have no dependencies.
        escalation_enabled: Whether to enter fermata on exhaustion.
        self_healing_enabled: Whether to try healing on exhaustion.
        pacing_seconds: Inter-sheet delay after each completion (from
            ``pause_between_sheets_seconds``). 0 = no delay.
    """
    if job_id in self._jobs:
        _logger.warning(
            "baton.register_job.duplicate",
            extra={"job_id": job_id},
        )
        return

    # Auto-register any instruments used by the job's sheets
    self._auto_register_instruments(sheets)

    self._jobs[job_id] = _JobRecord(
        job_id=job_id,
        sheets=sheets,
        dependencies=dependencies,
        escalation_enabled=escalation_enabled,
        self_healing_enabled=self_healing_enabled,
        pacing_seconds=pacing_seconds,
    )
    self._state_dirty = True

    # F-440: Re-propagate failure for any sheet that's already FAILED.
    # During normal execution, _propagate_failure_to_dependents() cascades
    # failure to downstream sheets. But _sync_sheet_status() only fires
    # for SheetAttemptResult/SheetSkipped events, so cascaded failures
    # are NOT synced to the checkpoint. On restart recovery, dependents
    # revert to PENDING while their upstream is FAILED → zombie job.
    # Re-running propagation here is idempotent (only touches non-terminal
    # sheets) and fixes the sync gap for both fresh registration and
    # recovery.
    for sheet_num, sheet in sheets.items():
        if sheet.status == BatonSheetStatus.FAILED:
            self._propagate_failure_to_dependents(job_id, sheet_num)

    _logger.info(
        "baton.job_registered",
        extra={
            "job_id": job_id,
            "sheet_count": len(sheets),
            "dependency_count": len(dependencies),
        },
    )
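The F-440 fix depends on propagation being idempotent. A sketch under stated assumptions: statuses are plain strings, propagation walks the inverted dependency map breadth-first, and every non-terminal transitive dependent becomes "failed". `propagate_failure` is hypothetical; the real _propagate_failure_to_dependents() may set a different status or record a reason.

```python
from collections import deque

TERMINAL = {"completed", "failed", "skipped"}  # assumption


def propagate_failure(
    statuses: dict[int, str],
    dependencies: dict[int, list[int]],
    failed_sheet: int,
) -> list[int]:
    """Mark non-terminal transitive dependents as failed; return those changed."""
    # Invert sheet -> deps into dep -> dependents.
    dependents: dict[int, list[int]] = {}
    for sheet, deps in dependencies.items():
        for dep in deps:
            dependents.setdefault(dep, []).append(sheet)

    changed: list[int] = []
    queue = deque([failed_sheet])
    seen = {failed_sheet}
    while queue:
        current = queue.popleft()
        for child in dependents.get(current, []):
            if child in seen:
                continue
            seen.add(child)
            if statuses[child] not in TERMINAL:  # never touch terminal sheets
                statuses[child] = "failed"
                changed.append(child)
            queue.append(child)  # keep walking downstream
    return changed


statuses = {1: "failed", 2: "pending", 3: "pending", 4: "completed", 5: "pending"}
deps = {2: [1], 3: [2], 4: [1]}
first = propagate_failure(statuses, deps, 1)
second = propagate_failure(statuses, deps, 1)  # re-run changes nothing
```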
deregister_job
deregister_job(job_id)

Remove a job from the baton's tracking.

Cleans up all per-job state including cost limit entries to prevent memory leaks in long-running conductors (F-062).

Source code in src/marianne/daemon/baton/core.py
def deregister_job(self, job_id: str) -> None:
    """Remove a job from the baton's tracking.

    Cleans up all per-job state including cost limit entries to
    prevent memory leaks in long-running conductors (F-062).
    """
    if job_id in self._jobs:
        del self._jobs[job_id]
        # F-062: Clean up cost limit dicts to prevent memory leaks
        self._job_cost_limits.pop(job_id, None)
        # Remove sheet cost limits for this job
        sheet_keys_to_remove = [key for key in self._sheet_cost_limits if key[0] == job_id]
        for key in sheet_keys_to_remove:
            del self._sheet_cost_limits[key]
        self._state_dirty = True
        _logger.info("baton.job_deregistered", extra={"job_id": job_id})
get_sheet_state
get_sheet_state(job_id, sheet_num)

Get the scheduling state for a specific sheet.

Source code in src/marianne/daemon/baton/core.py
def get_sheet_state(self, job_id: str, sheet_num: int) -> SheetExecutionState | None:
    """Get the scheduling state for a specific sheet."""
    job = self._jobs.get(job_id)
    if job is None:
        return None
    return job.sheets.get(sheet_num)
is_job_paused
is_job_paused(job_id)

Check if a job's dispatch is paused.

Source code in src/marianne/daemon/baton/core.py
def is_job_paused(self, job_id: str) -> bool:
    """Check if a job's dispatch is paused."""
    job = self._jobs.get(job_id)
    return job.paused if job is not None else False
is_job_complete
is_job_complete(job_id)

Check if all sheets in a job are in terminal state.

Source code in src/marianne/daemon/baton/core.py
def is_job_complete(self, job_id: str) -> bool:
    """Check if all sheets in a job are in terminal state."""
    job = self._jobs.get(job_id)
    if job is None:
        return False
    return all(sheet.status in _TERMINAL_BATON_STATUSES for sheet in job.sheets.values())
get_ready_sheets
get_ready_sheets(job_id)

Find sheets that are ready to dispatch.

A sheet is ready when:

1. Its status is 'pending' or 'ready'.
2. All of its dependencies are satisfied (completed or skipped).
3. The job is not paused and no inter-sheet pacing delay is active.

Source code in src/marianne/daemon/baton/core.py
def get_ready_sheets(self, job_id: str) -> list[SheetExecutionState]:
    """Find sheets that are ready to dispatch.

    A sheet is ready when:
    1. Status is 'pending' or 'ready'
    2. All dependencies are satisfied (completed or skipped)
    3. The job is not paused
    """
    job = self._jobs.get(job_id)
    if job is None or job.paused or job.pacing_active:
        return []

    ready: list[SheetExecutionState] = []
    for sheet_num, sheet in job.sheets.items():
        if sheet.status not in _DISPATCHABLE_BATON_STATUSES:
            continue

        # Check dependencies
        deps = job.dependencies.get(sheet_num, [])
        deps_satisfied = all(self._is_dependency_satisfied(job, dep) for dep in deps)
        if deps_satisfied:
            ready.append(sheet)

    return ready
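The readiness rule reduces to a filter over the dependency map. A sketch with string statuses; `ready_sheets` is hypothetical, pause and pacing are folded into one `blocked` flag, and treating an unknown dependency as unsatisfied is an assumption.

```python
DISPATCHABLE = {"pending", "ready"}
SATISFIED = {"completed", "skipped"}


def ready_sheets(
    statuses: dict[int, str],
    dependencies: dict[int, list[int]],
    *,
    blocked: bool = False,  # stands in for job.paused or job.pacing_active
) -> list[int]:
    if blocked:
        return []
    ready: list[int] = []
    for sheet_num, status in statuses.items():
        if status not in DISPATCHABLE:
            continue
        deps = dependencies.get(sheet_num, [])  # absent key -> no dependencies
        # statuses.get() returns None for unknown sheets -> unsatisfied.
        if all(statuses.get(d) in SATISFIED for d in deps):
            ready.append(sheet_num)
    return ready


statuses = {1: "completed", 2: "pending", 3: "pending", 4: "skipped", 5: "pending"}
deps = {2: [1], 3: [2], 5: [4, 1]}
```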
handle_event async
handle_event(event)

Process a single event. Updates state but does NOT dispatch.

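This is the core decision-making method. Most event types are routed to a dedicated handler that updates sheet state; CronTick and ConfigReloaded are currently logged as unimplemented, InstrumentFallback and DispatchRetry are accepted without further action, and unknown event types are logged as warnings.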

Per the baton spec: handler exceptions are logged, not re-raised. The baton continues processing subsequent events.

Source code in src/marianne/daemon/baton/core.py
async def handle_event(self, event: BatonEvent) -> None:
    """Process a single event. Updates state but does NOT dispatch.

    This is the core decision-making method. Each event type has
    a specific handler that updates sheet state.

    Per the baton spec: handler exceptions are logged, not re-raised.
    The baton continues processing subsequent events.
    """
    try:
        match event:
            # === Musician events ===
            case SheetAttemptResult():
                self._handle_attempt_result(event)

            case SheetSkipped():
                self._handle_sheet_skipped(event)

            case SheetDispatched():
                self._handle_sheet_dispatched(event)

            # === Rate limit events ===
            case RateLimitHit():
                self._handle_rate_limit_hit(event)

            case RateLimitExpired():
                self._handle_rate_limit_expired(event)

            # === Timer events ===
            case RetryDue():
                self._handle_retry_due(event)

            case StaleCheck():
                self._handle_stale_check(event)

            case CronTick():
                _logger.warning(
                    "baton.event.unimplemented",
                    extra={"event_type": "CronTick"},
                )

            case JobTimeout():
                self._handle_job_timeout(event)

            case PacingComplete():
                self._handle_pacing_complete(event)

            # === Escalation events ===
            case EscalationNeeded():
                self._handle_escalation_needed(event)

            case EscalationResolved():
                self._handle_escalation_resolved(event)

            case EscalationTimeout():
                self._handle_escalation_timeout(event)

            # === External command events ===
            case PauseJob():
                self._handle_pause_job(event)

            case ResumeJob():
                self._handle_resume_job(event)

            case CancelJob():
                self._handle_cancel_job(event)

            case ConfigReloaded():
                _logger.warning(
                    "baton.event.unimplemented",
                    extra={"event_type": "ConfigReloaded"},
                )

            case ShutdownRequested():
                self._handle_shutdown(event)

            # === Observer events ===
            case ProcessExited():
                self._handle_process_exited(event)

            case ResourceAnomaly():
                self._handle_resource_anomaly(event)

            # === Instrument fallback events ===
            case InstrumentFallback():
                # InstrumentFallback events are emitted by the baton
                # itself, not received from external sources. They pass
                # through the event bus for observability (dashboard,
                # learning hub, notifications). No handler needed.
                pass

            # === Internal events ===
            case DispatchRetry():
                pass  # Dispatch retry — _dispatch_ready handles this

            case CircuitBreakerRecovery():
                self._handle_circuit_breaker_recovery(event)

            case _:
                _logger.warning(
                    "baton.unknown_event",
                    extra={"event_type": type(event).__name__},
                )

    except Exception:
        _logger.error(
            "baton.event_handler_failed",
            extra={"event_type": type(event).__name__},
            exc_info=True,
        )
run async
run()

The baton's main event loop.

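Processes events from the inbox one at a time via handle_event(). Runs until a ShutdownRequested event is received, or until the task is cancelled (the CancelledError is logged and re-raised). In the current source, ready-sheet dispatch is not yet called from this loop; a comment marks where it will be wired in once connected to the conductor.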

Source code in src/marianne/daemon/baton/core.py
async def run(self) -> None:
    """The baton's main event loop.

    Processes events from the inbox, updates state, and dispatches
    ready sheets. Runs until a ShutdownRequested event is received.
    """
    self._running = True
    _logger.info("baton.started")

    try:
        while not self._shutting_down:
            event = await self._inbox.get()
            await self.handle_event(event)
            # dispatch_ready() would be called here when wired to
            # conductor
    except asyncio.CancelledError:
        _logger.info("baton.cancelled")
        raise
    finally:
        self._running = False
        _logger.info("baton.stopped")
get_diagnostics
get_diagnostics(job_id)

Get diagnostic information for a job.

Returns a dict with sheet counts by status, instrument state, and other debugging information. Returns None if job not found.

Source code in src/marianne/daemon/baton/core.py
def get_diagnostics(self, job_id: str) -> dict[str, Any] | None:
    """Get diagnostic information for a job.

    Returns a dict with sheet counts by status, instrument state,
    and other debugging information. Returns None if job not found.
    """
    job = self._jobs.get(job_id)
    if job is None:
        return None

    status_counts: dict[str, int] = {}
    instruments_used: set[str] = set()
    for sheet in job.sheets.values():
        status_key = sheet.status.value
        status_counts[status_key] = status_counts.get(status_key, 0) + 1
        if sheet.instrument_name:
            instruments_used.add(sheet.instrument_name)

    return {
        "job_id": job_id,
        "paused": job.paused,
        "sheets": {
            "total": len(job.sheets),
            **status_counts,
        },
        "instruments_used": sorted(instruments_used),
    }

Functions