registry

Persistent job registry for the Marianne daemon.

SQLite-backed registry that tracks all jobs submitted to the daemon. Survives daemon restarts so `mzt list` always shows job history.

Separate from the learning store (which tracks patterns across jobs). This DB tracks operational state: which jobs exist, their workspaces, PIDs, and statuses.

All database methods are async (via aiosqlite) so they never block the daemon's asyncio event loop — even under heavy concurrent load.

Classes

DaemonJobStatus

Bases: str, Enum

Status values for daemon-managed jobs.

Inherits from str so meta.status serializes directly as a plain string in JSON/dict output — no .value calls needed.

JobRecord dataclass

JobRecord(job_id, config_path, workspace, status=QUEUED, pid=None, submitted_at=time.time(), started_at=None, completed_at=None, error_message=None, current_sheet=None, total_sheets=None, last_event_at=None, log_path=None, snapshot_path=None, checkpoint_json=None)

A single job's registry entry.

Functions
to_dict
to_dict()

Serialize for JSON-RPC responses.

Source code in src/marianne/daemon/registry.py
def to_dict(self) -> dict[str, Any]:
    """Serialize this record for JSON-RPC responses.

    The core fields are always present (possibly None); error_message,
    log_path and snapshot_path are included only when truthy, keeping
    responses compact.
    """
    payload: dict[str, Any] = {
        "job_id": self.job_id,
        "config_path": self.config_path,
        "workspace": self.workspace,
        "status": self.status,
        "pid": self.pid,
        "submitted_at": self.submitted_at,
        "started_at": self.started_at,
        "completed_at": self.completed_at,
        "current_sheet": self.current_sheet,
        "total_sheets": self.total_sheets,
    }
    # Optional fields: omitted entirely when empty/None.
    for key, value in (
        ("error_message", self.error_message),
        ("log_path", self.log_path),
        ("snapshot_path", self.snapshot_path),
    ):
        if value:
            payload[key] = value
    return payload

JobRegistry

JobRegistry(db_path)

Async SQLite-backed persistent job registry.

Uses aiosqlite so all I/O happens off the event loop thread. The daemon is single-threaded (asyncio) so contention is minimal, but the DB is safe for external readers (e.g. a monitoring tool reading the same file).

Usage::

registry = JobRegistry(db_path)
await registry.open()   # creates tables, sets WAL mode
...
await registry.close()

Or as an async context manager::

async with JobRegistry(db_path) as registry:
    await registry.register_job(...)
Source code in src/marianne/daemon/registry.py
def __init__(self, db_path: Path) -> None:
    """Remember the DB path; the connection itself is opened lazily."""
    # Create the parent directory up front so open() can connect
    # without a separate mkdir step.
    db_path.parent.mkdir(parents=True, exist_ok=True)
    self._db_path = db_path
    # None means "not opened yet" — set by open(), cleared by close().
    self._conn: aiosqlite.Connection | None = None
Functions
open async
open()

Open the database connection and create tables.

Source code in src/marianne/daemon/registry.py
async def open(self) -> None:
    """Open the database connection and create tables.

    Enables WAL journaling so external readers can safely read the
    file while the daemon writes to it.
    """
    connection = await aiosqlite.connect(str(self._db_path))
    try:
        connection.row_factory = aiosqlite.Row
        await connection.execute("PRAGMA journal_mode=WAL")
        await self._create_tables(connection)
    except Exception:
        # Never leak a half-initialized connection.
        await connection.close()
        raise
    else:
        self._conn = connection
        _logger.info("registry.opened", path=str(self._db_path))
register_job async
register_job(job_id, config_path, workspace)

Register a newly submitted job.

Source code in src/marianne/daemon/registry.py
async def register_job(
    self,
    job_id: str,
    config_path: Path,
    workspace: Path,
) -> None:
    """Register a newly submitted job.

    INSERT OR REPLACE makes resubmission of an existing job ID reset
    its row to a fresh 'queued' state.
    """
    sql = """
        INSERT OR REPLACE INTO jobs
            (job_id, config_path, workspace, status, submitted_at)
        VALUES (?, ?, ?, 'queued', ?)
        """
    row = (job_id, str(config_path), str(workspace), time.time())
    await self._db.execute(sql, row)
    await self._db.commit()
