Index

a2a

Agent-to-Agent (A2A) protocol implementation.

Provides agent card registration, task inbox persistence, and event handling for inter-agent task delegation mediated by the conductor.

Components:

- AgentCardRegistry: tracks which agents are running and what they can do
- A2AInbox: per-job persistent task queue for incoming A2A tasks

The conductor routes A2A events through the baton's event bus. Tasks persist across sheet boundaries: when an agent sends a task, the conductor persists it in the target's inbox, and the target picks it up on its next A2A-enabled sheet.

Classes

A2AInbox

A2AInbox(*, job_id, agent_name)

Per-job task inbox for A2A protocol.

The conductor maintains one inbox per running job. Tasks are added when other agents submit work, and consumed when the owning agent's A2A-enabled sheets execute.

Serialization: to_dict() and from_dict() support atomic persistence with job state. The inbox is saved alongside CheckpointState, so it shares the same atomicity guarantees.

Usage::

inbox = A2AInbox(job_id="j1", agent_name="canyon")

# Route a task
task = inbox.submit_task(
    source_job_id="j2",
    source_agent="forge",
    description="Review architecture for module X",
)

# Inject pending tasks into sheet context
context_text = inbox.render_pending_context()

# Mark tasks as accepted when injected
inbox.mark_accepted(task.task_id)

# Complete a task with results
inbox.complete_task(task.task_id, artifacts={"review": "..."})
Source code in src/marianne/daemon/a2a/inbox.py
def __init__(self, *, job_id: str, agent_name: str) -> None:
    if not job_id:
        raise ValueError("job_id must not be empty")
    if not agent_name:
        raise ValueError("agent_name must not be empty")

    self._job_id = job_id
    self._agent_name = agent_name
    self._tasks: dict[str, A2ATask] = {}
Attributes
job_id property
job_id

The job this inbox belongs to.

agent_name property
agent_name

The agent this inbox belongs to.

task_count property
task_count

Total number of tasks in the inbox.

pending_count property
pending_count

Number of pending tasks waiting for the agent.

Functions
submit_task
submit_task(*, source_job_id, source_agent, description, context=None)

Add a new task to the inbox.

Called by the conductor when routing an A2ATaskSubmitted event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source_job_id | `str` | Job ID of the requesting agent. | required |
| source_agent | `str` | Name of the requesting agent. | required |
| description | `str` | What needs to be done. | required |
| context | `dict[str, Any] \| None` | Optional additional context. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `A2ATask` | The created task with a unique ID. |

Source code in src/marianne/daemon/a2a/inbox.py
def submit_task(
    self,
    *,
    source_job_id: str,
    source_agent: str,
    description: str,
    context: dict[str, Any] | None = None,
) -> A2ATask:
    """Add a new task to the inbox.

    Called by the conductor when routing an A2ATaskSubmitted event.

    Args:
        source_job_id: Job ID of the requesting agent.
        source_agent: Name of the requesting agent.
        description: What needs to be done.
        context: Optional additional context.

    Returns:
        The created task with a unique ID.
    """
    task_id = str(uuid.uuid4())
    task = A2ATask(
        task_id=task_id,
        source_job_id=source_job_id,
        source_agent=source_agent,
        target_agent=self._agent_name,
        description=description,
        context=context or {},
    )
    self._tasks[task_id] = task

    _logger.info(
        "a2a.inbox.task_submitted",
        extra={
            "job_id": self._job_id,
            "task_id": task_id,
            "source_agent": source_agent,
            "target_agent": self._agent_name,
        },
    )

    return task
get_task
get_task(task_id)

Get a specific task by ID.

Source code in src/marianne/daemon/a2a/inbox.py
def get_task(self, task_id: str) -> A2ATask | None:
    """Get a specific task by ID."""
    return self._tasks.get(task_id)
get_pending_tasks
get_pending_tasks()

Get all tasks in PENDING status.

Used to inject pending work into the agent's next sheet.

Source code in src/marianne/daemon/a2a/inbox.py
def get_pending_tasks(self) -> list[A2ATask]:
    """Get all tasks in PENDING status.

    Used to inject pending work into the agent's next sheet.
    """
    return [
        t for t in self._tasks.values()
        if t.status == A2ATaskStatus.PENDING
    ]
mark_accepted
mark_accepted(task_id)

Mark a task as accepted (injected into a sheet).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task_id | `str` | The task to accept. | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the task was found and transitioned, False otherwise. |

Source code in src/marianne/daemon/a2a/inbox.py
def mark_accepted(self, task_id: str) -> bool:
    """Mark a task as accepted (injected into a sheet).

    Args:
        task_id: The task to accept.

    Returns:
        True if the task was found and transitioned, False otherwise.
    """
    task = self._tasks.get(task_id)
    if task is None or task.status != A2ATaskStatus.PENDING:
        return False

    # Store an updated copy in place of the old entry; model_copy()
    # returns a new object and leaves the original task untouched.
    self._tasks[task_id] = task.model_copy(
        update={"status": A2ATaskStatus.ACCEPTED}
    )

    _logger.debug(
        "a2a.inbox.task_accepted",
        extra={
            "job_id": self._job_id,
            "task_id": task_id,
            "agent_name": self._agent_name,
        },
    )
    return True
complete_task
complete_task(task_id, *, artifacts=None)

Mark a task as completed with optional artifacts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task_id | `str` | The task to complete. | required |
| artifacts | `dict[str, Any] \| None` | Output artifacts from the completed work. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the task was found and completed, False otherwise. |

Source code in src/marianne/daemon/a2a/inbox.py
def complete_task(
    self,
    task_id: str,
    *,
    artifacts: dict[str, Any] | None = None,
) -> bool:
    """Mark a task as completed with optional artifacts.

    Args:
        task_id: The task to complete.
        artifacts: Output artifacts from the completed work.

    Returns:
        True if the task was found and completed, False otherwise.
    """
    task = self._tasks.get(task_id)
    if task is None:
        return False
    if task.status not in (A2ATaskStatus.PENDING, A2ATaskStatus.ACCEPTED):
        return False

    self._tasks[task_id] = task.model_copy(
        update={
            "status": A2ATaskStatus.COMPLETED,
            "artifacts": artifacts or {},
        }
    )

    _logger.info(
        "a2a.inbox.task_completed",
        extra={
            "job_id": self._job_id,
            "task_id": task_id,
            "agent_name": self._agent_name,
            "has_artifacts": bool(artifacts),
        },
    )
    return True
fail_task
fail_task(task_id, *, reason)

Mark a task as failed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task_id | `str` | The task that failed. | required |
| reason | `str` | Why the task could not be fulfilled. | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the task was found and failed, False otherwise. |

Source code in src/marianne/daemon/a2a/inbox.py
def fail_task(self, task_id: str, *, reason: str) -> bool:
    """Mark a task as failed.

    Args:
        task_id: The task that failed.
        reason: Why the task could not be fulfilled.

    Returns:
        True if the task was found and failed, False otherwise.
    """
    task = self._tasks.get(task_id)
    if task is None:
        return False
    if task.status not in (A2ATaskStatus.PENDING, A2ATaskStatus.ACCEPTED):
        return False

    self._tasks[task_id] = task.model_copy(
        update={
            "status": A2ATaskStatus.FAILED,
            "failure_reason": reason,
        }
    )

    _logger.info(
        "a2a.inbox.task_failed",
        extra={
            "job_id": self._job_id,
            "task_id": task_id,
            "agent_name": self._agent_name,
            "reason": reason,
        },
    )
    return True
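
The guards in mark_accepted, complete_task, and fail_task together define a small state machine. The sketch below restates it as a transition table. `Status` is a hypothetical mirror of A2ATaskStatus: the four member names appear in the source above, but the string values and the assumption that no other members exist are illustrative only.

```python
from enum import Enum


class Status(Enum):
    # Hypothetical mirror of A2ATaskStatus; string values are assumptions.
    PENDING = "pending"
    ACCEPTED = "accepted"
    COMPLETED = "completed"
    FAILED = "failed"


# Transitions permitted by the status checks in the three methods above.
ALLOWED = {
    Status.PENDING: {Status.ACCEPTED, Status.COMPLETED, Status.FAILED},
    Status.ACCEPTED: {Status.COMPLETED, Status.FAILED},
    Status.COMPLETED: set(),  # terminal
    Status.FAILED: set(),     # terminal
}


def can_transition(current: Status, target: Status) -> bool:
    """Return True if the inbox methods would allow current -> target."""
    return target in ALLOWED[current]
```

Two consequences follow from the table: a task can be completed or failed without ever being accepted, and calling any of the three methods on a finished task returns False rather than raising.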
render_pending_context
render_pending_context()

Render pending tasks as markdown context for sheet injection.

Produces a section that the musician reads to understand incoming A2A tasks. Injected as cadenza context on A2A-enabled sheets.

Returns:

| Type | Description |
| --- | --- |
| `str` | Markdown string, or empty string if no pending tasks. |

Source code in src/marianne/daemon/a2a/inbox.py
def render_pending_context(self) -> str:
    """Render pending tasks as markdown context for sheet injection.

    Produces a section that the musician reads to understand
    incoming A2A tasks. Injected as cadenza context on A2A-enabled
    sheets.

    Returns:
        Markdown string, or empty string if no pending tasks.
    """
    pending = self.get_pending_tasks()
    if not pending:
        return ""

    lines = [
        "## A2A Inbox — Pending Tasks",
        "",
        f"You have {len(pending)} task(s) from other agents:",
        "",
    ]

    for i, task in enumerate(pending, 1):
        lines.append(f"### Task {i}: from {task.source_agent}")
        lines.append(f"**Task ID:** `{task.task_id}`")
        lines.append(f"**Description:** {task.description}")
        if task.context:
            lines.append("**Context:**")
            for key, value in task.context.items():
                lines.append(f"  - {key}: {value}")
        lines.append("")

    lines.append(
        "To complete a task, include its task_id in your output "
        "with the results. To decline, explain why."
    )

    return "\n".join(lines)
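
Because render_pending_context only includes PENDING tasks, the order of operations at injection time matters: render first, then mark the rendered tasks as accepted. A minimal sketch of that sequence follows. `FakeTask`, `FakeInbox`, and `inject_pending` are hypothetical names; the fake implements only the three inbox methods used here, with a simplified rendering, and is not the real A2AInbox.

```python
from dataclasses import dataclass, field


@dataclass
class FakeTask:
    task_id: str
    source_agent: str
    description: str
    status: str = "pending"


@dataclass
class FakeInbox:
    """Hypothetical stand-in for the three A2AInbox methods used below."""

    tasks: dict[str, FakeTask] = field(default_factory=dict)

    def get_pending_tasks(self) -> list[FakeTask]:
        return [t for t in self.tasks.values() if t.status == "pending"]

    def render_pending_context(self) -> str:
        pending = self.get_pending_tasks()
        if not pending:
            return ""
        lines = [f"You have {len(pending)} task(s) from other agents:"]
        lines += [
            f"- {t.task_id}: {t.description} (from {t.source_agent})"
            for t in pending
        ]
        return "\n".join(lines)

    def mark_accepted(self, task_id: str) -> bool:
        task = self.tasks.get(task_id)
        if task is None or task.status != "pending":
            return False
        task.status = "accepted"
        return True


def inject_pending(inbox) -> tuple[str, list[str]]:
    """Render pending tasks, then mark exactly those tasks as accepted."""
    pending_ids = [t.task_id for t in inbox.get_pending_tasks()]
    context_text = inbox.render_pending_context()  # render before state changes
    accepted = [tid for tid in pending_ids if inbox.mark_accepted(tid)]
    return context_text, accepted


inbox = FakeInbox()
inbox.tasks["t1"] = FakeTask("t1", "forge", "Review architecture for module X")
text, accepted_ids = inject_pending(inbox)
```

After the call, a second render returns an empty string, so the same task is not injected into a later sheet.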
to_dict
to_dict()

Serialize for atomic persistence with job state.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dict representation suitable for JSON serialization. |

Source code in src/marianne/daemon/a2a/inbox.py
def to_dict(self) -> dict[str, Any]:
    """Serialize for atomic persistence with job state.

    Returns:
        Dict representation suitable for JSON serialization.
    """
    return {
        "job_id": self._job_id,
        "agent_name": self._agent_name,
        "tasks": {
            tid: task.model_dump(mode="json")
            for tid, task in self._tasks.items()
        },
    }
from_dict classmethod
from_dict(data)

Restore from serialized state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | `dict[str, Any]` | Dict from to_dict(). | required |

Returns:

| Type | Description |
| --- | --- |
| `A2AInbox` | Reconstructed inbox with all tasks. |

Source code in src/marianne/daemon/a2a/inbox.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> A2AInbox:
    """Restore from serialized state.

    Args:
        data: Dict from ``to_dict()``.

    Returns:
        Reconstructed inbox with all tasks.
    """
    inbox = cls(
        job_id=data["job_id"],
        agent_name=data["agent_name"],
    )
    for tid, task_data in data.get("tasks", {}).items():
        inbox._tasks[tid] = A2ATask.model_validate(task_data)
    return inbox
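
The to_dict() / from_dict() pair is meant to survive a JSON round trip. The sketch below illustrates that property with a reduced stand-in built on dataclasses. `TaskRecord` and `InboxSketch` are hypothetical; the real classes use Pydantic's model_dump and model_validate and carry more fields.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class TaskRecord:
    # Hypothetical, reduced stand-in for A2ATask.
    task_id: str
    description: str
    status: str = "pending"


@dataclass
class InboxSketch:
    job_id: str
    agent_name: str
    tasks: dict[str, TaskRecord] = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {
            "job_id": self.job_id,
            "agent_name": self.agent_name,
            "tasks": {tid: asdict(t) for tid, t in self.tasks.items()},
        }

    @classmethod
    def from_dict(cls, data: dict) -> "InboxSketch":
        inbox = cls(job_id=data["job_id"], agent_name=data["agent_name"])
        # A missing "tasks" key yields an empty inbox, as in the source above.
        for tid, raw in data.get("tasks", {}).items():
            inbox.tasks[tid] = TaskRecord(**raw)
        return inbox


original = InboxSketch(job_id="j1", agent_name="canyon")
original.tasks["t1"] = TaskRecord("t1", "Review architecture for module X")

payload = json.dumps(original.to_dict())  # what would be stored with job state
restored = InboxSketch.from_dict(json.loads(payload))
```

Here `restored` compares equal to `original`, which is the invariant a persistence test for the real inbox would likely want to check as well.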

AgentCardRegistry

AgentCardRegistry()

In-memory registry of agent cards for running jobs.

Designed for single-threaded asyncio use, so no locking is needed. The baton's event loop is the sole writer; query methods are read-only.

Usage::

registry = AgentCardRegistry()

# On job start
card = AgentCard(name="canyon", description="...", skills=[...])
registry.register("job-123", card)

# Discovery
agents = registry.query()  # all running agents
architects = registry.query_by_skill("architecture-review")

# On job end
registry.deregister("job-123")
Source code in src/marianne/daemon/a2a/registry.py
def __init__(self) -> None:
    # job_id → AgentCard
    self._cards: dict[str, AgentCard] = {}
    # agent_name → job_id (reverse index for routing)
    self._name_to_job: dict[str, str] = {}
Attributes
count property
count

Number of registered agent cards.

Functions
register
register(job_id, card)

Register an agent card for a running job.

If the job_id is already registered, the card is replaced. If an agent with the same name is registered under a different job, the old registration is removed (agent name must be unique).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_id | `str` | The job this agent card belongs to. | required |
| card | `AgentCard` | The agent's identity card. | required |
Source code in src/marianne/daemon/a2a/registry.py
def register(self, job_id: str, card: AgentCard) -> None:
    """Register an agent card for a running job.

    If the job_id is already registered, the card is replaced.
    If an agent with the same name is registered under a different
    job, the old registration is removed (agent name must be unique).

    Args:
        job_id: The job this agent card belongs to.
        card: The agent's identity card.
    """
    if not job_id:
        raise ValueError("job_id must not be empty")
    if not card.name:
        raise ValueError("agent card name must not be empty")

    # If this agent name is already registered under a different job,
    # remove the stale entry first (name uniqueness invariant).
    existing_job = self._name_to_job.get(card.name)
    if existing_job is not None and existing_job != job_id:
        _logger.warning(
            "a2a.registry.name_conflict",
            extra={
                "agent_name": card.name,
                "old_job_id": existing_job,
                "new_job_id": job_id,
            },
        )
        self._cards.pop(existing_job, None)

    # If this job_id had a different card, clean up the old name index
    old_card = self._cards.get(job_id)
    if old_card is not None and old_card.name != card.name:
        self._name_to_job.pop(old_card.name, None)

    self._cards[job_id] = card
    self._name_to_job[card.name] = job_id

    _logger.info(
        "a2a.registry.registered",
        extra={
            "job_id": job_id,
            "agent_name": card.name,
            "skill_count": len(card.skills),
        },
    )
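
The two cleanup branches in register keep the forward map (job_id to card) and the reverse index (agent name to job_id) consistent. The sketch below replays that logic on a stripped-down stand-in, without validation or logging. `Card` and `RegistrySketch` are hypothetical names, and `Card` carries only the one field needed here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Card:
    # Hypothetical stand-in for AgentCard with only the field used here.
    name: str


class RegistrySketch:
    def __init__(self) -> None:
        self.cards: dict[str, Card] = {}       # job_id -> card
        self.name_to_job: dict[str, str] = {}  # agent name -> job_id

    def register(self, job_id: str, card: Card) -> None:
        # Same name under a different job: drop the stale job entry.
        existing_job = self.name_to_job.get(card.name)
        if existing_job is not None and existing_job != job_id:
            self.cards.pop(existing_job, None)
        # Same job re-registered under a new name: drop the old name index.
        old_card = self.cards.get(job_id)
        if old_card is not None and old_card.name != card.name:
            self.name_to_job.pop(old_card.name, None)
        self.cards[job_id] = card
        self.name_to_job[card.name] = job_id


reg = RegistrySketch()
reg.register("job-1", Card("canyon"))
reg.register("job-2", Card("canyon"))  # name conflict: job-1 is evicted
reg.register("job-2", Card("forge"))   # job-2 renamed: "canyon" index removed
```

After these three calls only job-2 remains, registered as "forge", and neither map holds a leftover entry for "canyon" or job-1.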
deregister
deregister(job_id)

Remove an agent card when a job completes or is cancelled.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_id | `str` | The job to deregister. | required |

Returns:

| Type | Description |
| --- | --- |
| `AgentCard \| None` | The removed card, or None if the job wasn't registered. |

Source code in src/marianne/daemon/a2a/registry.py
def deregister(self, job_id: str) -> AgentCard | None:
    """Remove an agent card when a job completes or is cancelled.

    Args:
        job_id: The job to deregister.

    Returns:
        The removed card, or None if the job wasn't registered.
    """
    card = self._cards.pop(job_id, None)
    if card is not None:
        self._name_to_job.pop(card.name, None)
        _logger.info(
            "a2a.registry.deregistered",
            extra={"job_id": job_id, "agent_name": card.name},
        )
    return card
get
get(job_id)

Get the agent card for a specific job.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_id | `str` | The job to look up. | required |

Returns:

| Type | Description |
| --- | --- |
| `AgentCard \| None` | The agent card, or None if not registered. |

Source code in src/marianne/daemon/a2a/registry.py
def get(self, job_id: str) -> AgentCard | None:
    """Get the agent card for a specific job.

    Args:
        job_id: The job to look up.

    Returns:
        The agent card, or None if not registered.
    """
    return self._cards.get(job_id)
get_job_id_for_agent
get_job_id_for_agent(agent_name)

Resolve an agent name to its job_id.

Used by the conductor to route A2A tasks — when an agent sends a task to "canyon", this resolves to canyon's job_id so the task can be persisted in the correct inbox.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| agent_name | `str` | The target agent's name. | required |

Returns:

| Type | Description |
| --- | --- |
| `str \| None` | The job_id, or None if the agent isn't running. |

Source code in src/marianne/daemon/a2a/registry.py
def get_job_id_for_agent(self, agent_name: str) -> str | None:
    """Resolve an agent name to its job_id.

    Used by the conductor to route A2A tasks — when an agent
    sends a task to "canyon", this resolves to canyon's job_id
    so the task can be persisted in the correct inbox.

    Args:
        agent_name: The target agent's name.

    Returns:
        The job_id, or None if the agent isn't running.
    """
    return self._name_to_job.get(agent_name)
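
The routing step described above (resolve the agent name, then persist the task in that job's inbox) might look roughly like the following. This is a sketch under assumptions: `NameIndex` and `route_task` are hypothetical, inboxes are modelled as plain lists keyed by job_id, and how the conductor actually stores its inboxes is not shown in this reference.

```python
from typing import Optional


class NameIndex:
    """Hypothetical stand-in exposing only get_job_id_for_agent."""

    def __init__(self, name_to_job: dict[str, str]) -> None:
        self._name_to_job = name_to_job

    def get_job_id_for_agent(self, agent_name: str) -> Optional[str]:
        return self._name_to_job.get(agent_name)


def route_task(
    index: NameIndex,
    inboxes: dict[str, list[str]],
    target_agent: str,
    description: str,
) -> bool:
    """Append a task description to the target's inbox; False if unroutable."""
    job_id = index.get_job_id_for_agent(target_agent)
    if job_id is None:
        return False  # target agent is not running
    inbox = inboxes.get(job_id)
    if inbox is None:
        return False  # no inbox exists for that job
    inbox.append(description)
    return True


index = NameIndex({"canyon": "job-123"})
inboxes: dict[str, list[str]] = {"job-123": []}
delivered = route_task(index, inboxes, "canyon", "Review architecture for module X")
missed = route_task(index, inboxes, "forge", "Unroutable task")
```

Returning False for an unknown agent keeps the caller in charge of what to do with an undeliverable task; the real conductor may handle that case differently.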
query
query()

List all registered agent cards.

Returns:

| Type | Description |
| --- | --- |
| `list[AgentCard]` | List of all currently registered cards (snapshot). |

Source code in src/marianne/daemon/a2a/registry.py
def query(self) -> list[AgentCard]:
    """List all registered agent cards.

    Returns:
        List of all currently registered cards (snapshot).
    """
    return list(self._cards.values())
query_by_skill
query_by_skill(skill_id)

Find agents that offer a specific skill.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| skill_id | `str` | The skill identifier to search for. | required |

Returns:

| Type | Description |
| --- | --- |
| `list[AgentCard]` | List of agent cards that declare the given skill. |

Source code in src/marianne/daemon/a2a/registry.py
def query_by_skill(self, skill_id: str) -> list[AgentCard]:
    """Find agents that offer a specific skill.

    Args:
        skill_id: The skill identifier to search for.

    Returns:
        List of agent cards that declare the given skill.
    """
    return [
        card for card in self._cards.values()
        if any(s.id == skill_id for s in card.skills)
    ]
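
The skill filter is an exact match on each skill's id. The sketch below shows the same comprehension on hypothetical `Skill` and `Card` dataclasses; the only attributes assumed are `skills` and `id`, which are the ones the source above reads, and the skill ids other than "architecture-review" are invented for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Skill:
    # Hypothetical stand-in; the real skill model may have more fields.
    id: str


@dataclass
class Card:
    # Hypothetical stand-in for AgentCard.
    name: str
    skills: list[Skill] = field(default_factory=list)


def query_by_skill(cards: list[Card], skill_id: str) -> list[Card]:
    """Return the cards that declare a skill with exactly this id."""
    return [c for c in cards if any(s.id == skill_id for s in c.skills)]


cards = [
    Card("canyon", [Skill("architecture-review"), Skill("planning")]),
    Card("forge", [Skill("implementation")]),
    Card("idle"),  # an agent with no declared skills never matches
]
architects = [c.name for c in query_by_skill(cards, "architecture-review")]
nobody = query_by_skill(cards, "Architecture-Review")  # match is case-sensitive
```

Because the comparison is a plain equality test, callers that want case-insensitive or prefix matching would need to normalise skill ids before registering or querying.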
clear
clear()

Remove all registrations. Used on conductor shutdown.

Source code in src/marianne/daemon/a2a/registry.py
def clear(self) -> None:
    """Remove all registrations. Used on conductor shutdown."""
    count = len(self._cards)
    self._cards.clear()
    self._name_to_job.clear()
    if count > 0:
        _logger.info(
            "a2a.registry.cleared",
            extra={"deregistered_count": count},
        )