
checkpoint

Checkpoint and state management models.

Defines the state that gets persisted between runs for resumable orchestration.

Classes

ValidationDetailDict

Bases: TypedDict

Schema for individual validation result entries in SheetState.validation_details.

Most keys are optional (total=False) to support partial dicts from legacy data and simplified test fixtures. passed is required because a validation result without a pass/fail status is meaningless.

PromptMetricsDict

Bases: TypedDict

Schema for prompt analysis metrics in SheetState.prompt_metrics.

All keys are optional (total=False) to support partial metrics from legacy data or simplified test fixtures.

ProgressSnapshotDict

Bases: TypedDict

Schema for execution progress snapshots in SheetState.progress_snapshots.

All keys are optional (total=False) because snapshots may contain varying subsets depending on when they were captured.

ErrorContextDict

Bases: TypedDict

Schema for error context in CheckpointErrorRecord.context.

All keys are optional (total=False) since context varies by error type. Values may be None when the information is not available.

AppliedPatternDict

Bases: TypedDict

Schema for a single applied pattern in SheetState.applied_patterns.

Replaces the parallel applied_pattern_ids / applied_pattern_descriptions lists with a single structured list for safety and clarity.
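The safety argument for the single structured list can be sketched as follows; the key names `pattern_id` and `description` are assumed from the backward-compatible accessors described below:

```python
from typing import TypedDict


class AppliedPatternSketch(TypedDict, total=False):
    pattern_id: str
    description: str


# Parallel lists can drift out of sync when one is edited without the other;
# a single list of structured entries keeps each id paired with its description.
applied: list[AppliedPatternSketch] = [
    {"pattern_id": "p-001", "description": "Prefer smaller batches"},
]

# The legacy parallel views can be derived from the structured list on demand.
ids = [p.get("pattern_id") for p in applied]
descriptions = [p.get("description") for p in applied]
```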

OutcomeDataDict

Bases: TypedDict

Schema for structured outcome data in SheetState.outcome_data.

All keys are optional (total=False) since the schema is extensible for learning and pattern recognition.

SynthesisResultDict

Bases: TypedDict

Schema for synthesis result entries in CheckpointState.synthesis_results.

All keys are optional (total=False) to support partial data from legacy state files and test fixtures.

SheetStatus

Bases: str, Enum

Status of a single sheet.

The baton tracks 11 scheduling states; status display and persistence use all of them. Consumers that only care about the terminal/non-terminal distinction can check is_terminal.

Attributes
is_terminal property
is_terminal

Whether this status represents a final state.

is_active property
is_active

Whether this status means the sheet is currently executing.
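The consumer pattern described above can be sketched with a reduced stand-in enum. Which states count as terminal is an assumption here (completed and skipped final; pending and in-progress not); the real enum has 11 states:

```python
from enum import Enum


class SheetStatusSketch(str, Enum):
    # Illustrative subset of the 11 scheduling states.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    SKIPPED = "skipped"

    @property
    def is_terminal(self) -> bool:
        # Assumed terminal set for this sketch.
        return self in (SheetStatusSketch.COMPLETED, SheetStatusSketch.SKIPPED)


# A consumer that only cares about terminal/non-terminal filters on the property.
active_or_waiting = [s.value for s in SheetStatusSketch if not s.is_terminal]
```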

OutcomeCategory

Bases: str, Enum

Classification of sheet execution outcome (#7).

JobStatus

Bases: str, Enum

Status of an entire job run.

CheckpointErrorRecord

Bases: BaseModel

Record of a single error occurrence during sheet execution.

Stores structured error information for debugging and pattern analysis. Error history is trimmed to MAX_ERROR_HISTORY records per sheet to prevent unbounded state growth.

SheetState

Bases: BaseModel

State for a single sheet.

Attributes
applied_pattern_ids property writable
applied_pattern_ids

Backward-compatible accessor for pattern IDs.

applied_pattern_descriptions property writable
applied_pattern_descriptions

Backward-compatible accessor for pattern descriptions.

can_retry property
can_retry

Whether this sheet has remaining retry budget.

can_complete property
can_complete

Whether this sheet has remaining completion mode budget.

is_exhausted property
is_exhausted

Whether both retry and completion budgets are exhausted.

total_attempts property
total_attempts

Total attempts across all modes.

has_fallback_available property
has_fallback_available

Whether there is another instrument in the fallback chain to try.

Functions
record_attempt
record_attempt(result)

Record an attempt result and update tracking state.

Only non-successful, non-rate-limited attempts increment normal_attempts. Successes and rate-limited attempts are recorded in attempt_results for history but don't consume retry budget.

Source code in src/marianne/core/checkpoint.py
def record_attempt(self, result: Any) -> None:
    """Record an attempt result and update tracking state.

    Only non-successful, non-rate-limited attempts increment
    ``normal_attempts``. Successes and rate-limited attempts are
    recorded in ``attempt_results`` for history but don't consume
    retry budget.
    """
    self.attempt_results.append(result)
    self.total_cost_usd += result.cost_usd
    self.total_duration_seconds += result.duration_seconds

    if not result.rate_limited and not result.execution_success:
        self.normal_attempts += 1
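The retry-budget semantics above can be exercised with a minimal stand-in for SheetState and the attempt-result object (the `AttemptResult` fields mirror the attributes the source reads; the class itself is a sketch, not the real type):

```python
from dataclasses import dataclass, field


@dataclass
class AttemptResult:
    # Minimal stand-in exposing the fields record_attempt reads.
    execution_success: bool
    rate_limited: bool
    cost_usd: float = 0.0
    duration_seconds: float = 0.0


@dataclass
class SheetStateSketch:
    normal_attempts: int = 0
    total_cost_usd: float = 0.0
    attempt_results: list[AttemptResult] = field(default_factory=list)

    def record_attempt(self, result: AttemptResult) -> None:
        self.attempt_results.append(result)
        self.total_cost_usd += result.cost_usd
        # Only genuine failures consume retry budget.
        if not result.rate_limited and not result.execution_success:
            self.normal_attempts += 1


s = SheetStateSketch()
s.record_attempt(AttemptResult(execution_success=False, rate_limited=True))   # rate-limited: free
s.record_attempt(AttemptResult(execution_success=False, rate_limited=False))  # failure: consumes budget
s.record_attempt(AttemptResult(execution_success=True, rate_limited=False))   # success: free
```

All three attempts land in `attempt_results` for history, but only the genuine failure increments `normal_attempts`.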
advance_fallback
advance_fallback(reason)

Advance to the next instrument in the fallback chain.

Records the transition, switches instrument_name, resets retry budget, and increments current_instrument_index.

Returns the new instrument name, or None if the chain is exhausted.

Source code in src/marianne/core/checkpoint.py
def advance_fallback(self, reason: str) -> str | None:
    """Advance to the next instrument in the fallback chain.

    Records the transition, switches instrument_name, resets retry
    budget, and increments current_instrument_index.

    Returns the new instrument name, or None if the chain is exhausted.
    """
    if not self.has_fallback_available:
        return None

    from_instrument = self.instrument_name or ""
    self.fallback_attempts[from_instrument] = self.normal_attempts

    to_instrument = self.fallback_chain[self.current_instrument_index]
    self.current_instrument_index += 1
    self.instrument_name = to_instrument

    # Fresh retry budget for the new instrument
    self.normal_attempts = 0
    self.completion_attempts = 0

    # Record the transition
    import datetime as _dt

    self.instrument_fallback_history.append({
        "from": from_instrument,
        "to": to_instrument,
        "reason": reason,
        "timestamp": _dt.datetime.now(tz=_dt.UTC).isoformat(),
    })

    # Trim to prevent unbounded growth
    if len(self.instrument_fallback_history) > MAX_INSTRUMENT_FALLBACK_HISTORY:
        self.instrument_fallback_history = (
            self.instrument_fallback_history[-MAX_INSTRUMENT_FALLBACK_HISTORY:]
        )

    return to_instrument
to_dict
to_dict()

Serialize to dict. Compatibility with dataclass SheetExecutionState.

Source code in src/marianne/core/checkpoint.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to dict. Compatibility with dataclass SheetExecutionState."""
    return self.model_dump(mode="json")
from_dict classmethod
from_dict(data)

Restore from dict. Compatibility with dataclass SheetExecutionState.

Source code in src/marianne/core/checkpoint.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> SheetState:
    """Restore from dict. Compatibility with dataclass SheetExecutionState."""
    return cls.model_validate(data)
capture_output
capture_output(stdout, stderr, max_bytes=MAX_OUTPUT_CAPTURE_BYTES)

Capture tail of stdout/stderr for debugging.

Stores the last max_bytes of each output stream. Sets output_truncated to True if either stream was larger than the limit.

Credential scanning (F-003): Before storing, both streams are scanned for API key patterns (sk-ant-*, AKIA*, AIzaSy*, Bearer tokens) and matches are replaced with [REDACTED_*] placeholders. This prevents leaked credentials from propagating to the learning store, dashboard, diagnostics, and MCP resources.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stdout | str | Full stdout string from execution. | required |
| stderr | str | Full stderr string from execution. | required |
| max_bytes | int | Maximum bytes to capture per stream (default 10KB). | MAX_OUTPUT_CAPTURE_BYTES |
Source code in src/marianne/core/checkpoint.py
def capture_output(
    self,
    stdout: str,
    stderr: str,
    max_bytes: int = MAX_OUTPUT_CAPTURE_BYTES,
) -> None:
    """Capture tail of stdout/stderr for debugging.

    Stores the last `max_bytes` of each output stream. Sets output_truncated
    to True if either stream was larger than the limit.

    Credential scanning (F-003): Before storing, both streams are scanned
    for API key patterns (sk-ant-*, AKIA*, AIzaSy*, Bearer tokens) and
    matches are replaced with [REDACTED_*] placeholders. This prevents
    leaked credentials from propagating to learning store, dashboard,
    diagnostics, and MCP resources.

    Args:
        stdout: Full stdout string from execution.
        stderr: Full stderr string from execution.
        max_bytes: Maximum bytes to capture per stream (default 10KB).
    """
    from marianne.utils.credential_scanner import redact_credentials

    # Convert to bytes to measure actual size, then slice
    stdout_bytes = stdout.encode("utf-8", errors="replace")
    stderr_bytes = stderr.encode("utf-8", errors="replace")

    stdout_truncated = len(stdout_bytes) > max_bytes
    stderr_truncated = len(stderr_bytes) > max_bytes

    # Capture tail (last N bytes) and decode back to string
    if stdout_truncated:
        self.stdout_tail = stdout_bytes[-max_bytes:].decode(
            "utf-8", errors="replace"
        )
    else:
        self.stdout_tail = stdout if stdout else None

    if stderr_truncated:
        self.stderr_tail = stderr_bytes[-max_bytes:].decode(
            "utf-8", errors="replace"
        )
    else:
        self.stderr_tail = stderr if stderr else None

    # Scan and redact credential patterns before storage (F-003)
    self.stdout_tail = redact_credentials(self.stdout_tail)
    self.stderr_tail = redact_credentials(self.stderr_tail)

    self.output_truncated = stdout_truncated or stderr_truncated
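The byte-accurate tail capture can be isolated into a tiny helper for illustration (the 16-byte limit here is just for the demo; the real default is MAX_OUTPUT_CAPTURE_BYTES):

```python
MAX = 16  # tiny limit for illustration only


def tail(stream: str, max_bytes: int = MAX) -> tuple[str, bool]:
    # Measure the UTF-8 encoding, not the character count, as capture_output does,
    # so the stored tail never exceeds max_bytes on disk.
    raw = stream.encode("utf-8", errors="replace")
    if len(raw) > max_bytes:
        return raw[-max_bytes:].decode("utf-8", errors="replace"), True
    return stream, False


text, truncated = tail("a" * 40)
```

Slicing bytes rather than characters matters for multi-byte input: decoding the sliced tail with `errors="replace"` absorbs any code point cut in half at the slice boundary.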
add_error_to_history
add_error_to_history(error)

Append an error record and enforce the history size limit.

All callers that add errors to error_history should use this method instead of appending directly so that the list never exceeds MAX_ERROR_HISTORY entries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| error | CheckpointErrorRecord | The error record to add. | required |
Source code in src/marianne/core/checkpoint.py
def add_error_to_history(self, error: CheckpointErrorRecord) -> None:
    """Append an error record and enforce the history size limit.

    All callers that add errors to ``error_history`` should use this
    method instead of appending directly so that the list never exceeds
    ``MAX_ERROR_HISTORY`` entries.

    Args:
        error: The error record to add.
    """
    self.error_history.append(error)
    if len(self.error_history) > MAX_ERROR_HISTORY:
        self.error_history = self.error_history[-MAX_ERROR_HISTORY:]
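The append-then-trim pattern shared by both history methods can be shown in isolation (the limit of 3 is illustrative; the real constants are MAX_ERROR_HISTORY and MAX_INSTRUMENT_FALLBACK_HISTORY):

```python
MAX_HISTORY = 3  # illustrative value for the demo

history: list[str] = []


def add_entry(entry: str) -> None:
    # Append, then keep only the newest MAX_HISTORY entries so the
    # persisted state never grows without bound.
    history.append(entry)
    if len(history) > MAX_HISTORY:
        del history[:-MAX_HISTORY]


for i in range(5):
    add_entry(f"e{i}")
```

After five adds only the three newest entries survive, which is why callers should route through these methods rather than appending to the lists directly.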
add_fallback_to_history
add_fallback_to_history(record)

Append a fallback record and enforce the history size limit.

All callers that add entries to instrument_fallback_history should use this method instead of appending directly so the list never exceeds MAX_INSTRUMENT_FALLBACK_HISTORY entries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| record | dict[str, str] | Dict with keys from, to, reason, timestamp. | required |
Source code in src/marianne/core/checkpoint.py
def add_fallback_to_history(self, record: dict[str, str]) -> None:
    """Append a fallback record and enforce the history size limit.

    All callers that add entries to ``instrument_fallback_history``
    should use this method instead of appending directly so the list
    never exceeds ``MAX_INSTRUMENT_FALLBACK_HISTORY`` entries.

    Args:
        record: Dict with keys from, to, reason, timestamp.
    """
    self.instrument_fallback_history.append(record)
    if len(self.instrument_fallback_history) > MAX_INSTRUMENT_FALLBACK_HISTORY:
        self.instrument_fallback_history = self.instrument_fallback_history[
            -MAX_INSTRUMENT_FALLBACK_HISTORY:
        ]

CheckpointState

Bases: BaseModel

Complete checkpoint state for a job run.

This is the primary state object that gets persisted and restored for resumable job execution.

Zombie Detection

A job is considered a "zombie" when the state shows RUNNING status but the associated process (tracked by pid) is no longer alive. This can happen when:

- An external timeout wrapper sends SIGKILL
- The system crashes or the process is forcibly terminated
- WSL shuts down while the job is running

Use is_zombie() to detect this state, and mark_zombie_detected() to recover from it.
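The liveness probe underlying zombie detection can be sketched as a standalone helper; it uses the same `os.kill(pid, 0)` technique as the is_zombie() source below:

```python
import os


def pid_alive(pid: int) -> bool:
    # Signal 0 performs the existence and permission checks without
    # actually delivering a signal to the process.
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process: definitely dead
    except PermissionError:
        return True   # process exists but belongs to another user
    return True


alive = pid_alive(os.getpid())  # the current process is always alive
```

A RUNNING checkpoint whose recorded pid fails this probe is the zombie case; the recovery path is mark_zombie_detected(), which flips the job back to PAUSED.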

Worktree Isolation

When isolation is enabled, jobs execute in a separate git worktree. The worktree tracking fields record the worktree state for:

- Resume operations (reuse the existing worktree)
- Cleanup on completion (remove or preserve based on outcome)
- Debugging (know which worktree was used)

Functions
record_hook_result
record_hook_result(result)

Append a hook result to the checkpoint state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| result | dict[str, Any] | Serialized HookResult dict from hook execution. | required |
Source code in src/marianne/core/checkpoint.py
def record_hook_result(self, result: dict[str, Any]) -> None:
    """Append a hook result to the checkpoint state.

    Args:
        result: Serialized HookResult dict from hook execution.
    """
    self.hook_results.append(result)
    self.updated_at = utc_now()
record_circuit_breaker_change
record_circuit_breaker_change(state, trigger, consecutive_failures)

Record a circuit breaker state transition.

Persists circuit breaker state changes so that mzt status can display ground-truth CB state instead of inferring it from failure patterns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state | str | Current CB state after transition ("closed", "open", "half_open"). | required |
| trigger | str | What caused the transition (e.g., "failure_recorded", "success_recorded"). | required |
| consecutive_failures | int | Number of consecutive failures at time of transition. | required |
Source code in src/marianne/core/checkpoint.py
def record_circuit_breaker_change(
    self,
    state: str,
    trigger: str,
    consecutive_failures: int,
) -> None:
    """Record a circuit breaker state transition.

    Persists circuit breaker state changes so that ``mzt status``
    can display ground-truth CB state instead of inferring it from
    failure patterns.

    Args:
        state: Current CB state after transition ("closed", "open", "half_open").
        trigger: What caused the transition (e.g., "failure_recorded", "success_recorded").
        consecutive_failures: Number of consecutive failures at time of transition.
    """
    self.circuit_breaker_history.append({
        "state": state,
        "timestamp": utc_now().isoformat(),
        "trigger": trigger,
        "consecutive_failures": consecutive_failures,
    })
    self.updated_at = utc_now()

    _logger.debug(
        "circuit_breaker_change_recorded",
        job_id=self.job_id,
        state=state,
        trigger=trigger,
        consecutive_failures=consecutive_failures,
    )
add_synthesis
add_synthesis(batch_id, result)

Add or update a synthesis result.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| batch_id | str | The batch identifier. | required |
| result | SynthesisResultDict | Synthesis result as dict (from SynthesisResult.to_dict()). | required |
Source code in src/marianne/core/checkpoint.py
def add_synthesis(self, batch_id: str, result: SynthesisResultDict) -> None:
    """Add or update a synthesis result.

    Args:
        batch_id: The batch identifier.
        result: Synthesis result as dict (from SynthesisResult.to_dict()).
    """
    self.synthesis_results[batch_id] = result
    self.updated_at = utc_now()

    _logger.debug(
        "synthesis_added",
        job_id=self.job_id,
        batch_id=batch_id,
        status=result.get("status"),
    )
get_next_sheet
get_next_sheet()

Determine the next sheet to process.

Returns None if all sheets are complete.

Source code in src/marianne/core/checkpoint.py
def get_next_sheet(self) -> int | None:
    """Determine the next sheet to process.

    Returns None if all sheets are complete.
    """
    if self.status in (JobStatus.COMPLETED, JobStatus.CANCELLED):
        return None

    # Check for in-progress sheet (resume from crash)
    if self.current_sheet is not None:
        sheet_state = self.sheets.get(self.current_sheet)
        if sheet_state and sheet_state.status == SheetStatus.IN_PROGRESS:
            return self.current_sheet

    # Find next pending sheet after last completed
    for sheet_num in range(self.last_completed_sheet + 1, self.total_sheets + 1):
        sheet_state = self.sheets.get(sheet_num)
        if sheet_state is None:
            return sheet_num
        if sheet_state.status in (SheetStatus.PENDING, SheetStatus.FAILED):
            return sheet_num

    return None
mark_sheet_started
mark_sheet_started(sheet_num)

Mark a sheet as started.

Source code in src/marianne/core/checkpoint.py
def mark_sheet_started(self, sheet_num: int) -> None:
    """Mark a sheet as started."""
    previous_status = self.status
    self.current_sheet = sheet_num
    self.status = JobStatus.RUNNING
    self.updated_at = utc_now()

    if sheet_num not in self.sheets:
        self.sheets[sheet_num] = SheetState(sheet_num=sheet_num)

    sheet = self.sheets[sheet_num]
    sheet.status = SheetStatus.IN_PROGRESS
    sheet.started_at = utc_now()
    sheet.attempt_count += 1
    # Clear stale fields from previous attempts so retry starts clean.
    sheet.error_message = None
    sheet.exit_code = None
    sheet.execution_mode = None

    _logger.debug(
        "sheet_started",
        job_id=self.job_id,
        sheet_num=sheet_num,
        attempt_count=sheet.attempt_count,
        previous_status=previous_status.value,
        total_sheets=self.total_sheets,
    )
mark_sheet_completed
mark_sheet_completed(sheet_num, validation_passed=True, validation_details=None, execution_duration_seconds=None)

Mark a sheet as completed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sheet_num | int | Sheet number that completed. | required |
| validation_passed | bool | Whether validation checks passed. | True |
| validation_details | list[ValidationDetailDict] \| None | Detailed validation results. | None |
| execution_duration_seconds | float \| None | How long the sheet execution took. | None |
Source code in src/marianne/core/checkpoint.py
def mark_sheet_completed(
    self,
    sheet_num: int,
    validation_passed: bool = True,
    validation_details: list[ValidationDetailDict] | None = None,
    execution_duration_seconds: float | None = None,
) -> None:
    """Mark a sheet as completed.

    Args:
        sheet_num: Sheet number that completed.
        validation_passed: Whether validation checks passed.
        validation_details: Detailed validation results.
        execution_duration_seconds: How long the sheet execution took.
    """
    self.updated_at = utc_now()

    if sheet_num not in self.sheets:
        self.sheets[sheet_num] = SheetState(sheet_num=sheet_num)
    sheet = self.sheets[sheet_num]
    sheet.status = SheetStatus.COMPLETED
    sheet.completed_at = utc_now()
    sheet.exit_code = 0
    sheet.validation_passed = validation_passed
    sheet.validation_details = validation_details
    if execution_duration_seconds is not None:
        sheet.execution_duration_seconds = execution_duration_seconds

    # Only advance the watermark, never retreat it (Q016/#37).
    # In parallel execution, sheets may complete out of order.
    if sheet_num > self.last_completed_sheet:
        self.last_completed_sheet = sheet_num
    self.current_sheet = None

    # Check if job is complete — all sheets must be completed, not just
    # the highest-numbered one, to handle parallel out-of-order completion.
    job_completed = (
        len(self.sheets) >= self.total_sheets
        and all(
            s.status == SheetStatus.COMPLETED
            for s in self.sheets.values()
        )
    )
    if job_completed:
        self.status = JobStatus.COMPLETED
        self.completed_at = utc_now()

    _logger.debug(
        "sheet_completed",
        job_id=self.job_id,
        sheet_num=sheet_num,
        validation_passed=validation_passed,
        attempt_count=sheet.attempt_count,
        job_completed=job_completed,
        progress=f"{self.last_completed_sheet}/{self.total_sheets}",
    )
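The monotonic watermark rule (Q016/#37) is easy to see in isolation:

```python
last_completed_sheet = 0


def record_completion(sheet_num: int) -> int:
    """Advance the watermark monotonically; parallel sheets may finish out of order."""
    global last_completed_sheet
    # Only advance the watermark, never retreat it.
    if sheet_num > last_completed_sheet:
        last_completed_sheet = sheet_num
    return last_completed_sheet


record_completion(3)               # sheet 3 finishes first
watermark = record_completion(1)   # sheet 1 finishes late; watermark stays at 3
```

Without the guard, a late-finishing low-numbered sheet would pull the watermark backwards and cause already-completed sheets to be rescheduled on resume.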
mark_sheet_failed
mark_sheet_failed(sheet_num, error_message, error_category=None, exit_code=None, exit_signal=None, exit_reason=None, execution_duration_seconds=None, error_code=None)

Mark a sheet as failed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sheet_num | int | Sheet number that failed. | required |
| error_message | str | Human-readable error description. | required |
| error_category | ErrorCategory \| str \| None | Error category from ErrorClassifier (e.g., "signal", "timeout"). | None |
| exit_code | int \| None | Process exit code (None if killed by signal). | None |
| exit_signal | int \| None | Signal number if killed by signal (e.g., 9=SIGKILL, 15=SIGTERM). | None |
| exit_reason | ExitReason \| None | Why execution ended ("completed", "timeout", "killed", "error"). | None |
| execution_duration_seconds | float \| None | How long the sheet execution took. | None |
| error_code | str \| None | Structured error code (e.g., "E001", "E006"). More specific than error_category: distinguishes stale (E006) from timeout (E001). | None |
Source code in src/marianne/core/checkpoint.py
def mark_sheet_failed(
    self,
    sheet_num: int,
    error_message: str,
    error_category: ErrorCategory | str | None = None,
    exit_code: int | None = None,
    exit_signal: int | None = None,
    exit_reason: ExitReason | None = None,
    execution_duration_seconds: float | None = None,
    error_code: str | None = None,
) -> None:
    """Mark a sheet as failed.

    Args:
        sheet_num: Sheet number that failed.
        error_message: Human-readable error description.
        error_category: Error category from ErrorClassifier (e.g., "signal", "timeout").
        exit_code: Process exit code (None if killed by signal).
        exit_signal: Signal number if killed by signal (e.g., 9=SIGKILL, 15=SIGTERM).
        exit_reason: Why execution ended ("completed", "timeout", "killed", "error").
        execution_duration_seconds: How long the sheet execution took.
        error_code: Structured error code (e.g., "E001", "E006"). More specific
            than error_category — distinguishes stale (E006) from timeout (E001).
    """
    self.updated_at = utc_now()

    if sheet_num not in self.sheets:
        self.sheets[sheet_num] = SheetState(sheet_num=sheet_num)
    sheet = self.sheets[sheet_num]
    sheet.status = SheetStatus.FAILED
    sheet.completed_at = utc_now()
    sheet.error_message = error_message
    if error_category is not None and not isinstance(error_category, ErrorCategory):
        try:
            error_category = ErrorCategory(error_category)
        except ValueError:
            _logger.warning("unknown_error_category", value=error_category)
            error_category = None
    sheet.error_category = error_category
    sheet.error_code = error_code
    sheet.exit_code = exit_code
    sheet.exit_signal = exit_signal
    sheet.exit_reason = exit_reason
    if execution_duration_seconds is not None:
        sheet.execution_duration_seconds = execution_duration_seconds

    self.current_sheet = None
    self.total_retry_count += 1

    _logger.debug(
        "sheet_failed",
        job_id=self.job_id,
        sheet_num=sheet_num,
        error_category=error_category,
        exit_code=exit_code,
        exit_signal=exit_signal,
        exit_reason=exit_reason,
        attempt_count=sheet.attempt_count,
        total_retry_count=self.total_retry_count,
        error_message=error_message[:100] if error_message else None,
    )
mark_sheet_skipped
mark_sheet_skipped(sheet_num, reason=None)

Mark a sheet as skipped.

v21 Evolution: Proactive Checkpoint System - supports skipping sheets via checkpoint response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sheet_num | int | Sheet number to skip. | required |
| reason | str \| None | Optional reason for skipping (stored in error_message field). | None |
Source code in src/marianne/core/checkpoint.py
def mark_sheet_skipped(
    self,
    sheet_num: int,
    reason: str | None = None,
) -> None:
    """Mark a sheet as skipped.

    v21 Evolution: Proactive Checkpoint System - supports skipping sheets
    via checkpoint response.

    Args:
        sheet_num: Sheet number to skip.
        reason: Optional reason for skipping (stored in error_message field).
    """
    self.updated_at = utc_now()

    if sheet_num not in self.sheets:
        self.sheets[sheet_num] = SheetState(sheet_num=sheet_num)
    sheet = self.sheets[sheet_num]
    sheet.status = SheetStatus.SKIPPED
    sheet.completed_at = utc_now()
    if reason:
        sheet.error_message = reason

    self.current_sheet = None

    _logger.info(
        "sheet_skipped",
        job_id=self.job_id,
        sheet_num=sheet_num,
        reason=reason,
    )
mark_job_failed
mark_job_failed(error_message)

Mark the entire job as failed.

Source code in src/marianne/core/checkpoint.py
def mark_job_failed(self, error_message: str) -> None:
    """Mark the entire job as failed."""
    previous_status = self.status
    self.status = JobStatus.FAILED
    self.error_message = error_message
    self.pid = None  # Clear PID so stale PID doesn't block resume
    self.completed_at = utc_now()
    self.updated_at = utc_now()

    _logger.error(
        "job_failed",
        job_id=self.job_id,
        previous_status=previous_status.value,
        last_completed_sheet=self.last_completed_sheet,
        total_sheets=self.total_sheets,
        total_retry_count=self.total_retry_count,
        error_message=error_message[:200] if error_message else None,
    )
mark_job_paused
mark_job_paused()

Mark the job as paused.

Source code in src/marianne/core/checkpoint.py
def mark_job_paused(self) -> None:
    """Mark the job as paused."""
    previous_status = self.status
    self.status = JobStatus.PAUSED
    self.pid = None  # Clear PID so stale PID doesn't block resume
    self.updated_at = utc_now()

    _logger.info(
        "job_paused",
        job_id=self.job_id,
        previous_status=previous_status.value,
        last_completed_sheet=self.last_completed_sheet,
        total_sheets=self.total_sheets,
        current_sheet=self.current_sheet,
    )
get_progress
get_progress()

Get progress as (completed, total).

Source code in src/marianne/core/checkpoint.py
def get_progress(self) -> tuple[int, int]:
    """Get progress as (completed, total)."""
    completed = sum(
        1 for b in self.sheets.values()
        if b.status == SheetStatus.COMPLETED
    )
    return completed, self.total_sheets
get_progress_percent
get_progress_percent()

Get progress as percentage.

Source code in src/marianne/core/checkpoint.py
def get_progress_percent(self) -> float:
    """Get progress as percentage."""
    completed, total = self.get_progress()
    return (completed / total * 100) if total > 0 else 0.0
is_zombie
is_zombie()

Check if this job is a zombie (RUNNING but process dead).

A zombie state occurs when:

1. Status is RUNNING
2. PID is set
3. The process with that PID is no longer alive

Note: This only checks if the PID is dead. It does NOT use time-based stale detection, as jobs can legitimately run for hours or days.

Returns:

| Type | Description |
| --- | --- |
| bool | True if job appears to be a zombie, False otherwise. |

Source code in src/marianne/core/checkpoint.py
def is_zombie(self) -> bool:
    """Check if this job is a zombie (RUNNING but process dead).

    A zombie state occurs when:
    1. Status is RUNNING
    2. PID is set
    3. Process with that PID is no longer alive

    Note: This only checks if the PID is dead. It does NOT use time-based
    stale detection, as jobs can legitimately run for hours or days.

    Returns:
        True if job appears to be a zombie, False otherwise.
    """
    # Only RUNNING jobs can be zombies
    if self.status != JobStatus.RUNNING:
        return False

    # If no PID recorded, can't determine - not a zombie by this check
    if self.pid is None:
        return False

    # Check if process is alive
    try:
        # os.kill with signal 0 checks if process exists without killing it
        os.kill(self.pid, 0)
        # Process exists - not a zombie
        return False
    except ProcessLookupError:
        # Process doesn't exist - definite zombie
        _logger.warning(
            "zombie_dead_pid_detected",
            job_id=self.job_id,
            pid=self.pid,
        )
        return True
    except PermissionError:
        # Can't check (different user) - assume not zombie
        return False
    except OSError:
        # Other OS error - assume not zombie to be safe
        return False
mark_zombie_detected
mark_zombie_detected(reason=None)

Mark this job as recovered from zombie state.

Changes status from RUNNING to PAUSED, clears PID, and records the zombie recovery in the error message.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reason | str \| None | Optional additional context about why zombie was detected. | None |
Source code in src/marianne/core/checkpoint.py
def mark_zombie_detected(self, reason: str | None = None) -> None:
    """Mark this job as recovered from zombie state.

    Changes status from RUNNING to PAUSED, clears PID, and records
    the zombie recovery in the error message.

    Args:
        reason: Optional additional context about why zombie was detected.
    """
    previous_status = self.status
    previous_pid = self.pid

    self.status = JobStatus.PAUSED
    self.pid = None
    self.updated_at = utc_now()

    # Build zombie recovery message - this is informational, not an error
    # The job has been successfully recovered and can be resumed
    zombie_msg = (
        f"Recovered from stale running state (PID {previous_pid} no longer active). "
        "Job is now paused and ready to resume."
    )
    if reason:
        zombie_msg += f" Trigger: {reason}"

    # Only set error_message if there isn't already a real error
    # Zombie recovery is informational, not an error condition
    if not self.error_message:
        self.error_message = zombie_msg

    _logger.warning(
        "zombie_recovered",
        job_id=self.job_id,
        previous_status=previous_status.value,
        previous_pid=previous_pid,
        reason=reason,
    )
set_running_pid
set_running_pid(pid=None)

Set the PID of the running orchestrator process.

Call this when starting job execution to enable zombie detection. If pid is None, uses the current process PID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pid | int \| None | Process ID to record. Defaults to current process. | None |
Source code in src/marianne/core/checkpoint.py
def set_running_pid(self, pid: int | None = None) -> None:
    """Set the PID of the running orchestrator process.

    Call this when starting job execution to enable zombie detection.
    If pid is None, uses the current process PID.

    Args:
        pid: Process ID to record. Defaults to current process.
    """
    self.pid = pid if pid is not None else os.getpid()
    self.updated_at = utc_now()

    _logger.debug(
        "running_pid_set",
        job_id=self.job_id,
        pid=self.pid,
    )

Functions