a2a
Agent-to-Agent (A2A) protocol implementation.
Provides agent card registration, task inbox persistence, and event handling for inter-agent task delegation mediated by the conductor.
Components:
- AgentCardRegistry — tracks which agents are running and what they can do
- A2AInbox — per-job persistent task queue for incoming A2A tasks
The conductor routes A2A events through the baton's event bus. Tasks persist across sheet boundaries: when an agent sends a task, the conductor persists it in the target's inbox, and the target picks it up on its next A2A-enabled sheet.
Classes
A2AInbox
Per-job task inbox for A2A protocol.
The conductor maintains one inbox per running job. Tasks are added when other agents submit work, and consumed when the owning agent's A2A-enabled sheets execute.
Serialization: to_dict() / from_dict() for atomic persistence with job state. The inbox is saved alongside CheckpointState, with the same atomicity guarantees.
Usage::

    inbox = A2AInbox(job_id="j1", agent_name="canyon")

    # Route a task
    task = inbox.submit_task(
        source_job_id="j2",
        source_agent="forge",
        description="Review architecture for module X",
    )

    # Inject pending tasks into sheet context
    context_text = inbox.render_pending_context()

    # Mark tasks as accepted when injected
    inbox.mark_accepted(task.task_id)

    # Complete a task with results
    inbox.complete_task(task.task_id, artifacts={"review": "..."})

Source code in src/marianne/daemon/a2a/inbox.py
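The task lifecycle described above can be sketched with a small stand-in class. Everything in this block is hypothetical and is not the Marianne implementation: `SketchInbox`, `SketchTask`, and `SketchStatus` are illustrative names, only the PENDING status is named in this reference (the other status names are assumptions), and the uuid-based task ID is one possible way to get a unique ID.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class SketchStatus(Enum):
    # Only PENDING appears in the reference; the other names are assumed.
    PENDING = "pending"
    ACCEPTED = "accepted"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SketchTask:
    task_id: str
    source_job_id: str
    source_agent: str
    description: str
    status: SketchStatus = SketchStatus.PENDING
    artifacts: dict[str, Any] = field(default_factory=dict)


class SketchInbox:
    """Hypothetical stand-in that mirrors the documented A2AInbox calls."""

    def __init__(self, job_id: str, agent_name: str) -> None:
        self.job_id = job_id
        self.agent_name = agent_name
        self._tasks: dict[str, SketchTask] = {}

    def submit_task(
        self, source_job_id: str, source_agent: str, description: str
    ) -> SketchTask:
        # Assumed ID scheme: a random hex string.
        task = SketchTask(uuid.uuid4().hex, source_job_id, source_agent, description)
        self._tasks[task.task_id] = task
        return task

    def get_pending_tasks(self) -> list[SketchTask]:
        return [t for t in self._tasks.values() if t.status is SketchStatus.PENDING]

    def mark_accepted(self, task_id: str) -> bool:
        # Returns False for unknown tasks and for tasks that are not pending.
        task = self._tasks.get(task_id)
        if task is None or task.status is not SketchStatus.PENDING:
            return False
        task.status = SketchStatus.ACCEPTED
        return True

    def complete_task(
        self, task_id: str, artifacts: dict[str, Any] | None = None
    ) -> bool:
        task = self._tasks.get(task_id)
        if task is None:
            return False
        task.status = SketchStatus.COMPLETED
        task.artifacts = dict(artifacts or {})
        return True


inbox = SketchInbox(job_id="j1", agent_name="canyon")
task = inbox.submit_task("j2", "forge", "Review architecture for module X")
pending_before = len(inbox.get_pending_tasks())
accepted = inbox.mark_accepted(task.task_id)
pending_after = len(inbox.get_pending_tasks())
completed = inbox.complete_task(task.task_id, artifacts={"review": "..."})
```

In this sketch an accepted task no longer shows up in `get_pending_tasks()`, which is what keeps a task from being injected into more than one sheet.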
Attributes
Functions
submit_task
Add a new task to the inbox.
Called by the conductor when routing an A2ATaskSubmitted event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_job_id | str | Job ID of the requesting agent. | required |
| source_agent | str | Name of the requesting agent. | required |
| description | str | What needs to be done. | required |
| context | dict[str, Any] \| None | Optional additional context. | None |
Returns:
| Type | Description |
|---|---|
| A2ATask | The created task with a unique ID. |
Source code in src/marianne/daemon/a2a/inbox.py
get_task
get_pending_tasks
Get all tasks in PENDING status.
Used to inject pending work into the agent's next sheet.
Source code in src/marianne/daemon/a2a/inbox.py
mark_accepted
Mark a task as accepted (injected into a sheet).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_id | str | The task to accept. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the task was found and transitioned, False otherwise. |
Source code in src/marianne/daemon/a2a/inbox.py
complete_task
Mark a task as completed with optional artifacts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_id | str | The task to complete. | required |
| artifacts | dict[str, Any] \| None | Output artifacts from the completed work. | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the task was found and completed, False otherwise. |
Source code in src/marianne/daemon/a2a/inbox.py
fail_task
Mark a task as failed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_id | str | The task that failed. | required |
| reason | str | Why the task could not be fulfilled. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the task was found and failed, False otherwise. |
Source code in src/marianne/daemon/a2a/inbox.py
render_pending_context
Render pending tasks as markdown context for sheet injection.
Produces a section that the musician reads to understand incoming A2A tasks. Injected as cadenza context on A2A-enabled sheets.
Returns:
| Type | Description |
|---|---|
| str | Markdown string, or empty string if no pending tasks. |
Source code in src/marianne/daemon/a2a/inbox.py
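As an illustration of the contract above (markdown when tasks are pending, an empty string otherwise), here is a minimal sketch. The `PendingTask` dataclass, the heading text, and the bullet layout are all assumptions; the actual markdown produced by the real method is not shown in this reference.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class PendingTask:
    # Hypothetical minimal task shape used only for this sketch.
    task_id: str
    source_agent: str
    description: str


def render_pending_context(tasks: list[PendingTask]) -> str:
    """Return a markdown section listing tasks, or "" when there are none."""
    if not tasks:
        return ""
    lines = ["## Incoming A2A tasks", ""]  # assumed heading text
    for task in tasks:
        lines.append(
            f"- **{task.task_id}** (from {task.source_agent}): {task.description}"
        )
    return "\n".join(lines)


empty = render_pending_context([])
rendered = render_pending_context(
    [PendingTask("t1", "forge", "Review architecture for module X")]
)
```

Returning an empty string for an empty inbox lets the caller skip injection with a simple truthiness check.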
to_dict
Serialize for atomic persistence with job state.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict representation suitable for JSON serialization. |
Source code in src/marianne/daemon/a2a/inbox.py
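This reference does not say how the atomic write is performed. One common technique is to write to a temporary file and rename it over the target, sketched below. The helper `save_state_atomically`, the file name, and the `checkpoint` / `a2a_inbox` layout are assumptions made for illustration, not the project's actual persistence code.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def save_state_atomically(path: Path, state: dict[str, Any]) -> None:
    """Write JSON so readers see the old file or the new one, never a partial write."""
    # The temp file lives in the target directory so the rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(state, handle)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, path)  # atomic rename over the target
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as workdir:
    state_path = Path(workdir) / "job-j1.json"
    state = {
        "checkpoint": {"sheet": 3},  # placeholder for checkpoint data
        "a2a_inbox": {"job_id": "j1", "agent_name": "canyon", "tasks": []},
    }
    save_state_atomically(state_path, state)
    loaded = json.loads(state_path.read_text(encoding="utf-8"))
    leftover_tmp = [p.name for p in Path(workdir).iterdir() if p.suffix == ".tmp"]
```

Keeping the inbox dict and the checkpoint data in one document means a single rename covers both, which is one way to get the shared atomicity the class description mentions.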
from_dict
classmethod
Restore from serialized state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Dict holding the serialized inbox state. | required |
Returns:
| Type | Description |
|---|---|
| A2AInbox | Reconstructed inbox with all tasks. |
Source code in src/marianne/daemon/a2a/inbox.py
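A serialization round trip of this kind can be sketched as follows. `RoundTripInbox` and `StoredTask` are hypothetical stand-ins with an assumed field layout; the real dict schema is not documented here.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class StoredTask:
    # Assumed minimal task fields for the sketch.
    task_id: str
    description: str
    status: str = "pending"


@dataclass
class RoundTripInbox:
    job_id: str
    agent_name: str
    tasks: list[StoredTask] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        # asdict() recurses into nested dataclasses, giving JSON-friendly data.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> RoundTripInbox:
        tasks = [StoredTask(**item) for item in data.get("tasks", [])]
        return cls(job_id=data["job_id"], agent_name=data["agent_name"], tasks=tasks)


original = RoundTripInbox(
    "j1", "canyon", [StoredTask("t1", "Review architecture for module X")]
)
payload = json.dumps(original.to_dict())
restored = RoundTripInbox.from_dict(json.loads(payload))
```

Passing the dict through `json.dumps` and `json.loads` in the sketch checks that nothing in it depends on Python-only types.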
AgentCardRegistry
In-memory registry of agent cards for running jobs.
Safe for use from a single asyncio event loop. No locking is needed because the baton's event loop is the sole writer and the query methods are read-only.
Usage::

    registry = AgentCardRegistry()

    # On job start
    card = AgentCard(name="canyon", description="...", skills=[...])
    registry.register("job-123", card)

    # Discovery
    agents = registry.query()  # all running agents
    architects = registry.query_by_skill("architecture-review")

    # On job end
    registry.deregister("job-123")

Source code in src/marianne/daemon/a2a/registry.py
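The reason a lock-free registry works on a single event loop can be shown with a plain dict. This is a sketch under assumptions: the module-level `registry` dict and `register_job` coroutine are hypothetical and stand in for the real class.

```python
from __future__ import annotations

import asyncio

# Stand-in for the registry: job_id -> agent name.
registry: dict[str, str] = {}


async def register_job(job_id: str, agent_name: str) -> None:
    await asyncio.sleep(0)  # yield control, as real job startup would
    # The assignment contains no await, so no other coroutine on this
    # loop can run in the middle of it.
    registry[job_id] = agent_name


async def main() -> int:
    await asyncio.gather(
        *(register_job(f"job-{i}", f"agent-{i}") for i in range(100))
    )
    return len(registry)


registered = asyncio.run(main())
```

Coroutines on one loop only switch at `await` points, so a mutation with no `await` inside it cannot interleave with another. That guarantee does not extend to access from other OS threads.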
Attributes
Functions
register
Register an agent card for a running job.
If the job_id is already registered, the card is replaced. If an agent with the same name is registered under a different job, the old registration is removed (agent name must be unique).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job this agent card belongs to. | required |
| card | AgentCard | The agent's identity card. | required |
Source code in src/marianne/daemon/a2a/registry.py
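The two replacement rules above (same job replaces the card, same agent name evicts the older job) can be sketched like this. `SketchRegistry` and `SketchCard` are hypothetical stand-ins, and the linear scan for stale entries is just one way to enforce name uniqueness.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SketchCard:
    # Hypothetical card shape; the real AgentCard has more fields.
    name: str
    skills: tuple[str, ...] = ()


class SketchRegistry:
    def __init__(self) -> None:
        self._cards: dict[str, SketchCard] = {}

    def register(self, job_id: str, card: SketchCard) -> None:
        # Evict any other job that currently holds this agent name.
        stale = [
            jid
            for jid, existing in self._cards.items()
            if existing.name == card.name and jid != job_id
        ]
        for jid in stale:
            del self._cards[jid]
        # Registering an existing job_id simply overwrites its card.
        self._cards[job_id] = card

    def get(self, job_id: str) -> SketchCard | None:
        return self._cards.get(job_id)


registry = SketchRegistry()
registry.register("job-1", SketchCard("canyon"))
registry.register("job-1", SketchCard("forge"))  # same job: card replaced
registry.register("job-2", SketchCard("forge"))  # same name, new job: job-1 evicted
```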
deregister
Remove an agent card when a job completes or is cancelled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job to deregister. | required |
Returns:
| Type | Description |
|---|---|
| AgentCard \| None | The removed card, or None if the job wasn't registered. |
Source code in src/marianne/daemon/a2a/registry.py
get
Get the agent card for a specific job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job to look up. | required |
Returns:
| Type | Description |
|---|---|
| AgentCard \| None | The agent card, or None if not registered. |
Source code in src/marianne/daemon/a2a/registry.py
get_job_id_for_agent
Resolve an agent name to its job_id.
Used by the conductor to route A2A tasks — when an agent sends a task to "canyon", this resolves to canyon's job_id so the task can be persisted in the correct inbox.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_name | str | The target agent's name. | required |
Returns:
| Type | Description |
|---|---|
| str \| None | The job_id, or None if the agent isn't running. |
Source code in src/marianne/daemon/a2a/registry.py
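The routing step described above (agent name to job_id to inbox) can be sketched with plain dicts. The `route_task` function and both dicts are hypothetical. Returning False when the target is not running is an assumption, since this reference does not say what the conductor does in that case.

```python
from __future__ import annotations


def route_task(
    name_to_job: dict[str, str],
    inboxes: dict[str, list[str]],
    target_agent: str,
    description: str,
) -> bool:
    """Resolve the target's job_id and append the task to that job's inbox."""
    job_id = name_to_job.get(target_agent)
    if job_id is None:
        return False  # assumed behavior: target agent is not running
    inboxes.setdefault(job_id, []).append(description)
    return True


name_to_job = {"canyon": "job-123"}
inboxes: dict[str, list[str]] = {}
delivered = route_task(
    name_to_job, inboxes, "canyon", "Review architecture for module X"
)
undelivered = route_task(name_to_job, inboxes, "ghost", "Nobody is listening")
```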
query
List all registered agent cards.
Returns:
| Type | Description |
|---|---|
| list[AgentCard] | List of all currently registered cards (snapshot). |
query_by_skill
Find agents that offer a specific skill.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| skill_id | str | The skill identifier to search for. | required |
Returns:
| Type | Description |
|---|---|
| list[AgentCard] | List of agent cards that declare the given skill. |
Source code in src/marianne/daemon/a2a/registry.py
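A skill lookup of this kind amounts to a filter over the registered cards. The sketch below assumes skills are plain string identifiers stored on a hypothetical `SkillCard`; the real AgentCard may model skills as richer objects.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SkillCard:
    # Hypothetical card shape with skills as plain string IDs.
    name: str
    skills: tuple[str, ...]


def query_by_skill(cards: list[SkillCard], skill_id: str) -> list[SkillCard]:
    """Return the cards that declare skill_id, in registration order."""
    return [card for card in cards if skill_id in card.skills]


cards = [
    SkillCard("canyon", ("architecture-review",)),
    SkillCard("forge", ("code-generation",)),
]
architects = query_by_skill(cards, "architecture-review")
nobody = query_by_skill(cards, "translation")
```

An unknown skill yields an empty list in this sketch, so callers can iterate over the result without a None check.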
clear
Remove all registrations. Used on conductor shutdown.