adapter
BatonAdapter — wires the baton into the conductor.
Step 28: Replace monolithic execution with the baton's event-driven model.
The adapter is the bridge between the conductor (JobManager) and the baton (BatonCore). It handles:
- Job submission → baton registration (Surface 1): JobConfig → Sheet[] → SheetExecutionState[] → BatonCore.register_job()
- Dispatch callback → backend acquisition (Surface 2): BatonCore dispatches → adapter acquires backend → spawns musician task
- Prompt assembly (Surface 3): Creates SheetContext, calls PromptBuilder.build_sheet_prompt()
- State synchronization (Surface 4): BatonSheetStatus ↔ CheckpointState.SheetStatus mapping
- EventBus integration (Surface 5): Baton events → ObserverEvent format → EventBus.publish()
- Rate limit callback bridge (Surface 6): Musician extracts wait time → SheetAttemptResult → baton handles
- Lifecycle (Surface 8): The adapter is always initialized by the conductor at startup
Design decisions:
- Checkpoint is source of truth. Baton rebuilds from checkpoint on restart.
- Save checkpoint FIRST, then update baton state (prevents re-execution).
- The adapter does NOT own the baton's main loop; the manager runs it.
- Concert support: sequential score submission (option 1 from wiring analysis).
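The "save checkpoint FIRST" ordering can be illustrated with a toy model. This is a minimal sketch, not the adapter's code: `record_completion`, `SimulatedCrash`, and the plain dicts standing in for the checkpoint and the baton state are all hypothetical. It shows why a crash between the two writes does not cause re-execution when the checkpoint is written first and the baton is rebuilt from it.

```python
class SimulatedCrash(Exception):
    """Stands in for the conductor dying between the two writes (hypothetical)."""


def record_completion(sheet_num, checkpoint, baton_state, *, crash_between=False):
    # 1. Persist the terminal status first: the checkpoint is the source of truth.
    checkpoint[sheet_num] = "completed"
    if crash_between:
        raise SimulatedCrash
    # 2. Only then update the in-memory baton state.
    baton_state[sheet_num] = "completed"


checkpoint = {1: "pending"}
baton_state = {1: "pending"}

try:
    record_completion(1, checkpoint, baton_state, crash_between=True)
except SimulatedCrash:
    pass

# On restart the baton is rebuilt from the checkpoint, so sheet 1 is
# already terminal and would not be dispatched again.
baton_after_restart = dict(checkpoint)
```

With the opposite order, the same crash would leave the checkpoint at "pending" and the sheet would run a second time after restart.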
See: workspaces/v1-beta-v3/movement-2/step-28-wiring-analysis.md
Classes
BatonAdapter
BatonAdapter(*, event_bus=None, max_concurrent_sheets=10, state_sync_callback=None, persist_callback=None)
Bridges the conductor (JobManager) and the baton (BatonCore).
The adapter owns:
- A BatonCore instance (event loop, sheet registry, state machine)
- A mapping of job_id → Sheet[] (for prompt rendering at dispatch time)
- Active musician tasks (asyncio.Task per dispatched sheet)

The adapter does NOT own:
- The BackendPool (injected by the manager)
- The EventBus (injected by the manager)
- The CheckpointState (managed by the manager's state backend)
Usage::

    adapter = BatonAdapter(event_bus=bus)
    adapter.set_backend_pool(pool)

    # Register a job
    adapter.register_job("j1", sheets, deps)

    # Run the baton (blocks until shutdown)
    await adapter.run()

Initialize the BatonAdapter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_bus | EventBus \| None | Optional EventBus for publishing events to subscribers. | None |
| max_concurrent_sheets | int | Global concurrency ceiling for dispatch. | 10 |
| state_sync_callback | StateSyncCallback \| None | Deprecated; kept for backward compat. Phase 2 uses persist_callback instead. | None |
| persist_callback | PersistCallback \| None | Called with job_id after significant state transitions (terminal, dispatch) to persist CheckpointState to the registry. Replaces the sync layer. | None |
Source code in src/marianne/daemon/baton/adapter.py
Functions
set_backend_pool
Inject the BackendPool for backend acquisition.
Must be called before dispatching any sheets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pool | BackendPool | The backend pool from the manager. | required |
Source code in src/marianne/daemon/baton/adapter.py
register_job
register_job(job_id, sheets, dependencies, *, max_cost_usd=None, max_retries=3, max_completion=5, escalation_enabled=False, self_healing_enabled=False, prompt_config=None, parallel_enabled=False, cross_sheet=None, pacing_seconds=0.0, live_sheets=None)
Register a job with the baton for event-driven execution.
Converts Sheet entities to SheetExecutionState and registers them with the baton's sheet registry.
Phase 2: when live_sheets is provided, uses those SheetState
objects directly instead of creating new ones. This ensures the
baton writes to the same objects that live in _live_states,
eliminating the need for a sync layer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Unique job identifier (conductor job_id). | required |
| sheets | list[Sheet] | Sheet entities from build_sheets(). | required |
| dependencies | dict[int, list[int]] | Dependency graph {sheet_num: [dep_nums]}. | required |
| max_cost_usd | float \| None | Optional per-job cost limit. | None |
| max_retries | int | Max normal retry attempts per sheet. | 3 |
| max_completion | int | Max completion mode attempts per sheet. | 5 |
| escalation_enabled | bool | Enter fermata on exhaustion. | False |
| self_healing_enabled | bool | Try self-healing on exhaustion. | False |
| prompt_config | PromptConfig \| None | Optional PromptConfig for full prompt rendering. When provided, creates a PromptRenderer for this job that handles the complete 9-layer prompt assembly pipeline. | None |
| parallel_enabled | bool | Whether parallel execution is enabled (for preamble concurrency warning). | False |
| cross_sheet | CrossSheetConfig \| None | Optional CrossSheetConfig for cross-sheet context (F-210). When provided, the adapter collects previous sheet outputs and workspace files at dispatch time. | None |
Source code in src/marianne/daemon/baton/adapter.py
deregister_job
Remove a job from the adapter and baton.
Cleans up all per-job state including active tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job to remove. | required |
Source code in src/marianne/daemon/baton/adapter.py
recover_job
recover_job(job_id, sheets, dependencies, checkpoint, *, max_cost_usd=None, max_retries=3, max_completion=5, escalation_enabled=False, self_healing_enabled=False, prompt_config=None, parallel_enabled=False, cross_sheet=None, pacing_seconds=0.0, live_sheets=None)
Recover a job from a checkpoint after conductor restart.
Rebuilds baton state from the persisted CheckpointState. Terminal sheets (completed, failed, skipped) keep their status. In-progress sheets are reset to PENDING because their musicians died when the conductor restarted. Attempt counts are preserved to avoid infinite retries.
Design invariant: Checkpoint is the source of truth. The baton rebuilds from checkpoint, not the reverse.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Unique job identifier. | required |
| sheets | list[Sheet] | Sheet entities from build_sheets(), built from the same config that produced the original job. | required |
| dependencies | dict[int, list[int]] | Dependency graph {sheet_num: [dep_nums]}. | required |
| checkpoint | CheckpointState | Persisted CheckpointState loaded from workspace. | required |
| max_cost_usd | float \| None | Optional per-job cost limit. | None |
| max_retries | int | Max normal retry attempts per sheet. | 3 |
| max_completion | int | Max completion mode attempts per sheet. | 5 |
| escalation_enabled | bool | Enter fermata on exhaustion. | False |
| self_healing_enabled | bool | Try self-healing on exhaustion. | False |
| prompt_config | PromptConfig \| None | Optional PromptConfig for prompt rendering. | None |
| parallel_enabled | bool | Whether parallel execution is enabled. | False |
| cross_sheet | CrossSheetConfig \| None | Optional CrossSheetConfig for cross-sheet context (F-210). | None |
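The recovery rule described for recover_job (terminal sheets keep their status, in-progress sheets return to PENDING, attempt counts survive) can be sketched as below. This is an illustration under assumptions: `Status`, `SheetRecord`, and `recover` are hypothetical stand-ins, and the real CheckpointState and SheetStatus types are not reproduced here.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    # Hypothetical member names; the real SheetStatus enum may differ.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


TERMINAL = {Status.COMPLETED, Status.FAILED, Status.SKIPPED}


@dataclass
class SheetRecord:
    status: Status
    attempts: int


def recover(checkpoint: dict[int, SheetRecord]) -> dict[int, SheetRecord]:
    """Rebuild per-sheet state from a persisted checkpoint."""
    recovered: dict[int, SheetRecord] = {}
    for sheet_num, record in checkpoint.items():
        # Terminal sheets keep their status; anything else goes back to
        # PENDING because its musician did not survive the restart.
        status = record.status if record.status in TERMINAL else Status.PENDING
        # Attempt counts are carried over so retry limits still apply.
        recovered[sheet_num] = SheetRecord(status=status, attempts=record.attempts)
    return recovered


persisted = {
    1: SheetRecord(Status.COMPLETED, attempts=1),
    2: SheetRecord(Status.IN_PROGRESS, attempts=2),
    3: SheetRecord(Status.FAILED, attempts=3),
}
rebuilt = recover(persisted)
```

Carrying `attempts` over is what keeps a sheet that repeatedly dies with the conductor from being retried without limit.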
Source code in src/marianne/daemon/baton/adapter.py
get_sheet
Get a Sheet entity for a registered job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job identifier. | required |
| sheet_num | int | The sheet number. | required |

Returns:
| Type | Description |
|---|---|
| Sheet \| None | The Sheet entity, or None if not found. |
Source code in src/marianne/daemon/baton/adapter.py
wait_for_completion
async
Wait until a job reaches terminal state.
Blocks until all sheets in the job are completed, failed, skipped, or cancelled. Used by the manager's _run_job_task to await baton execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job to wait for. | required |

Returns:
| Type | Description |
|---|---|
| bool | True if all sheets completed successfully, False if any failed. |

Raises:
| Type | Description |
|---|---|
| KeyError | If the job is not registered. |
Source code in src/marianne/daemon/baton/adapter.py
has_completed_sheets
Check if any sheet in the job reached COMPLETED status.
F-145: Used by the manager to set completed_new_work after baton execution. The zero-work guard for concert chaining needs to know whether any sheet completed new work — not just whether all sheets succeeded.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job to check. | required |

Returns:
| Type | Description |
|---|---|
| bool | True if at least one sheet completed, False otherwise. |
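The distinction this check draws (any sheet completed versus all sheets succeeded) can be shown with a toy helper. This is only a sketch: `any_sheet_completed` and the lowercase status strings are hypothetical, and a job's sheets are modelled as a plain dict rather than the baton's registry.

```python
def any_sheet_completed(statuses):
    """Return True if at least one sheet reached the completed status."""
    return any(status == "completed" for status in statuses.values())


# Every sheet was skipped: nothing failed, but no new work was done.
all_skipped = {1: "skipped", 2: "skipped"}

# One sheet failed, yet another one did complete new work.
mixed = {1: "completed", 2: "failed"}

no_new_work = any_sheet_completed(all_skipped)
some_new_work = any_sheet_completed(mixed)
```

The first case is the one a zero-work guard cares about: the job ended without failures, yet there is nothing new for the next score in a concert to build on.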
Source code in src/marianne/daemon/baton/adapter.py
clear_instrument_rate_limit
Clear instrument rate limit state in the baton core.
Delegates to BatonCore.clear_instrument_rate_limit(). Also
moves WAITING sheets back to PENDING so they can be re-dispatched.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instrument | str \| None | Instrument name to clear, or | None |

Returns:
| Type | Description |
|---|---|
| int | Number of instruments whose rate limit was cleared. |
Source code in src/marianne/daemon/baton/adapter.py
publish_sheet_skipped
async
Publish a sheet skip event to the EventBus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | SheetSkipped | The sheet skip event. | required |
Source code in src/marianne/daemon/baton/adapter.py
publish_job_event
async
Publish a job-level event to the EventBus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job identifier. | required |
| event_name | str | Event name (e.g., "job.started", "job.completed"). | required |
| data | dict[str, Any] \| None | Optional event data. | None |
Source code in src/marianne/daemon/baton/adapter.py
run
async
Run the baton's event loop with dispatch integration.
Processes events from the inbox, updates state, dispatches ready sheets, and publishes events to the EventBus.
Runs until the baton receives a ShutdownRequested event.
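The loop shape described above (take events from an inbox, handle them, stop on ShutdownRequested) can be sketched with an asyncio.Queue. This is a simplified illustration, not the adapter's implementation: the `ShutdownRequested` and `SheetFinished` dataclasses here are local stand-ins, `run_loop` is a hypothetical name, and dispatch and EventBus publishing are reduced to appending to a log.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class ShutdownRequested:
    """Local stand-in for the baton's shutdown event."""


@dataclass
class SheetFinished:
    """Hypothetical event carrying the sheet that finished."""
    sheet_num: int


async def run_loop(inbox: asyncio.Queue, log: list[str]) -> None:
    while True:
        event = await inbox.get()
        if isinstance(event, ShutdownRequested):
            log.append("shutdown")
            return
        # A real loop would update state, dispatch ready sheets, and
        # publish to the EventBus at this point.
        log.append(f"handled sheet {event.sheet_num}")


async def demo() -> list[str]:
    inbox: asyncio.Queue = asyncio.Queue()
    log: list[str] = []
    for event in (SheetFinished(1), SheetFinished(2), ShutdownRequested()):
        inbox.put_nowait(event)
    await run_loop(inbox, log)
    return log


result = asyncio.run(demo())
```

Because the loop only returns on the shutdown event, a caller awaiting it blocks for the lifetime of the conductor, which matches the "blocks until shutdown" note in the usage example.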
Source code in src/marianne/daemon/baton/adapter.py
shutdown
async
Gracefully shut down the adapter.
Cancels all active musician tasks and closes the backend pool.
Source code in src/marianne/daemon/baton/adapter.py
Functions
baton_to_checkpoint_status
Identity mapping — Phase 2 unified the enums.
Kept for backward compatibility with tests that import this function. Since BatonSheetStatus IS SheetStatus, this is just status.value.
Source code in src/marianne/daemon/baton/adapter.py
checkpoint_to_baton_status
Reconstruct SheetStatus from string — Phase 2 unified the enums.
Kept for backward compatibility with tests that import this function.
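What "identity mapping" means for these two helpers can be sketched as follows. The enum members shown are placeholders rather than the real SheetStatus definition; only the shape (one enum under two names, `status.value` in one direction and lookup by value in the other) is taken from the descriptions above.

```python
from enum import Enum


class SheetStatus(Enum):
    # Placeholder members; the real enum's names and values are not shown here.
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


# After Phase 2 the baton's status type is the same object as the checkpoint's.
BatonSheetStatus = SheetStatus


def baton_to_checkpoint_status(status: SheetStatus) -> str:
    # Nothing to translate: the checkpoint string is the enum value.
    return status.value


def checkpoint_to_baton_status(value: str) -> SheetStatus:
    # Enum lookup by value; raises ValueError for an unknown string.
    return SheetStatus(value)


round_trip = checkpoint_to_baton_status(
    baton_to_checkpoint_status(BatonSheetStatus.COMPLETED)
)
```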
Source code in src/marianne/daemon/baton/adapter.py
attempt_result_to_observer_event
Convert a SheetAttemptResult to the ObserverEvent format.
Maps baton musician results to the event names the EventBus subscribers expect (dashboard, learning hub, notifications).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| result | SheetAttemptResult | The musician's execution report. | required |

Returns:
| Type | Description |
|---|---|
| ObserverEvent | Dict matching the ObserverEvent TypedDict shape. |
Source code in src/marianne/daemon/baton/adapter.py
skipped_to_observer_event
Convert a SheetSkipped event to ObserverEvent format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | SheetSkipped | The sheet skip event. | required |

Returns:
| Type | Description |
|---|---|
| ObserverEvent | Dict matching the ObserverEvent TypedDict shape. |
Source code in src/marianne/daemon/baton/adapter.py
sheets_to_execution_states
Convert Sheet entities to SheetExecutionState dict for baton registration.
Each Sheet becomes a SheetExecutionState with the baton's extended tracking fields. The sheet_num is the dict key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sheets | list[Sheet] | List of Sheet entities from build_sheets(). | required |
| max_retries | int | Maximum normal retry attempts per sheet. | 3 |
| max_completion | int | Maximum completion mode attempts per sheet. | 5 |

Returns:
| Type | Description |
|---|---|
| dict[int, SheetExecutionState] | Dict of sheet_num → SheetExecutionState. |
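The conversion amounts to building a dict keyed by sheet number. The sketch below shows that shape only: `ToySheet`, `ToyExecutionState`, their fields, and `to_execution_states` are hypothetical, and the real Sheet and SheetExecutionState classes carry more data than this.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToySheet:
    """Hypothetical stand-in for a Sheet entity."""
    num: int


@dataclass
class ToyExecutionState:
    """Hypothetical stand-in for SheetExecutionState."""
    sheet_num: int
    max_retries: int
    max_completion: int
    normal_attempts: int = 0
    completion_attempts: int = 0


def to_execution_states(
    sheets: list[ToySheet],
    max_retries: int = 3,
    max_completion: int = 5,
) -> dict[int, ToyExecutionState]:
    # One tracking record per sheet, keyed by its sheet number.
    return {
        sheet.num: ToyExecutionState(sheet.num, max_retries, max_completion)
        for sheet in sheets
    }


states = to_execution_states([ToySheet(1), ToySheet(2)], max_retries=2)
```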
Source code in src/marianne/daemon/baton/adapter.py
extract_dependencies
Extract baton-compatible dependency graph from a JobConfig.
The baton expects: {sheet_num: [dep_sheet_num, ...]}
When config.sheet.dependencies is set (non-empty), it is used
as the authoritative DAG. Stage-level dependencies are expanded to
sheet-level: if stage S has a fan-out of 3 (sheets 4,5,6) and
depends on stage T (sheets 1,2,3), each of 4/5/6 depends on all of
1/2/3. Stages not listed in the dependencies map are treated as
having no dependencies (independent).
When config.sheet.dependencies is empty or absent, falls back to
the legacy linear chain: all sheets in stage N+1 depend on all sheets
in stage N.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | Any | Parsed JobConfig with sheet.get_fan_out_metadata(). | required |

Returns:
| Type | Description |
|---|---|
| dict[int, list[int]] | Dict of sheet_num → list of dependency sheet_nums. |
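The two expansion rules (explicit stage-level DAG, else a linear chain) can be sketched on plain dicts. This is an illustration under assumptions, not the real function: `expand_stage_dependencies`, `stage_sheets`, and `stage_deps` are hypothetical names, the real function reads these from a JobConfig, and whether independent sheets appear with an empty list or are omitted from the result is assumed here.

```python
from __future__ import annotations


def expand_stage_dependencies(
    stage_sheets: dict[int, list[int]],
    stage_deps: dict[int, list[int]] | None = None,
) -> dict[int, list[int]]:
    """Expand stage-level dependencies into {sheet_num: [dep_sheet_num, ...]}."""
    graph: dict[int, list[int]] = {}
    stages = sorted(stage_sheets)

    if stage_deps:
        # Explicit DAG: each sheet of a stage depends on every sheet of each
        # stage it lists. Stages that are not listed get no dependencies.
        for stage in stages:
            deps = [
                sheet
                for dep_stage in stage_deps.get(stage, [])
                for sheet in stage_sheets[dep_stage]
            ]
            for sheet in stage_sheets[stage]:
                graph[sheet] = list(deps)
        return graph

    # Legacy fallback: every sheet in stage N+1 depends on all sheets in stage N.
    previous: list[int] = []
    for stage in stages:
        for sheet in stage_sheets[stage]:
            graph[sheet] = list(previous)
        previous = stage_sheets[stage]
    return graph


# Stage 1 fans out to sheets 1-3, stage 2 to sheets 4-6, stage 3 is sheet 7.
stage_sheets = {1: [1, 2, 3], 2: [4, 5, 6], 3: [7]}
explicit = expand_stage_dependencies(stage_sheets, {2: [1]})
linear = expand_stage_dependencies(stage_sheets)
```

In the explicit case stage 3 is not listed, so sheet 7 is independent; in the fallback case it waits on sheets 4, 5, and 6.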
Source code in src/marianne/daemon/baton/adapter.py