update_status async
update_status(job_id, status, *, pid=None, error_message=None, snapshot_path=None)

Update a job's status and optional fields.

Source code in src/marianne/daemon/registry.py
async def update_status(
    self,
    job_id: str,
    status: str,
    *,
    pid: int | None = None,
    error_message: str | None = None,
    snapshot_path: str | None = None,
) -> None:
    """Update a job's status and optional fields.

    Side effects beyond the status column: started_at is stamped when
    the job transitions to 'running' with a known PID, and completed_at
    is stamped on any terminal status.
    """
    set_clauses: list[str] = ["status = ?"]
    values: list[Any] = [status]

    def _set(clause: str, value: Any) -> None:
        # Accumulate one "col = ?" fragment plus its bound value.
        set_clauses.append(clause)
        values.append(value)

    if pid is not None:
        _set("pid = ?", pid)

    # A job only gains a start timestamp once it has an actual process.
    if status == "running" and pid is not None:
        _set("started_at = ?", time.time())

    if status in _TERMINAL_STATUSES:
        _set("completed_at = ?", time.time())

    if error_message is not None:
        _set("error_message = ?", error_message)

    if snapshot_path is not None:
        _set("snapshot_path = ?", snapshot_path)

    values.append(job_id)
    await self._db.execute(
        f"UPDATE jobs SET {', '.join(set_clauses)} WHERE job_id = ?",
        values,
    )
    await self._db.commit()
update_config_metadata async
update_config_metadata(job_id, *, config_path=None, workspace=None)

Update config-derived metadata for a job.

Called during config reconciliation to keep registry in sync with the reloaded config.

Source code in src/marianne/daemon/registry.py
async def update_config_metadata(
    self,
    job_id: str,
    *,
    config_path: str | None = None,
    workspace: str | None = None,
) -> None:
    """Update config-derived metadata for a job.

    Called during config reconciliation to keep registry in sync
    with the reloaded config.  A no-op when neither field is given.
    """
    assignments: list[str] = []
    values: list[Any] = []

    for column, value in (
        ("config_path", config_path),
        ("workspace", workspace),
    ):
        if value is not None:
            assignments.append(f"{column} = ?")
            values.append(value)

    if not assignments:
        # Nothing to write — skip the DB round-trip entirely.
        return

    values.append(job_id)
    await self._db.execute(
        f"UPDATE jobs SET {', '.join(assignments)} WHERE job_id = ?",
        values,
    )
    await self._db.commit()
update_progress async
update_progress(job_id, current_sheet, total_sheets)

Update per-sheet progress counters for a running job.

Source code in src/marianne/daemon/registry.py
async def update_progress(
    self,
    job_id: str,
    current_sheet: int,
    total_sheets: int,
) -> None:
    """Record per-sheet progress counters for a running job.

    Also refreshes last_event_at with the current time.
    """
    observed_at = time.time()
    await self._db.execute(
        "UPDATE jobs SET current_sheet = ?, total_sheets = ?, "
        "last_event_at = ? WHERE job_id = ?",
        (current_sheet, total_sheets, observed_at, job_id),
    )
    await self._db.commit()
save_checkpoint async
save_checkpoint(job_id, checkpoint_json)

Persist a serialized CheckpointState for a job.

Called on every state publish so the registry always has the latest checkpoint. This is the daemon's single source of truth for historical job status — no disk fallback needed.

Source code in src/marianne/daemon/registry.py
async def save_checkpoint(self, job_id: str, checkpoint_json: str) -> None:
    """Persist a serialized CheckpointState for a job.

    Called on every state publish so the registry always has the
    latest checkpoint.  This is the daemon's single source of
    truth for historical job status — no disk fallback needed.
    """
    stamped_at = time.time()
    await self._db.execute(
        "UPDATE jobs SET checkpoint_json = ?, last_event_at = ? "
        "WHERE job_id = ?",
        (checkpoint_json, stamped_at, job_id),
    )
    await self._db.commit()
