base

Abstract base for state backends.

Classes

StateBackend

Bases: ABC

Abstract base class for state storage backends.

Implementations handle persistence of job state for resumable execution.

Attributes
supports_execution_history property

Whether this backend persists execution history.

Backends that override record_execution should also override this to return True. Callers can check this to avoid silent data loss when a backend unexpectedly lacks execution recording.
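A caller-side guard might look like the sketch below. `NoHistoryBackend`, `HistoryBackend`, and `record_or_fail` are hypothetical names invented for illustration. The two classes only mimic the documented property and method and are not part of marianne.

```python
import asyncio


class NoHistoryBackend:
    """Hypothetical stand-in that mimics the documented base-class defaults."""

    @property
    def supports_execution_history(self) -> bool:
        return False

    async def record_execution(self, job_id, sheet_num, attempt_num, **kwargs):
        return None  # the documented default returns None


class HistoryBackend(NoHistoryBackend):
    """Hypothetical backend that overrides both the method and the property."""

    def __init__(self) -> None:
        self.rows: list[tuple[str, int, int]] = []

    @property
    def supports_execution_history(self) -> bool:
        return True

    async def record_execution(self, job_id, sheet_num, attempt_num, **kwargs):
        self.rows.append((job_id, sheet_num, attempt_num))
        return len(self.rows)  # 1-based record ID, an assumption of this sketch


async def record_or_fail(backend, job_id: str, sheet_num: int, attempt_num: int):
    """Refuse to continue when history is required but cannot be stored."""
    if not backend.supports_execution_history:
        raise RuntimeError("backend does not persist execution history")
    return await backend.record_execution(job_id, sheet_num, attempt_num)
```

Raising is only one option. A caller that treats history as optional could log and continue instead.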

Functions
load abstractmethod async
load(job_id)

Load state for a job.

Parameters:

Name      Type    Description              Default
job_id    str     Unique job identifier    required

Returns:

Type                      Description
CheckpointState | None    CheckpointState if found, None otherwise

Source code in src/marianne/state/base.py
@abstractmethod
async def load(self, job_id: str) -> CheckpointState | None:
    """Load state for a job.

    Args:
        job_id: Unique job identifier

    Returns:
        CheckpointState if found, None otherwise
    """
    ...
save abstractmethod async
save(state)

Save job state.

Parameters:

Name     Type               Description                    Default
state    CheckpointState    Checkpoint state to persist    required

Source code in src/marianne/state/base.py
@abstractmethod
async def save(self, state: CheckpointState) -> None:
    """Save job state.

    Args:
        state: Checkpoint state to persist
    """
    ...
delete abstractmethod async
delete(job_id)

Delete state for a job.

Parameters:

Name      Type    Description              Default
job_id    str     Unique job identifier    required

Returns:

Type    Description
bool    True if deleted, False if not found

Source code in src/marianne/state/base.py
@abstractmethod
async def delete(self, job_id: str) -> bool:
    """Delete state for a job.

    Args:
        job_id: Unique job identifier

    Returns:
        True if deleted, False if not found
    """
    ...
list_jobs abstractmethod async
list_jobs()

List all jobs with state.

Returns:

Type                     Description
list[CheckpointState]    List of all checkpoint states

Source code in src/marianne/state/base.py
@abstractmethod
async def list_jobs(self) -> list[CheckpointState]:
    """List all jobs with state.

    Returns:
        List of all checkpoint states
    """
    ...
get_next_sheet abstractmethod async
get_next_sheet(job_id)

Get the next sheet to process for a job.

Parameters:

Name      Type    Description              Default
job_id    str     Unique job identifier    required

Returns:

Type          Description
int | None    Next sheet number, or None if complete

Source code in src/marianne/state/base.py
@abstractmethod
async def get_next_sheet(self, job_id: str) -> int | None:
    """Get the next sheet to process for a job.

    Args:
        job_id: Unique job identifier

    Returns:
        Next sheet number, or None if complete
    """
    ...
mark_sheet_status abstractmethod async
mark_sheet_status(job_id, sheet_num, status, error_message=None)

Update status of a specific sheet.

Parameters:

Name             Type           Description                            Default
job_id           str            Unique job identifier                  required
sheet_num        int            Sheet number to update                 required
status           SheetStatus    New status                             required
error_message    str | None     Optional error message for failures    None

Source code in src/marianne/state/base.py
@abstractmethod
async def mark_sheet_status(
    self,
    job_id: str,
    sheet_num: int,
    status: SheetStatus,
    error_message: str | None = None,
) -> None:
    """Update status of a specific sheet.

    Args:
        job_id: Unique job identifier
        sheet_num: Sheet number to update
        status: New status
        error_message: Optional error message for failures
    """
    ...
close async
close()

Release any resources held by the backend (Q018/#37).

Optional: backends that hold persistent connections or file handles should override this method. The default no-op is safe for backends that open a connection per operation (e.g. aiosqlite context managers).

Source code in src/marianne/state/base.py
async def close(self) -> None:  # noqa: B027 — intentional concrete no-op default
    """Release any resources held by the backend (Q018/#37).

    Optional method — backends that hold persistent connections or file
    handles should override this. The default no-op is safe for backends
    that use per-operation connections (e.g. aiosqlite context managers).
    """
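As an illustration of when overriding `close` matters, the sketch below keeps one file handle open for the backend's lifetime and releases it in a `finally` block. `FileHandleBackend`, its `append` helper, and `use_backend` are hypothetical and exist only for this example.

```python
import asyncio
import tempfile
from pathlib import Path


class FileHandleBackend:
    """Hypothetical backend that holds one open file for its whole lifetime."""

    def __init__(self, path: str) -> None:
        self._handle = open(path, "a", encoding="utf-8")

    async def append(self, line: str) -> None:
        # Hypothetical helper, not part of the StateBackend interface.
        self._handle.write(line + "\n")

    async def close(self) -> None:
        # Overrides the no-op default because a persistent handle is held.
        if not self._handle.closed:
            self._handle.close()

    @property
    def closed(self) -> bool:
        return self._handle.closed


async def use_backend(path: str) -> FileHandleBackend:
    backend = FileHandleBackend(path)
    try:
        await backend.append("sheet 1 done")
    finally:
        await backend.close()  # runs even if the work above raises
    return backend


def demo() -> tuple[bool, str]:
    with tempfile.TemporaryDirectory() as tmp:
        path = str(Path(tmp) / "state.log")
        backend = asyncio.run(use_backend(path))
        return backend.closed, Path(path).read_text(encoding="utf-8")
```

Making `close` idempotent, as here, lets callers invoke it more than once without error.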
record_execution async
record_execution(job_id, sheet_num, attempt_num, prompt=None, output=None, exit_code=None, duration_seconds=None)

Record an execution attempt in history.

Optional method: backends that support execution recording (e.g. SQLite) override this. The default implementation logs at WARNING level, so a skipped write is traceable rather than silent, and then returns None.

Parameters:

Name                Type            Description                         Default
job_id              str             Job identifier.                     required
sheet_num           int             Sheet number.                       required
attempt_num         int             Attempt number within the sheet.    required
prompt              str | None      The prompt sent to the backend.     None
output              str | None      The output received.                None
exit_code           int | None      Exit code from the execution.       None
duration_seconds    float | None    Execution duration.                 None

Returns:

Type          Description
int | None    The ID of the inserted record, or None if not supported.

Source code in src/marianne/state/base.py
async def record_execution(
    self,
    job_id: str,
    sheet_num: int,
    attempt_num: int,
    prompt: str | None = None,
    output: str | None = None,
    exit_code: int | None = None,
    duration_seconds: float | None = None,
) -> int | None:
    """Record an execution attempt in history.

    Optional method -- backends that support execution recording (e.g. SQLite)
    override this. The default logs at WARNING level so silent data
    loss is traceable, then returns None.

    Args:
        job_id: Job identifier.
        sheet_num: Sheet number.
        attempt_num: Attempt number within the sheet.
        prompt: The prompt sent to the backend.
        output: The output received.
        exit_code: Exit code from the execution.
        duration_seconds: Execution duration.

    Returns:
        The ID of the inserted record, or None if not supported.
    """
    _logger.warning(
        "record_execution_noop",
        extra={"job_id": job_id, SHEET_NUM_KEY: sheet_num},
    )
    return None
infer_state_from_artifacts async
infer_state_from_artifacts(job_id, workspace, artifact_pattern)

Infer last completed sheet from artifact files.

Fallback for when the state file is missing: scans the workspace for files matching artifact_pattern and returns the highest sheet number found in their names. Based on the fallback logic in run-sheet-review.sh.

Parameters:

Name                Type    Description                                                             Default
job_id              str     Unique job identifier                                                   required
workspace           str     Workspace directory path                                                required
artifact_pattern    str     Glob pattern for sheet artifacts (e.g., "sheet*-security-report.md")    required

Returns:

Type          Description
int | None    Inferred last completed sheet number, or None

Source code in src/marianne/state/base.py
async def infer_state_from_artifacts(
    self,
    job_id: str,
    workspace: str,
    artifact_pattern: str,
) -> int | None:
    """Infer last completed sheet from artifact files.

    Fallback when state file is missing - checks for output files.
    Based on the fallback logic in run-sheet-review.sh.

    Args:
        job_id: Unique job identifier
        workspace: Workspace directory path
        artifact_pattern: Glob pattern for sheet artifacts (e.g., "sheet*-security-report.md")

    Returns:
        Inferred last completed sheet number, or None
    """
    import re
    from pathlib import Path

    workspace_path = Path(workspace)
    if not workspace_path.exists():
        return None

    # Find all matching artifact files
    artifacts = list(workspace_path.glob(artifact_pattern))
    if not artifacts:
        return None

    # Extract sheet numbers and find max
    sheet_nums = []
    for artifact in artifacts:
        match = re.search(r"sheet(\d+)", artifact.name)
        if match:
            sheet_nums.append(int(match.group(1)))

    return max(sheet_nums) if sheet_nums else None