core
core
¶
Baton core — event inbox, main loop, and sheet registry.
The baton core is the event-driven execution heart of the conductor. It processes events from five sources (musicians, timers, external commands, observer, internal dispatch), maintains per-sheet execution state, resolves ready sheets, and coordinates dispatch.
The main loop is simple by design::
while not shutting_down:
event = await inbox.get()
handle(event) # updates state
dispatch_ready() # dispatches sheets if state changed
persist() # save if dirty
Everything else — retry logic, rate limit management, cost enforcement — is done inside event handlers, not in separate subsystems.
See: docs/plans/2026-03-26-baton-design.md for the full architecture.
Attributes¶
Classes¶
BatonCore
¶
The baton's event-driven execution core.
Manages the event inbox, processes events, tracks sheet state across all jobs, resolves ready sheets, and coordinates dispatch.
The baton does NOT own backend execution — it decides WHEN to dispatch, not HOW. Sheet execution is delegated to the musician (via dispatch callbacks registered by the conductor).
Usage::
baton = BatonCore()
baton.register_job("j1", sheets, deps)
# Run the main loop (blocks until shutdown)
await baton.run()
# Or process events manually (for testing)
await baton.handle_event(some_event)
Initialize the baton core.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timer
|
Any | None
|
Optional TimerWheel for scheduling retry delays. When None, retries are set to RETRY_SCHEDULED without actual timer events (tests or manual event injection). |
None
|
inbox
|
Queue[BatonEvent] | None
|
Optional pre-created event queue. When provided, allows the caller to share the queue with other components (e.g., TimerWheel) before BatonCore is constructed. When None, a new queue is created. |
None
|
Source code in src/marianne/daemon/baton/core.py
Attributes¶
running_sheet_count
property
¶
Number of sheets currently in 'dispatched' status.
Functions¶
drain_fallback_events
¶
Return and clear collected InstrumentFallback events.
Called by the adapter after each event cycle to publish fallback events to the EventBus for observability.
Source code in src/marianne/daemon/baton/core.py
register_instrument
¶
Register an instrument for tracking.
If already registered, returns the existing state (idempotent).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Instrument name (matches InstrumentProfile.name). |
required |
max_concurrent
|
int
|
Maximum concurrent sheets on this instrument. |
4
|
Returns:
| Type | Description |
|---|---|
InstrumentState
|
The InstrumentState for the instrument. |
Source code in src/marianne/daemon/baton/core.py
set_model_concurrency
¶
Set per-model concurrency limit from instrument profile data.
Called during adapter initialization from loaded InstrumentProfiles.
Source code in src/marianne/daemon/baton/core.py
get_instrument_state
¶
build_dispatch_config
¶
Build a DispatchConfig from the current instrument state.
This bridges the gap between the baton's instrument tracking and the dispatch logic's configuration needs. Called before each dispatch cycle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_concurrent_sheets
|
int
|
Global concurrency ceiling. |
10
|
Returns:
| Type | Description |
|---|---|
DispatchConfig
|
DispatchConfig with rate-limited instruments, open circuit |
DispatchConfig
|
breakers, and per-instrument concurrency limits derived |
DispatchConfig
|
from the current InstrumentState. |
Source code in src/marianne/daemon/baton/core.py
set_job_cost_limit
¶
Set a per-job cost limit. The baton pauses the job when exceeded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
The job to set the limit for. |
required |
max_cost_usd
|
float
|
Maximum total cost in USD. |
required |
Source code in src/marianne/daemon/baton/core.py
get_rate_limited_instruments
¶
Get the set of currently rate-limited instrument names.
Used by dispatch logic to skip rate-limited instruments.
Source code in src/marianne/daemon/baton/core.py
clear_instrument_rate_limit
¶
Clear rate limit state on one or all instruments.
Resets rate_limited to False and rate_limit_expires_at
to None. Also moves any WAITING sheets on the cleared
instrument(s) back to PENDING so they can be re-dispatched.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
instrument
|
str | None
|
Instrument name to clear, or |
None
|
Returns:
| Type | Description |
|---|---|
int
|
Number of instruments whose rate limit was cleared. |
Source code in src/marianne/daemon/baton/core.py
get_open_circuit_breakers
¶
Get the set of instruments with open circuit breakers.
Used by dispatch logic to skip unhealthy instruments.
Source code in src/marianne/daemon/baton/core.py
set_sheet_cost_limit
¶
Set a per-sheet cost limit. The baton fails the sheet when exceeded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
The job containing the sheet. |
required |
sheet_num
|
int
|
The sheet number. |
required |
max_cost_usd
|
float
|
Maximum cost in USD for this sheet. |
required |
Source code in src/marianne/daemon/baton/core.py
calculate_retry_delay
¶
Calculate retry delay using exponential backoff.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
attempt
|
int
|
0-based attempt index (0 = first retry). |
required |
Returns:
| Type | Description |
|---|---|
float
|
Delay in seconds, clamped to |
Source code in src/marianne/daemon/baton/core.py
register_job
¶
register_job(job_id, sheets, dependencies, *, escalation_enabled=False, self_healing_enabled=False, pacing_seconds=0.0)
Register a job's sheets with the baton for scheduling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
Unique job identifier. |
required |
sheets
|
dict[int, SheetExecutionState]
|
Map of sheet_num → SheetExecutionState. |
required |
dependencies
|
dict[int, list[int]]
|
Map of sheet_num → list of dependency sheet_nums. Sheets not in this map have no dependencies. |
required |
escalation_enabled
|
bool
|
Whether to enter fermata on exhaustion. |
False
|
self_healing_enabled
|
bool
|
Whether to try healing on exhaustion. |
False
|
pacing_seconds
|
float
|
Inter-sheet delay after each completion (from
|
0.0
|
Source code in src/marianne/daemon/baton/core.py
deregister_job
¶
Remove a job from the baton's tracking.
Cleans up all per-job state including cost limit entries to prevent memory leaks in long-running conductors (F-062).
Source code in src/marianne/daemon/baton/core.py
get_sheet_state
¶
Get the scheduling state for a specific sheet.
Source code in src/marianne/daemon/baton/core.py
is_job_paused
¶
is_job_complete
¶
Check if all sheets in a job are in terminal state.
Source code in src/marianne/daemon/baton/core.py
get_ready_sheets
¶
Find sheets that are ready to dispatch.
A sheet is ready when: 1. Status is 'pending' or 'ready' 2. All dependencies are satisfied (completed or skipped) 3. The job is not paused
Source code in src/marianne/daemon/baton/core.py
handle_event
async
¶
Process a single event. Updates state but does NOT dispatch.
This is the core decision-making method. Each event type has a specific handler that updates sheet state.
Per the baton spec: handler exceptions are logged, not re-raised. The baton continues processing subsequent events.
Source code in src/marianne/daemon/baton/core.py
972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 | |
run
async
¶
The baton's main event loop.
Processes events from the inbox, updates state, and dispatches ready sheets. Runs until a ShutdownRequested event is received.
Source code in src/marianne/daemon/baton/core.py
get_diagnostics
¶
Get diagnostic information for a job.
Returns a dict with sheet counts by status, instrument state, and other debugging information. Returns None if job not found.