load_checkpoint async
load_checkpoint(job_id)

Load the stored checkpoint JSON for a job.

Returns the raw JSON string, or None if no checkpoint was saved.

Source code in src/marianne/daemon/registry.py
async def load_checkpoint(self, job_id: str) -> str | None:
    """Load the stored checkpoint JSON for a job.

    Returns the raw JSON string, or None if no checkpoint was saved
    (or the job is unknown).
    """
    row = await (
        await self._db.execute(
            "SELECT checkpoint_json FROM jobs WHERE job_id = ?",
            (job_id,),
        )
    ).fetchone()
    if row is None:
        return None
    payload: str | None = row["checkpoint_json"]
    return payload
store_hook_config async
store_hook_config(job_id, config_json)

Store hook configuration for a job at submission time.

Source code in src/marianne/daemon/registry.py
async def store_hook_config(self, job_id: str, config_json: str) -> None:
    """Store hook configuration for a job at submission time."""
    statement = "UPDATE jobs SET hook_config_json = ? WHERE job_id = ?"
    await self._db.execute(statement, (config_json, job_id))
    await self._db.commit()
get_hook_config async
get_hook_config(job_id)

Load stored hook config JSON for a job.

Source code in src/marianne/daemon/registry.py
async def get_hook_config(self, job_id: str) -> str | None:
    """Load stored hook config JSON for a job.

    Returns None for an unknown job or when no config was stored.
    """
    cursor = await self._db.execute(
        "SELECT hook_config_json FROM jobs WHERE job_id = ?",
        (job_id,),
    )
    row = await cursor.fetchone()
    return None if row is None else row["hook_config_json"]
store_hook_results async
store_hook_results(job_id, results_json)

Store hook execution results for a job.

Source code in src/marianne/daemon/registry.py
async def store_hook_results(self, job_id: str, results_json: str) -> None:
    """Store hook execution results for a job."""
    statement = "UPDATE jobs SET hook_results_json = ? WHERE job_id = ?"
    await self._db.execute(statement, (results_json, job_id))
    await self._db.commit()
get_job async
get_job(job_id)

Get a single job by ID.

Source code in src/marianne/daemon/registry.py
async def get_job(self, job_id: str) -> JobRecord | None:
    """Get a single job by ID, or None if unknown."""
    cursor = await self._db.execute(
        "SELECT * FROM jobs WHERE job_id = ?", (job_id,)
    )
    row = await cursor.fetchone()
    return None if row is None else self._row_to_record(row)
list_jobs async
list_jobs(*, status=None, limit=50)

List jobs, most recent first.

Source code in src/marianne/daemon/registry.py
async def list_jobs(
    self,
    *,
    status: str | None = None,
    limit: int = 50,
) -> list[JobRecord]:
    """List jobs, most recent first, optionally filtered by status."""
    if status:
        query = "SELECT * FROM jobs WHERE status = ? ORDER BY submitted_at DESC LIMIT ?"
        args: tuple[Any, ...] = (status, limit)
    else:
        query = "SELECT * FROM jobs ORDER BY submitted_at DESC LIMIT ?"
        args = (limit,)
    cursor = await self._db.execute(query, args)
    rows = await cursor.fetchall()
    return [self._row_to_record(row) for row in rows]
has_active_job async
has_active_job(job_id)

Check if a job ID exists and is in an active state.

Source code in src/marianne/daemon/registry.py
async def has_active_job(self, job_id: str) -> bool:
    """Check if a job ID exists and is in an active state."""
    # SELECT 1 — existence test only; the row content is irrelevant.
    cursor = await self._db.execute(
        "SELECT 1 FROM jobs WHERE job_id = ? AND status IN ('queued', 'running')",
        (job_id,),
    )
    return await cursor.fetchone() is not None
get_orphaned_jobs async
get_orphaned_jobs()

Find jobs that were running when the daemon last stopped.

