Skip to content

Index

state

Dashboard state adapters.

Classes

DaemonStateAdapter

DaemonStateAdapter(client)

Bases: StateBackend

Read-only StateBackend backed by live daemon IPC calls.

Parameters

client: An already-configured DaemonClient instance.

Source code in src/marianne/dashboard/state/daemon_adapter.py
def __init__(self, client: DaemonClient) -> None:
    """Store the daemon client that this adapter queries.

    Args:
        client: An already-configured ``DaemonClient`` instance. It is
            kept as ``self._client`` and used as-is; nothing is called
            on it here.
    """
    self._client = client
Functions
load async
load(job_id)

Load state for a job from the daemon.

Returns None if the daemon call raises DaemonError (for example, when the job is not found).

Source code in src/marianne/dashboard/state/daemon_adapter.py
async def load(self, job_id: str) -> CheckpointState | None:
    """Fetch a single job's state from the daemon.

    Asks the daemon client for the job's status (with an empty string as
    the second argument) and builds a ``CheckpointState`` from the
    returned mapping.

    Args:
        job_id: Identifier of the job to look up.

    Returns:
        The job's ``CheckpointState``, or ``None`` when a ``DaemonError``
        is raised during the lookup (treated as "job not found").
    """
    state: CheckpointState | None
    try:
        payload = await self._client.get_job_status(job_id, "")
        state = CheckpointState(**payload)
    except DaemonError:
        # Any DaemonError is reported as a missing job; logged at debug
        # level only, since callers get ``None`` as the signal.
        _logger.debug("load_job_not_found", extra={"job_id": job_id})
        state = None
    return state
list_jobs async
list_jobs()

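List all jobs by querying the daemon roster, then fetching the full status of each entry (falling back to a minimal state built from the roster entry when that lookup raises DaemonError).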

Source code in src/marianne/dashboard/state/daemon_adapter.py
async def list_jobs(self) -> list[CheckpointState]:
    """Return one ``CheckpointState`` per job in the daemon's roster.

    The roster is fetched first; each entry is then looked up
    individually (one status call per job, awaited in roster order) to
    obtain its full state. If a lookup raises ``DaemonError``, a minimal
    state is built from the roster entry instead, so the job still
    appears in the result.

    Returns:
        States in the same order as the roster entries.
    """
    roster: list[dict[str, Any]] = await self._client.list_jobs()
    states: list[CheckpointState] = []

    for row in roster:
        jid = row.get("job_id", "")
        try:
            detail = await self._client.get_job_status(jid, "")
            state = CheckpointState(**detail)
        except DaemonError:
            _logger.debug("list_jobs_fallback", extra={"job_id": jid})
            # Full status unavailable: build a minimal placeholder from the
            # roster entry. The job id doubles as the name, a single sheet
            # is assumed, and missing fields default to "pending" / now.
            state = CheckpointState(
                job_id=jid,
                job_name=jid,
                total_sheets=1,
                status=JobStatus(row.get("status", "pending")),
                created_at=row.get("submitted_at") or utc_now(),
            )
        states.append(state)

    return states
close async
close()

No-op — DaemonClient uses per-request connections.

Source code in src/marianne/dashboard/state/daemon_adapter.py
async def close(self) -> None:
    """No-op — DaemonClient uses per-request connections.

    This method does nothing and returns ``None``.
    """