daemon_adapter

StateBackend adapter that reads job state from the running daemon via IPC.

Wraps DaemonClient to satisfy the StateBackend ABC, so the dashboard can consume live daemon data without touching the filesystem directly. All write methods raise NotImplementedError because the dashboard is read-only.
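The read-only contract described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the module's actual code: `StateBackendSketch` and `FakeDaemonClient` are hypothetical stand-ins for the real `StateBackend` and `DaemonClient`, and `save` is an assumed name for a write method, since the source does not list which write methods the ABC defines.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import Any


class StateBackendSketch(ABC):
    """Hypothetical stand-in for the real StateBackend ABC."""

    @abstractmethod
    async def load(self, job_id: str) -> dict[str, Any] | None: ...

    @abstractmethod
    async def save(self, state: dict[str, Any]) -> None: ...  # assumed write method


class FakeDaemonClient:
    """Hypothetical stand-in for DaemonClient; returns canned data."""

    async def get_job_status(self, job_id: str, _second_arg: str) -> dict[str, Any]:
        return {"job_id": job_id, "status": "running"}


class ReadOnlyAdapterSketch(StateBackendSketch):
    def __init__(self, client: FakeDaemonClient) -> None:
        self._client = client

    async def load(self, job_id: str) -> dict[str, Any] | None:
        # Reads are delegated to the client.
        return await self._client.get_job_status(job_id, "")

    async def save(self, state: dict[str, Any]) -> None:
        # Writes are rejected: the adapter never mutates daemon state.
        raise NotImplementedError("read-only adapter")


async def demo() -> tuple[dict[str, Any] | None, bool]:
    adapter = ReadOnlyAdapterSketch(FakeDaemonClient())
    state = await adapter.load("job-1")
    try:
        await adapter.save({"job_id": "job-1"})
        rejected = False
    except NotImplementedError:
        rejected = True
    return state, rejected


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Implementing every abstract method, even if only to raise, keeps the adapter instantiable while making any accidental write fail loudly.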

Classes

DaemonStateAdapter

DaemonStateAdapter(client)

Bases: StateBackend

Read-only StateBackend backed by live daemon IPC calls.

Parameters

client: An already-configured DaemonClient instance.

Source code in src/marianne/dashboard/state/daemon_adapter.py
def __init__(self, client: DaemonClient) -> None:
    self._client = client

Functions

load async

load(job_id)

Load state for a job from the daemon.

Returns None when the daemon call raises DaemonError, which this adapter treats as the job not being found.

Source code in src/marianne/dashboard/state/daemon_adapter.py
async def load(self, job_id: str) -> CheckpointState | None:
    """Load state for a job from the daemon.

    Returns ``None`` when the daemon call raises ``DaemonError``, which
    this adapter treats as the job not being found.
    """
    try:
        data = await self._client.get_job_status(job_id, "")
        return CheckpointState(**data)
    except DaemonError:
        _logger.debug("load_job_not_found", extra={"job_id": job_id})
        return None
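
A caller has to handle the `None` result explicitly. The sketch below shows the pattern in isolation, under stated assumptions: `DaemonError`, `CheckpointStateSketch`, and `FakeDaemonClient` are simplified local stand-ins, not the real marianne classes, and the free function `load` mirrors the method above only in outline.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


class DaemonError(Exception):
    """Local stand-in for the real DaemonError."""


@dataclass
class CheckpointStateSketch:
    """Hypothetical, much-reduced stand-in for CheckpointState."""

    job_id: str
    status: str


class FakeDaemonClient:
    """Hypothetical client that only knows the jobs it was given."""

    def __init__(self, jobs: dict[str, dict[str, Any]]) -> None:
        self._jobs = jobs

    async def get_job_status(self, job_id: str, _second_arg: str) -> dict[str, Any]:
        if job_id not in self._jobs:
            raise DaemonError(f"unknown job: {job_id}")
        return self._jobs[job_id]


async def load(client: FakeDaemonClient, job_id: str) -> CheckpointStateSketch | None:
    # Same shape as the method above: any DaemonError becomes None.
    try:
        data = await client.get_job_status(job_id, "")
        return CheckpointStateSketch(**data)
    except DaemonError:
        return None


async def demo() -> tuple[CheckpointStateSketch | None, CheckpointStateSketch | None]:
    client = FakeDaemonClient({"job-1": {"job_id": "job-1", "status": "running"}})
    found = await load(client, "job-1")
    missing = await load(client, "no-such-job")
    return found, missing


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A dashboard route could, for instance, map the `None` case to a "not found" response rather than letting it propagate.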

list_jobs async

list_jobs()

List all jobs by querying the daemon roster, then enriching each entry with its full status.

Source code in src/marianne/dashboard/state/daemon_adapter.py
async def list_jobs(self) -> list[CheckpointState]:
    """List all jobs by querying the daemon roster, then enriching each entry with its full status."""
    roster: list[dict[str, Any]] = await self._client.list_jobs()
    results: list[CheckpointState] = []

    for entry in roster:
        job_id = entry.get("job_id", "")
        try:
            data = await self._client.get_job_status(job_id, "")
            results.append(CheckpointState(**data))
        except DaemonError:
            _logger.debug(
                "list_jobs_fallback",
                extra={"job_id": job_id},
            )
            # Construct a minimal CheckpointState from roster data
            results.append(
                CheckpointState(
                    job_id=job_id,
                    job_name=job_id,
                    total_sheets=1,
                    status=JobStatus(entry.get("status", "pending")),
                    created_at=entry.get("submitted_at") or utc_now(),
                )
            )

    return results
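
The fallback branch is easiest to see with a roster in which one status lookup fails. The following is a sketch under stated assumptions: every class here is a simplified local stand-in, the `JobStatus` members other than the `"pending"` default are guesses, and `utc_now` is reimplemented locally rather than imported from the project.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any


class DaemonError(Exception):
    """Local stand-in for the real DaemonError."""


class JobStatus(str, Enum):
    """Hypothetical subset of the real JobStatus values."""

    PENDING = "pending"
    RUNNING = "running"


@dataclass
class CheckpointStateSketch:
    """Hypothetical stand-in with only the fields used by the fallback."""

    job_id: str
    job_name: str
    total_sheets: int
    status: JobStatus
    created_at: datetime


def utc_now() -> datetime:
    return datetime.now(timezone.utc)


class FakeDaemonClient:
    """Hypothetical client: job 'a' has full status, job 'b' does not."""

    async def list_jobs(self) -> list[dict[str, Any]]:
        return [{"job_id": "a", "status": "running"}, {"job_id": "b"}]

    async def get_job_status(self, job_id: str, _second_arg: str) -> dict[str, Any]:
        if job_id == "b":
            raise DaemonError("status unavailable")
        return {
            "job_id": "a",
            "job_name": "Job A",
            "total_sheets": 4,
            "status": JobStatus.RUNNING,
            "created_at": utc_now(),
        }


async def list_jobs(client: FakeDaemonClient) -> list[CheckpointStateSketch]:
    results: list[CheckpointStateSketch] = []
    for entry in await client.list_jobs():
        job_id = entry.get("job_id", "")
        try:
            data = await client.get_job_status(job_id, "")
            results.append(CheckpointStateSketch(**data))
        except DaemonError:
            # Fall back to a minimal record built from the roster entry alone.
            results.append(
                CheckpointStateSketch(
                    job_id=job_id,
                    job_name=job_id,
                    total_sheets=1,
                    status=JobStatus(entry.get("status", "pending")),
                    created_at=entry.get("submitted_at") or utc_now(),
                )
            )
    return results


if __name__ == "__main__":
    for job in asyncio.run(list_jobs(FakeDaemonClient())):
        print(job.job_id, job.job_name, job.status.value, job.total_sheets)
```

With this design a single failing lookup degrades one row to placeholder values (`job_name` equal to the ID, `total_sheets` of 1) instead of failing the whole listing.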

close async

close()

No-op — DaemonClient uses per-request connections.

Source code in src/marianne/dashboard/state/daemon_adapter.py
async def close(self) -> None:
    """No-op — DaemonClient uses per-request connections."""