These are jobs with status 'queued' or 'running' — after a daemon restart they're orphans since their asyncio tasks no longer exist.

Source code in src/marianne/daemon/registry.py
async def get_orphaned_jobs(self) -> list[JobRecord]:
    """Find jobs that were running when the daemon last stopped.

    These are jobs with status 'queued' or 'running' — after a daemon
    restart they're orphans since their asyncio tasks no longer exist.
    """
    cursor = await self._db.execute(
        "SELECT * FROM jobs WHERE status IN ('queued', 'running') "
        "ORDER BY submitted_at DESC"
    )
    return [self._row_to_record(row) for row in await cursor.fetchall()]
mark_orphans_failed async
mark_orphans_failed()

Mark all orphaned jobs as failed on daemon startup.

Returns the number of jobs marked.

Source code in src/marianne/daemon/registry.py
async def mark_orphans_failed(self) -> int:
    """Mark all orphaned jobs as failed on daemon startup.

    Returns the number of jobs marked.
    """
    now = time.time()
    cursor = await self._db.execute(
        """
        UPDATE jobs SET
            status = 'failed',
            completed_at = ?,
            error_message = 'Daemon restarted while job was active'
        WHERE status IN ('queued', 'running')
        """,
        (now,),
    )
    await self._db.commit()
    # Keep the explicit > 0 test: cursor.rowcount may be -1 on some
    # DB-API cursors, which must not trigger the warning.
    marked = cursor.rowcount
    if marked > 0:
        _logger.warning("registry.orphans_marked_failed", count=marked)
    return marked
delete_jobs async
delete_jobs(*, job_ids=None, statuses=None, older_than_seconds=None)

Delete terminal jobs from the registry.

Never deletes active jobs (queued/running) regardless of filter.

Parameters:

Name Type Description Default
job_ids list[str] | None

Only delete these specific job IDs.

None
statuses list[str] | None

Only delete jobs with these statuses. Defaults to all terminal statuses.

None
older_than_seconds float | None

Only delete jobs older than this many seconds.

None

Returns:

Type Description
int

Number of deleted rows.

Source code in src/marianne/daemon/registry.py
async def delete_jobs(
    self,
    *,
    job_ids: list[str] | None = None,
    statuses: list[str] | None = None,
    older_than_seconds: float | None = None,
) -> int:
    """Delete terminal jobs from the registry.

    Never deletes active jobs (queued/running) regardless of filter.

    Args:
        job_ids: Only delete these specific job IDs.
        statuses: Only delete jobs with these statuses.
                  Defaults to all terminal statuses.
        older_than_seconds: Only delete jobs older than this many seconds.

    Returns:
        Number of deleted rows.
    """
    safe = set(statuses or _TERMINAL_STATUSES)
    safe -= _ACTIVE_STATUSES

    # Guard against empty IN-lists: "status IN ()" / "job_id IN ()" is a
    # SQLite syntax error.  An empty selection simply deletes nothing,
    # which also honors the "never delete active jobs" contract when the
    # caller passed only active statuses.
    if not safe:
        return 0
    if job_ids is not None and not job_ids:
        return 0

    conditions = ["status IN ({})".format(",".join("?" for _ in safe))]
    params: list[Any] = list(safe)

    if job_ids is not None:
        conditions.append(
            "job_id IN ({})".format(",".join("?" for _ in job_ids))
        )
        params.extend(job_ids)

    if older_than_seconds is not None:
        conditions.append("submitted_at < ?")
        params.append(time.time() - older_than_seconds)

    sql = "DELETE FROM jobs WHERE " + " AND ".join(conditions)
    cursor = await self._db.execute(sql, params)
    await self._db.commit()
    count = cursor.rowcount
    if count > 0:
        _logger.info("registry.delete_jobs", deleted=count)
    return count
close async
close()

Close the database connection.

Source code in src/marianne/daemon/registry.py
async def close(self) -> None:
    """Close the database connection (idempotent)."""
    # Detach first so _conn is None even while the close is in flight.
    connection, self._conn = self._conn, None
    if connection is not None:
        await connection.close()

Functions