Skip to content

events

events

BatonEvent types — the vocabulary of the conductor's execution heart.

Every event the baton can receive is defined here as a frozen dataclass. Events are the baton's sole input mechanism — it sleeps until an event arrives, processes it, checks for dispatchable sheets, and sleeps again.

Events come from five sources:

  • Musicians — sheet execution results (SheetAttemptResult, SheetSkipped)
  • Timer wheel — scheduled future events (RetryDue, RateLimitExpired, StaleCheck, CronTick, JobTimeout, EscalationTimeout, PacingComplete)
  • External commands — CLI/IPC/dashboard (PauseJob, ResumeJob, CancelJob, ConfigReloaded, EscalationResolved, ShutdownRequested)
  • Observer — runtime monitoring (ProcessExited, ResourceAnomaly)
  • Internal — dispatch coordination (DispatchRetry)

Design decisions:

  • All events are frozen dataclasses — immutable after creation, safe to pass between asyncio tasks without copying.
  • BatonEvent is the union type used for inbox typing.
  • Events carry only the data needed for the baton to make a decision. They do not carry the full execution output (that's in CheckpointState).
  • Events are compatible with the existing ObserverEvent TypedDict format via the to_observer_event() method for EventBus integration.

See: docs/plans/2026-03-26-baton-design.md for the full architecture.

Attributes

Classes

SheetAttemptResult dataclass

SheetAttemptResult(job_id, sheet_num, instrument_name, attempt, execution_success=True, exit_code=None, duration_seconds=0.0, validations_passed=0, validations_total=0, validation_pass_rate=0.0, validation_details=None, error_classification=None, error_message=None, rate_limited=False, rate_limit_wait_seconds=None, cost_usd=0.0, input_tokens=0, output_tokens=0, model_used=None, stdout_tail='', stderr_tail='', timestamp=time())

A musician reports the result of a single sheet attempt.

This is the central event in the baton's event loop. The musician plays once and reports in full detail. The conductor (baton) decides what happens next — retry, completion mode, healing, escalation, or accept.

Rate limits are NOT failures. When rate_limited is True, the baton re-queues the sheet for when the instrument recovers. No retry budget is consumed.

Attributes
validation_pass_rate class-attribute instance-attribute
validation_pass_rate = 0.0

Percentage of validations that passed (0.0-100.0).

CRITICAL CONTRACT (F-018): Set to 100.0 when execution succeeds with no validation rules OR when all validations pass. The baton treats the default (0.0) as "all validations failed" and will retry.

A musician that reports execution_success=True with validations_total=0 but leaves this at 0.0 will trigger unnecessary retries until max_retries is exhausted.

rate_limit_wait_seconds class-attribute instance-attribute
rate_limit_wait_seconds = None

Parsed wait duration from the API's rate limit error message.

When set, the baton uses this instead of the default 60s for scheduling the recovery timer. This is the actual duration the API told us to wait.

SheetSkipped dataclass

SheetSkipped(job_id, sheet_num, reason, timestamp=time())

A sheet was skipped due to skip_when condition or start_sheet override.

The baton propagates skip state to dependents — downstream sheets that depend on a skipped sheet receive a skip sentinel, not empty string.

SheetDispatched dataclass

SheetDispatched(job_id, sheet_num, instrument, timestamp=monotonic())

A sheet has been dispatched to a musician for execution.

Emitted by dispatch_ready() after the dispatch callback succeeds. The baton sets DISPATCHED status and records the dispatch timestamp.

RateLimitHit dataclass

RateLimitHit(instrument, wait_seconds, job_id, sheet_num, model=None, timestamp=time())

An instrument hit a rate limit. NOT a failure — a tempo change.

The baton marks the instrument as rate-limited and schedules a timer for recovery. Sheets targeting this instrument move to waiting state. Other instruments are completely unaffected.

RateLimitExpired dataclass

RateLimitExpired(instrument, timestamp=time())

Timer fired — check if the rate-limited instrument is available again.

If the instrument is still unavailable, the baton schedules another timer. If available, sheets waiting on this instrument become ready.

RetryDue dataclass

RetryDue(job_id, sheet_num, timestamp=time())

Timer fired — a previously failed sheet is ready for retry.

The baton moves the sheet from retry-scheduled to ready state. The next dispatch cycle will pick it up.

StaleCheck dataclass

StaleCheck(job_id, sheet_num, timestamp=time())

Timer fired — check if a running sheet has gone stale.

If no output progress has been received within the configured idle timeout, the baton kills the stale sheet and reschedules or fails it.

CronTick dataclass

CronTick(entry_name, score_path, timestamp=time())

Timer fired — a cron-scheduled job should be submitted.

The baton submits the configured score as a new job and schedules the next tick. If a previous run is still active, this tick is skipped.

JobTimeout dataclass

JobTimeout(job_id, timestamp=time())

Timer fired — a job has exceeded its wall-clock time limit.

The baton cancels all remaining sheets for this job.

PacingComplete dataclass

PacingComplete(job_id, timestamp=time())

Timer fired — the inter-sheet pacing delay for a job has elapsed.

The baton clears the pacing flag, allowing the next sheet to dispatch. Implements pause_between_sheets_seconds from score config.

EscalationNeeded dataclass

EscalationNeeded(job_id, sheet_num, reason, options=list(), timestamp=time())

A sheet execution requires composer judgment — enter fermata.

The baton pauses the job's dispatch and notifies the composer (human or AI) via configured channels. A timeout timer is scheduled.

EscalationResolved dataclass

EscalationResolved(job_id, sheet_num, decision, timestamp=time())

The composer has made a decision on a fermata.

The baton applies the decision and resumes dispatching for the job. Arrives via IPC (job.resolve_escalation method).

EscalationTimeout dataclass

EscalationTimeout(job_id, sheet_num, timestamp=time())

Timer fired — no escalation response received within the deadline.

The baton defaults to the safe action: fail the sheet (not the job) and resume dispatching for other sheets.

PauseJob dataclass

PauseJob(job_id, timestamp=time())

Pause dispatching for a job. In-flight sheets continue to completion.

No new sheets are dispatched until ResumeJob is received. Retry timers are preserved — when resumed, scheduled retries fire normally.

ResumeJob dataclass

ResumeJob(job_id, new_config=None, timestamp=time())

Resume dispatching for a paused job, optionally with new config.

When new_config is provided, pending sheets are rebuilt from the new config. Completed sheets are preserved. Failed sheets being retried use the new config for the retry.

CancelJob dataclass

CancelJob(job_id, timestamp=time())

Cancel all sheets for a job and deregister it from the baton.

In-flight sheet tasks are cancelled. The job is marked as cancelled in CheckpointState.

ConfigReloaded dataclass

ConfigReloaded(job_id, new_config, timestamp=time())

Config has changed for a job (SIGHUP, mzt modify, resume -c).

The baton rebuilds pending sheets from the new config. Completed sheets are preserved. Cost limits may be reset if they changed.

ShutdownRequested dataclass

ShutdownRequested(graceful=True, timestamp=time())

The conductor is shutting down (SIGTERM, mzt stop).

When graceful is True, the baton waits for in-flight sheets to complete (up to the configured drain timeout) before stopping. When False, sheets are cancelled immediately.

ProcessExited dataclass

ProcessExited(job_id, sheet_num, pid, exit_code=None, timestamp=time())

Observer detected that a backend process died unexpectedly.

The baton checks if this was a sheet's backend process and, if so, marks the sheet as crashed — faster than waiting for timeout.

ResourceAnomaly dataclass

ResourceAnomaly(severity, metric, value, timestamp=time())

Observer/monitor detected a resource pressure event.

Critical severity triggers backpressure — the baton stops dispatching new sheets and lets running sheets drain.

InstrumentFallback dataclass

InstrumentFallback(job_id, sheet_num, from_instrument, to_instrument, reason, timestamp=time())

A sheet's instrument was switched to a fallback.

Emitted when the baton moves a sheet from one instrument to the next in its fallback chain. Reasons:

  • "unavailable" — instrument not installed, binary missing, circuit breaker open. Immediate fallback at dispatch time.
  • "rate_limit_exhausted" — all retries on the current instrument hit rate limits with no recovery. Fallback after retry exhaustion.

This is an INFO-level event. Fallback is the system working correctly, not failing. Each fallback instrument gets a fresh retry budget.

A2ATaskSubmitted dataclass

A2ATaskSubmitted(job_id, sheet_num, target_agent, task_description, context=dict(), timestamp=time())

An agent requests a task from another running agent.

The conductor routes the task to the target agent's inbox, persisted in the target's job state. The target picks it up on their next A2A-enabled sheet.

A2ATaskRouted dataclass

A2ATaskRouted(job_id, sheet_num, source_agent, target_agent, task_id, timestamp=time())

The conductor has delivered a task to the target agent's inbox.

Confirms that the task is persisted and waiting for the target agent to process on their next relevant sheet.

A2ATaskCompleted dataclass

A2ATaskCompleted(job_id, sheet_num, task_id, artifacts=dict(), timestamp=time())

A delegated task has been completed with artifacts.

Results are routed back to the requesting agent's inbox. The requester picks them up on their next relevant sheet.

A2ATaskFailed dataclass

A2ATaskFailed(job_id, sheet_num, task_id, reason, timestamp=time())

A delegated task could not be fulfilled.

The requesting agent is notified with the reason for failure so they can decide whether to retry, delegate elsewhere, or handle it themselves.

DispatchRetry dataclass

DispatchRetry(timestamp=time())

Internal signal to retry dispatch after a backpressure delay.

When the baton encounters backpressure during dispatch, it schedules this event via the timer wheel rather than blocking.

CircuitBreakerRecovery dataclass

CircuitBreakerRecovery(instrument, timestamp=time())

Timer fired — check if a circuit-broken instrument can accept a probe.

When a circuit breaker trips OPEN, the baton schedules this event via the timer wheel. On fire, the instrument transitions from OPEN to HALF_OPEN, allowing one probe request through. The next dispatch cycle picks up any PENDING sheets blocked by the dead-ended fallback chain.

If the probe succeeds, the breaker closes. If it fails, the breaker reopens and a new recovery timer is scheduled with increased backoff.

GH#169: Without this, sheets whose entire fallback chain is circuit-broken stay PENDING forever — the score appears RUNNING but is dead.

Functions

to_observer_event

to_observer_event(event)

Convert a BatonEvent to the ObserverEvent TypedDict format.

The baton publishes events to the EventBus so that the dashboard, learning hub, and notification system can consume them. Baton events use the baton.* namespace to distinguish from runner events.

Returns a dict matching the ObserverEvent TypedDict shape: {job_id, sheet_num, event, data, timestamp}

Source code in src/marianne/daemon/baton/events.py
def to_observer_event(event: BatonEvent) -> ObserverEvent:
    """Convert a BatonEvent to the ObserverEvent TypedDict format.

    The baton publishes events to the EventBus so that the dashboard,
    learning hub, and notification system can consume them. Baton events
    use the ``baton.*`` namespace to distinguish from runner events.

    Returns a dict matching the ObserverEvent TypedDict shape:
    ``{job_id, sheet_num, event, data, timestamp}``
    """
    match event:
        case SheetAttemptResult():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.sheet.attempt_result",
                "data": {
                    "instrument": event.instrument_name,
                    "model": event.model_used,
                    "attempt": event.attempt,
                    "success": event.execution_success,
                    VALIDATION_PASS_RATE_KEY: event.validation_pass_rate,
                    "cost_usd": event.cost_usd,
                    "rate_limited": event.rate_limited,
                    "duration_seconds": event.duration_seconds,
                },
                "timestamp": event.timestamp,
            }

        case SheetSkipped():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.sheet.skipped",
                "data": {"reason": event.reason},
                "timestamp": event.timestamp,
            }

        case RateLimitHit():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.rate_limit.active",
                "data": {
                    "instrument": event.instrument,
                    "estimated_seconds": event.wait_seconds,
                },
                "timestamp": event.timestamp,
            }

        case RateLimitExpired():
            return {
                "job_id": "",
                "sheet_num": 0,
                "event": "baton.rate_limit.cleared",
                "data": {"instrument": event.instrument},
                "timestamp": event.timestamp,
            }

        case RetryDue():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.sheet.retry_scheduled",
                "data": {},
                "timestamp": event.timestamp,
            }

        case StaleCheck():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.sheet.stale_check",
                "data": {},
                "timestamp": event.timestamp,
            }

        case CronTick():
            return {
                "job_id": "",
                "sheet_num": 0,
                "event": "baton.cron.fired",
                "data": {
                    "entry_name": event.entry_name,
                    "score_path": event.score_path,
                },
                "timestamp": event.timestamp,
            }

        case JobTimeout():
            return {
                "job_id": event.job_id,
                "sheet_num": 0,
                "event": "baton.job.timeout",
                "data": {},
                "timestamp": event.timestamp,
            }

        case PacingComplete():
            return {
                "job_id": event.job_id,
                "sheet_num": 0,
                "event": "baton.pacing.complete",
                "data": {},
                "timestamp": event.timestamp,
            }

        case EscalationNeeded():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.fermata",
                "data": {
                    "reason": event.reason,
                    "options": event.options,
                },
                "timestamp": event.timestamp,
            }

        case EscalationResolved():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.fermata.resolved",
                "data": {"decision": event.decision},
                "timestamp": event.timestamp,
            }

        case EscalationTimeout():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.fermata.timeout",
                "data": {},
                "timestamp": event.timestamp,
            }

        case PauseJob():
            return {
                "job_id": event.job_id,
                "sheet_num": 0,
                "event": "baton.job.paused",
                "data": {"reason": "user"},
                "timestamp": event.timestamp,
            }

        case ResumeJob():
            return {
                "job_id": event.job_id,
                "sheet_num": 0,
                "event": "baton.job.resumed",
                "data": {"config_changed": event.new_config is not None},
                "timestamp": event.timestamp,
            }

        case CancelJob():
            return {
                "job_id": event.job_id,
                "sheet_num": 0,
                "event": "baton.job.cancelled",
                "data": {},
                "timestamp": event.timestamp,
            }

        case ConfigReloaded():
            return {
                "job_id": event.job_id,
                "sheet_num": 0,
                "event": "baton.config.reloaded",
                "data": {},
                "timestamp": event.timestamp,
            }

        case ShutdownRequested():
            return {
                "job_id": "",
                "sheet_num": 0,
                "event": "baton.shutdown.requested",
                "data": {"graceful": event.graceful},
                "timestamp": event.timestamp,
            }

        case ProcessExited():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.process.exited",
                "data": {
                    "pid": event.pid,
                    "exit_code": event.exit_code,
                },
                "timestamp": event.timestamp,
            }

        case ResourceAnomaly():
            return {
                "job_id": "",
                "sheet_num": 0,
                "event": "baton.resource.anomaly",
                "data": {
                    "severity": event.severity,
                    "metric": event.metric,
                    "value": event.value,
                },
                "timestamp": event.timestamp,
            }

        case InstrumentFallback():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.instrument.fallback",
                "data": {
                    "from_instrument": event.from_instrument,
                    "to_instrument": event.to_instrument,
                    "reason": event.reason,
                },
                "timestamp": event.timestamp,
            }

        case DispatchRetry():
            return {
                "job_id": "",
                "sheet_num": 0,
                "event": "baton.dispatch.retry",
                "data": {},
                "timestamp": event.timestamp,
            }

        case CircuitBreakerRecovery():
            return {
                "job_id": "",
                "sheet_num": 0,
                "event": "baton.circuit_breaker.recovery",
                "data": {"instrument": event.instrument},
                "timestamp": event.timestamp,
            }

        case A2ATaskSubmitted():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.a2a.task.submitted",
                "data": {
                    "target_agent": event.target_agent,
                    "task_description": event.task_description,
                },
                "timestamp": event.timestamp,
            }

        case A2ATaskRouted():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.a2a.task.routed",
                "data": {
                    "source_agent": event.source_agent,
                    "target_agent": event.target_agent,
                    "task_id": event.task_id,
                },
                "timestamp": event.timestamp,
            }

        case A2ATaskCompleted():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.a2a.task.completed",
                "data": {
                    "task_id": event.task_id,
                    "artifacts": event.artifacts,
                },
                "timestamp": event.timestamp,
            }

        case A2ATaskFailed():
            return {
                "job_id": event.job_id,
                "sheet_num": event.sheet_num,
                "event": "baton.a2a.task.failed",
                "data": {
                    "task_id": event.task_id,
                    "reason": event.reason,
                },
                "timestamp": event.timestamp,
            }

    # Unreachable — the match is exhaustive over the union type.
    # If a new event type is added to BatonEvent without a case here,
    # mypy will flag it via type narrowing.
    msg = f"Unknown baton event type: {type(event).__name__}"
    raise ValueError(msg)