Skip to content

Index

core

Core domain models and configuration.

Classes

CheckpointState

Bases: BaseModel

Complete checkpoint state for a job run.

This is the primary state object that gets persisted and restored for resumable job execution.

Zombie Detection

A job is considered a "zombie" when the state shows RUNNING status but the associated process (tracked by pid) is no longer alive. This can happen when: - External timeout wrapper sends SIGKILL - System crash or forced termination - WSL shutdown while job running

Use is_zombie() to detect this state, and mark_zombie_detected() to recover from it.

Worktree Isolation

When isolation is enabled, jobs execute in a separate git worktree. The worktree tracking fields record the worktree state for: - Resume operations (reuse existing worktree) - Cleanup on completion (remove or preserve based on outcome) - Debugging (know which worktree was used)

Functions
record_hook_result
record_hook_result(result)

Append a hook result to the checkpoint state.

Parameters:

Name Type Description Default
result dict[str, Any]

Serialized HookResult dict from hook execution.

required
Source code in src/marianne/core/checkpoint.py
def record_hook_result(self, result: dict[str, Any]) -> None:
    """Append a hook result to the checkpoint state.

    Args:
        result: Serialized HookResult dict from hook execution.
    """
    self.hook_results.append(result)
    self.updated_at = utc_now()
record_circuit_breaker_change
record_circuit_breaker_change(state, trigger, consecutive_failures)

Record a circuit breaker state transition.

Persists circuit breaker state changes so that mzt status can display ground-truth CB state instead of inferring it from failure patterns.

Parameters:

Name Type Description Default
state str

Current CB state after transition ("closed", "open", "half_open").

required
trigger str

What caused the transition (e.g., "failure_recorded", "success_recorded").

required
consecutive_failures int

Number of consecutive failures at time of transition.

required
Source code in src/marianne/core/checkpoint.py
def record_circuit_breaker_change(
    self,
    state: str,
    trigger: str,
    consecutive_failures: int,
) -> None:
    """Record a circuit breaker state transition.

    Persists circuit breaker state changes so that ``mzt status``
    can display ground-truth CB state instead of inferring it from
    failure patterns.

    Args:
        state: Current CB state after transition ("closed", "open", "half_open").
        trigger: What caused the transition (e.g., "failure_recorded", "success_recorded").
        consecutive_failures: Number of consecutive failures at time of transition.
    """
    self.circuit_breaker_history.append({
        "state": state,
        "timestamp": utc_now().isoformat(),
        "trigger": trigger,
        "consecutive_failures": consecutive_failures,
    })
    self.updated_at = utc_now()

    _logger.debug(
        "circuit_breaker_change_recorded",
        job_id=self.job_id,
        state=state,
        trigger=trigger,
        consecutive_failures=consecutive_failures,
    )
add_synthesis
add_synthesis(batch_id, result)

Add or update a synthesis result.

Parameters:

Name Type Description Default
batch_id str

The batch identifier.

required
result SynthesisResultDict

Synthesis result as dict (from SynthesisResult.to_dict()).

required
Source code in src/marianne/core/checkpoint.py
def add_synthesis(self, batch_id: str, result: SynthesisResultDict) -> None:
    """Add or update a synthesis result.

    Args:
        batch_id: The batch identifier.
        result: Synthesis result as dict (from SynthesisResult.to_dict()).
    """
    self.synthesis_results[batch_id] = result
    self.updated_at = utc_now()

    _logger.debug(
        "synthesis_added",
        job_id=self.job_id,
        batch_id=batch_id,
        status=result.get("status"),
    )
get_next_sheet
get_next_sheet()

Determine the next sheet to process.

Returns None if all sheets are complete.

Source code in src/marianne/core/checkpoint.py
def get_next_sheet(self) -> int | None:
    """Determine the next sheet to process.

    Returns None if all sheets are complete.
    """
    if self.status in (JobStatus.COMPLETED, JobStatus.CANCELLED):
        return None

    # Check for in-progress sheet (resume from crash)
    if self.current_sheet is not None:
        sheet_state = self.sheets.get(self.current_sheet)
        if sheet_state and sheet_state.status == SheetStatus.IN_PROGRESS:
            return self.current_sheet

    # Find next pending sheet after last completed
    for sheet_num in range(self.last_completed_sheet + 1, self.total_sheets + 1):
        sheet_state = self.sheets.get(sheet_num)
        if sheet_state is None:
            return sheet_num
        if sheet_state.status in (SheetStatus.PENDING, SheetStatus.FAILED):
            return sheet_num

    return None
mark_sheet_started
mark_sheet_started(sheet_num)

Mark a sheet as started.

Source code in src/marianne/core/checkpoint.py
def mark_sheet_started(self, sheet_num: int) -> None:
    """Mark a sheet as started."""
    previous_status = self.status
    self.current_sheet = sheet_num
    self.status = JobStatus.RUNNING
    self.updated_at = utc_now()

    if sheet_num not in self.sheets:
        self.sheets[sheet_num] = SheetState(sheet_num=sheet_num)

    sheet = self.sheets[sheet_num]
    sheet.status = SheetStatus.IN_PROGRESS
    sheet.started_at = utc_now()
    sheet.attempt_count += 1
    # Clear stale fields from previous attempts so retry starts clean.
    sheet.error_message = None
    sheet.exit_code = None
    sheet.execution_mode = None

    _logger.debug(
        "sheet_started",
        job_id=self.job_id,
        sheet_num=sheet_num,
        attempt_count=sheet.attempt_count,
        previous_status=previous_status.value,
        total_sheets=self.total_sheets,
    )
mark_sheet_completed
mark_sheet_completed(sheet_num, validation_passed=True, validation_details=None, execution_duration_seconds=None)

Mark a sheet as completed.

Parameters:

Name Type Description Default
sheet_num int

Sheet number that completed.

required
validation_passed bool

Whether validation checks passed.

True
validation_details list[ValidationDetailDict] | None

Detailed validation results.

None
execution_duration_seconds float | None

How long the sheet execution took.

None
Source code in src/marianne/core/checkpoint.py
def mark_sheet_completed(
    self,
    sheet_num: int,
    validation_passed: bool = True,
    validation_details: list[ValidationDetailDict] | None = None,
    execution_duration_seconds: float | None = None,
) -> None:
    """Mark a sheet as completed.

    Args:
        sheet_num: Sheet number that completed.
        validation_passed: Whether validation checks passed.
        validation_details: Detailed validation results.
        execution_duration_seconds: How long the sheet execution took.
    """
    self.updated_at = utc_now()

    if sheet_num not in self.sheets:
        self.sheets[sheet_num] = SheetState(sheet_num=sheet_num)
    sheet = self.sheets[sheet_num]
    sheet.status = SheetStatus.COMPLETED
    sheet.completed_at = utc_now()
    sheet.exit_code = 0
    sheet.validation_passed = validation_passed
    sheet.validation_details = validation_details
    if execution_duration_seconds is not None:
        sheet.execution_duration_seconds = execution_duration_seconds

    # Only advance the watermark, never retreat it (Q016/#37).
    # In parallel execution, sheets may complete out of order.
    if sheet_num > self.last_completed_sheet:
        self.last_completed_sheet = sheet_num
    self.current_sheet = None

    # Check if job is complete — all sheets must be completed, not just
    # the highest-numbered one, to handle parallel out-of-order completion.
    job_completed = (
        len(self.sheets) >= self.total_sheets
        and all(
            s.status == SheetStatus.COMPLETED
            for s in self.sheets.values()
        )
    )
    if job_completed:
        self.status = JobStatus.COMPLETED
        self.completed_at = utc_now()

    _logger.debug(
        "sheet_completed",
        job_id=self.job_id,
        sheet_num=sheet_num,
        validation_passed=validation_passed,
        attempt_count=sheet.attempt_count,
        job_completed=job_completed,
        progress=f"{self.last_completed_sheet}/{self.total_sheets}",
    )
mark_sheet_failed
mark_sheet_failed(sheet_num, error_message, error_category=None, exit_code=None, exit_signal=None, exit_reason=None, execution_duration_seconds=None, error_code=None)

Mark a sheet as failed.

Parameters:

Name Type Description Default
sheet_num int

Sheet number that failed.

required
error_message str

Human-readable error description.

required
error_category ErrorCategory | str | None

Error category from ErrorClassifier (e.g., "signal", "timeout").

None
exit_code int | None

Process exit code (None if killed by signal).

None
exit_signal int | None

Signal number if killed by signal (e.g., 9=SIGKILL, 15=SIGTERM).

None
exit_reason ExitReason | None

Why execution ended ("completed", "timeout", "killed", "error").

None
execution_duration_seconds float | None

How long the sheet execution took.

None
error_code str | None

Structured error code (e.g., "E001", "E006"). More specific than error_category — distinguishes stale (E006) from timeout (E001).

None
Source code in src/marianne/core/checkpoint.py
def mark_sheet_failed(
    self,
    sheet_num: int,
    error_message: str,
    error_category: ErrorCategory | str | None = None,
    exit_code: int | None = None,
    exit_signal: int | None = None,
    exit_reason: ExitReason | None = None,
    execution_duration_seconds: float | None = None,
    error_code: str | None = None,
) -> None:
    """Mark a sheet as failed.

    Args:
        sheet_num: Sheet number that failed.
        error_message: Human-readable error description.
        error_category: Error category from ErrorClassifier (e.g., "signal", "timeout").
        exit_code: Process exit code (None if killed by signal).
        exit_signal: Signal number if killed by signal (e.g., 9=SIGKILL, 15=SIGTERM).
        exit_reason: Why execution ended ("completed", "timeout", "killed", "error").
        execution_duration_seconds: How long the sheet execution took.
        error_code: Structured error code (e.g., "E001", "E006"). More specific
            than error_category — distinguishes stale (E006) from timeout (E001).
    """
    self.updated_at = utc_now()

    if sheet_num not in self.sheets:
        self.sheets[sheet_num] = SheetState(sheet_num=sheet_num)
    sheet = self.sheets[sheet_num]
    sheet.status = SheetStatus.FAILED
    sheet.completed_at = utc_now()
    sheet.error_message = error_message
    if error_category is not None and not isinstance(error_category, ErrorCategory):
        try:
            error_category = ErrorCategory(error_category)
        except ValueError:
            _logger.warning("unknown_error_category", value=error_category)
            error_category = None
    sheet.error_category = error_category
    sheet.error_code = error_code
    sheet.exit_code = exit_code
    sheet.exit_signal = exit_signal
    sheet.exit_reason = exit_reason
    if execution_duration_seconds is not None:
        sheet.execution_duration_seconds = execution_duration_seconds

    self.current_sheet = None
    self.total_retry_count += 1

    _logger.debug(
        "sheet_failed",
        job_id=self.job_id,
        sheet_num=sheet_num,
        error_category=error_category,
        exit_code=exit_code,
        exit_signal=exit_signal,
        exit_reason=exit_reason,
        attempt_count=sheet.attempt_count,
        total_retry_count=self.total_retry_count,
        error_message=error_message[:100] if error_message else None,
    )
mark_sheet_skipped
mark_sheet_skipped(sheet_num, reason=None)

Mark a sheet as skipped.

v21 Evolution: Proactive Checkpoint System - supports skipping sheets via checkpoint response.

Parameters:

Name Type Description Default
sheet_num int

Sheet number to skip.

required
reason str | None

Optional reason for skipping (stored in error_message field).

None
Source code in src/marianne/core/checkpoint.py
def mark_sheet_skipped(
    self,
    sheet_num: int,
    reason: str | None = None,
) -> None:
    """Mark a sheet as skipped.

    v21 Evolution: Proactive Checkpoint System - supports skipping sheets
    via checkpoint response.

    Args:
        sheet_num: Sheet number to skip.
        reason: Optional reason for skipping (stored in error_message field).
    """
    self.updated_at = utc_now()

    if sheet_num not in self.sheets:
        self.sheets[sheet_num] = SheetState(sheet_num=sheet_num)
    sheet = self.sheets[sheet_num]
    sheet.status = SheetStatus.SKIPPED
    sheet.completed_at = utc_now()
    if reason:
        sheet.error_message = reason

    self.current_sheet = None

    _logger.info(
        "sheet_skipped",
        job_id=self.job_id,
        sheet_num=sheet_num,
        reason=reason,
    )
mark_job_failed
mark_job_failed(error_message)

Mark the entire job as failed.

Source code in src/marianne/core/checkpoint.py
def mark_job_failed(self, error_message: str) -> None:
    """Mark the entire job as failed."""
    previous_status = self.status
    self.status = JobStatus.FAILED
    self.error_message = error_message
    self.pid = None  # Clear PID so stale PID doesn't block resume
    self.completed_at = utc_now()
    self.updated_at = utc_now()

    _logger.error(
        "job_failed",
        job_id=self.job_id,
        previous_status=previous_status.value,
        last_completed_sheet=self.last_completed_sheet,
        total_sheets=self.total_sheets,
        total_retry_count=self.total_retry_count,
        error_message=error_message[:200] if error_message else None,
    )
mark_job_paused
mark_job_paused()

Mark the job as paused.

Source code in src/marianne/core/checkpoint.py
def mark_job_paused(self) -> None:
    """Mark the job as paused."""
    previous_status = self.status
    self.status = JobStatus.PAUSED
    self.pid = None  # Clear PID so stale PID doesn't block resume
    self.updated_at = utc_now()

    _logger.info(
        "job_paused",
        job_id=self.job_id,
        previous_status=previous_status.value,
        last_completed_sheet=self.last_completed_sheet,
        total_sheets=self.total_sheets,
        current_sheet=self.current_sheet,
    )
get_progress
get_progress()

Get progress as (completed, total).

Source code in src/marianne/core/checkpoint.py
def get_progress(self) -> tuple[int, int]:
    """Get progress as (completed, total)."""
    completed = sum(
        1 for b in self.sheets.values()
        if b.status == SheetStatus.COMPLETED
    )
    return completed, self.total_sheets
get_progress_percent
get_progress_percent()

Get progress as percentage.

Source code in src/marianne/core/checkpoint.py
def get_progress_percent(self) -> float:
    """Get progress as percentage."""
    completed, total = self.get_progress()
    return (completed / total * 100) if total > 0 else 0.0
is_zombie
is_zombie()

Check if this job is a zombie (RUNNING but process dead).

A zombie state occurs when: 1. Status is RUNNING 2. PID is set 3. Process with that PID is no longer alive

Note: This only checks if the PID is dead. It does NOT use time-based stale detection, as jobs can legitimately run for hours or days.

Returns:

Type Description
bool

True if job appears to be a zombie, False otherwise.

Source code in src/marianne/core/checkpoint.py
def is_zombie(self) -> bool:
    """Check if this job is a zombie (RUNNING but process dead).

    A zombie state occurs when:
    1. Status is RUNNING
    2. PID is set
    3. Process with that PID is no longer alive

    Note: This only checks if the PID is dead. It does NOT use time-based
    stale detection, as jobs can legitimately run for hours or days.

    Returns:
        True if job appears to be a zombie, False otherwise.
    """
    # Only RUNNING jobs can be zombies
    if self.status != JobStatus.RUNNING:
        return False

    # If no PID recorded, can't determine - not a zombie by this check
    if self.pid is None:
        return False

    # Check if process is alive
    try:
        # os.kill with signal 0 checks if process exists without killing it
        os.kill(self.pid, 0)
        # Process exists - not a zombie
        return False
    except ProcessLookupError:
        # Process doesn't exist - definite zombie
        _logger.warning(
            "zombie_dead_pid_detected",
            job_id=self.job_id,
            pid=self.pid,
        )
        return True
    except PermissionError:
        # Can't check (different user) - assume not zombie
        return False
    except OSError:
        # Other OS error - assume not zombie to be safe
        return False
mark_zombie_detected
mark_zombie_detected(reason=None)

Mark this job as recovered from zombie state.

Changes status from RUNNING to PAUSED, clears PID, and records the zombie recovery in the error message.

Parameters:

Name Type Description Default
reason str | None

Optional additional context about why zombie was detected.

None
Source code in src/marianne/core/checkpoint.py
def mark_zombie_detected(self, reason: str | None = None) -> None:
    """Mark this job as recovered from zombie state.

    Changes status from RUNNING to PAUSED, clears PID, and records
    the zombie recovery in the error message.

    Args:
        reason: Optional additional context about why zombie was detected.
    """
    previous_status = self.status
    previous_pid = self.pid

    self.status = JobStatus.PAUSED
    self.pid = None
    self.updated_at = utc_now()

    # Build zombie recovery message - this is informational, not an error
    # The job has been successfully recovered and can be resumed
    zombie_msg = (
        f"Recovered from stale running state (PID {previous_pid} no longer active). "
        "Job is now paused and ready to resume."
    )
    if reason:
        zombie_msg += f" Trigger: {reason}"

    # Only set error_message if there isn't already a real error
    # Zombie recovery is informational, not an error condition
    if not self.error_message:
        self.error_message = zombie_msg

    _logger.warning(
        "zombie_recovered",
        job_id=self.job_id,
        previous_status=previous_status.value,
        previous_pid=previous_pid,
        reason=reason,
    )
set_running_pid
set_running_pid(pid=None)

Set the PID of the running orchestrator process.

Call this when starting job execution to enable zombie detection. If pid is None, uses the current process PID.

Parameters:

Name Type Description Default
pid int | None

Process ID to record. Defaults to current process.

None
Source code in src/marianne/core/checkpoint.py
def set_running_pid(self, pid: int | None = None) -> None:
    """Set the PID of the running orchestrator process.

    Call this when starting job execution to enable zombie detection.
    If pid is None, uses the current process PID.

    Args:
        pid: Process ID to record. Defaults to current process.
    """
    self.pid = pid if pid is not None else os.getpid()
    self.updated_at = utc_now()

    _logger.debug(
        "running_pid_set",
        job_id=self.job_id,
        pid=self.pid,
    )

SheetState

Bases: BaseModel

State for a single sheet.

Attributes
applied_pattern_ids property writable
applied_pattern_ids

Backward-compatible accessor for pattern IDs.

applied_pattern_descriptions property writable
applied_pattern_descriptions

Backward-compatible accessor for pattern descriptions.

can_retry property
can_retry

Whether this sheet has remaining retry budget.

can_complete property
can_complete

Whether this sheet has remaining completion mode budget.

is_exhausted property
is_exhausted

Whether both retry and completion budgets are exhausted.

total_attempts property
total_attempts

Total attempts across all modes.

has_fallback_available property
has_fallback_available

Whether there is another instrument in the fallback chain to try.

Functions
record_attempt
record_attempt(result)

Record an attempt result and update tracking state.

Only non-successful, non-rate-limited attempts increment normal_attempts. Successes and rate-limited attempts are recorded in attempt_results for history but don't consume retry budget.

Source code in src/marianne/core/checkpoint.py
def record_attempt(self, result: Any) -> None:
    """Record an attempt result and update tracking state.

    Only non-successful, non-rate-limited attempts increment
    ``normal_attempts``. Successes and rate-limited attempts are
    recorded in ``attempt_results`` for history but don't consume
    retry budget.
    """
    self.attempt_results.append(result)
    self.total_cost_usd += result.cost_usd
    self.total_duration_seconds += result.duration_seconds

    if not result.rate_limited and not result.execution_success:
        self.normal_attempts += 1
advance_fallback
advance_fallback(reason)

Advance to the next instrument in the fallback chain.

Records the transition, switches instrument_name, resets retry budget, and increments current_instrument_index.

Returns the new instrument name, or None if the chain is exhausted.

Source code in src/marianne/core/checkpoint.py
def advance_fallback(self, reason: str) -> str | None:
    """Advance to the next instrument in the fallback chain.

    Records the transition, switches instrument_name, resets retry
    budget, and increments current_instrument_index.

    Returns the new instrument name, or None if the chain is exhausted.
    """
    if not self.has_fallback_available:
        return None

    from_instrument = self.instrument_name or ""
    self.fallback_attempts[from_instrument] = self.normal_attempts

    to_instrument = self.fallback_chain[self.current_instrument_index]
    self.current_instrument_index += 1
    self.instrument_name = to_instrument

    # Fresh retry budget for the new instrument
    self.normal_attempts = 0
    self.completion_attempts = 0

    # Record the transition
    import datetime as _dt

    self.instrument_fallback_history.append({
        "from": from_instrument,
        "to": to_instrument,
        "reason": reason,
        "timestamp": _dt.datetime.now(tz=_dt.UTC).isoformat(),
    })

    # Trim to prevent unbounded growth
    if len(self.instrument_fallback_history) > MAX_INSTRUMENT_FALLBACK_HISTORY:
        self.instrument_fallback_history = (
            self.instrument_fallback_history[-MAX_INSTRUMENT_FALLBACK_HISTORY:]
        )

    return to_instrument
to_dict
to_dict()

Serialize to dict. Compatibility with dataclass SheetExecutionState.

Source code in src/marianne/core/checkpoint.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to dict. Compatibility with dataclass SheetExecutionState."""
    return self.model_dump(mode="json")
from_dict classmethod
from_dict(data)

Restore from dict. Compatibility with dataclass SheetExecutionState.

Source code in src/marianne/core/checkpoint.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> SheetState:
    """Restore from dict. Compatibility with dataclass SheetExecutionState."""
    return cls.model_validate(data)
capture_output
capture_output(stdout, stderr, max_bytes=MAX_OUTPUT_CAPTURE_BYTES)

Capture tail of stdout/stderr for debugging.

Stores the last max_bytes of each output stream. Sets output_truncated to True if either stream was larger than the limit.

Credential scanning (F-003): Before storing, both streams are scanned for API key patterns (sk-ant-, AKIA, AIzaSy, Bearer tokens) and matches are replaced with [REDACTED_] placeholders. This prevents leaked credentials from propagating to learning store, dashboard, diagnostics, and MCP resources.

Parameters:

Name Type Description Default
stdout str

Full stdout string from execution.

required
stderr str

Full stderr string from execution.

required
max_bytes int

Maximum bytes to capture per stream (default 10KB).

MAX_OUTPUT_CAPTURE_BYTES
Source code in src/marianne/core/checkpoint.py
def capture_output(
    self,
    stdout: str,
    stderr: str,
    max_bytes: int = MAX_OUTPUT_CAPTURE_BYTES,
) -> None:
    """Capture tail of stdout/stderr for debugging.

    Stores the last `max_bytes` of each output stream. Sets output_truncated
    to True if either stream was larger than the limit.

    Credential scanning (F-003): Before storing, both streams are scanned
    for API key patterns (sk-ant-*, AKIA*, AIzaSy*, Bearer tokens) and
    matches are replaced with [REDACTED_*] placeholders. This prevents
    leaked credentials from propagating to learning store, dashboard,
    diagnostics, and MCP resources.

    Args:
        stdout: Full stdout string from execution.
        stderr: Full stderr string from execution.
        max_bytes: Maximum bytes to capture per stream (default 10KB).
    """
    from marianne.utils.credential_scanner import redact_credentials

    # Convert to bytes to measure actual size, then slice
    stdout_bytes = stdout.encode("utf-8", errors="replace")
    stderr_bytes = stderr.encode("utf-8", errors="replace")

    stdout_truncated = len(stdout_bytes) > max_bytes
    stderr_truncated = len(stderr_bytes) > max_bytes

    # Capture tail (last N bytes) and decode back to string
    if stdout_truncated:
        self.stdout_tail = stdout_bytes[-max_bytes:].decode(
            "utf-8", errors="replace"
        )
    else:
        self.stdout_tail = stdout if stdout else None

    if stderr_truncated:
        self.stderr_tail = stderr_bytes[-max_bytes:].decode(
            "utf-8", errors="replace"
        )
    else:
        self.stderr_tail = stderr if stderr else None

    # Scan and redact credential patterns before storage (F-003)
    self.stdout_tail = redact_credentials(self.stdout_tail)
    self.stderr_tail = redact_credentials(self.stderr_tail)

    self.output_truncated = stdout_truncated or stderr_truncated
add_error_to_history
add_error_to_history(error)

Append an error record and enforce the history size limit.

All callers that add errors to error_history should use this method instead of appending directly so that the list never exceeds MAX_ERROR_HISTORY entries.

Parameters:

Name Type Description Default
error CheckpointErrorRecord

The error record to add.

required
Source code in src/marianne/core/checkpoint.py
def add_error_to_history(self, error: CheckpointErrorRecord) -> None:
    """Append an error record and enforce the history size limit.

    All callers that add errors to ``error_history`` should use this
    method instead of appending directly so that the list never exceeds
    ``MAX_ERROR_HISTORY`` entries.

    Args:
        error: The error record to add.
    """
    self.error_history.append(error)
    if len(self.error_history) > MAX_ERROR_HISTORY:
        self.error_history = self.error_history[-MAX_ERROR_HISTORY:]
add_fallback_to_history
add_fallback_to_history(record)

Append a fallback record and enforce the history size limit.

All callers that add entries to instrument_fallback_history should use this method instead of appending directly so the list never exceeds MAX_INSTRUMENT_FALLBACK_HISTORY entries.

Parameters:

Name Type Description Default
record dict[str, str]

Dict with keys from, to, reason, timestamp.

required
Source code in src/marianne/core/checkpoint.py
def add_fallback_to_history(self, record: dict[str, str]) -> None:
    """Append a fallback record and enforce the history size limit.

    All callers that add entries to ``instrument_fallback_history``
    should use this method instead of appending directly so the list
    never exceeds ``MAX_INSTRUMENT_FALLBACK_HISTORY`` entries.

    Args:
        record: Dict with keys from, to, reason, timestamp.
    """
    self.instrument_fallback_history.append(record)
    if len(self.instrument_fallback_history) > MAX_INSTRUMENT_FALLBACK_HISTORY:
        self.instrument_fallback_history = self.instrument_fallback_history[
            -MAX_INSTRUMENT_FALLBACK_HISTORY:
        ]

SheetStatus

Bases: str, Enum

Status of a single sheet.

The baton tracks 11 scheduling states. Status display and persistence use all 11. Consumers that only care about terminal/non-terminal can check is_terminal.

Attributes
is_terminal property
is_terminal

Whether this status represents a final state.

is_active property
is_active

Whether this status means the sheet is currently executing.

BackendConfig

Bases: BaseModel

Configuration for the execution backend.

Uses a flat structure with cross-field validation to ensure type-specific fields are only meaningful when the corresponding backend type is selected. The _validate_type_specific_fields validator warns when fields for an unselected backend are set to non-default values.

JobConfig

Bases: BaseModel

Complete configuration for an orchestration job.

Functions
to_yaml
to_yaml(*, exclude_defaults=False)

Serialize this JobConfig to valid score YAML.

The output is semantically equivalent to the original config: from_yaml_string(config.to_yaml()) produces an equivalent config (compared via model_dump()). String-level identity with the original YAML file is NOT guaranteed because workspace paths are resolved to absolute at parse time and fan-out configs are expanded.

Parameters:

Name Type Description Default
exclude_defaults bool

If True, omit fields that match their default values for cleaner output. Defaults to False (lossless).

False

Returns:

Type Description
str

A valid YAML string that from_yaml_string() can parse.

Source code in src/marianne/core/config/job.py
def to_yaml(self, *, exclude_defaults: bool = False) -> str:
    """Serialize this JobConfig to valid score YAML.

    The output is semantically equivalent to the original config:
    ``from_yaml_string(config.to_yaml())`` produces an equivalent config
    (compared via ``model_dump()``). String-level identity with the
    original YAML file is NOT guaranteed because workspace paths are
    resolved to absolute at parse time and fan-out configs are expanded.

    Args:
        exclude_defaults: If True, omit fields that match their default
            values for cleaner output. Defaults to False (lossless).

    Returns:
        A valid YAML string that ``from_yaml_string()`` can parse.
    """
    data = self.model_dump(
        mode="python",
        by_alias=True,
        exclude_defaults=exclude_defaults,
    )
    data = _prepare_for_yaml(data)
    return yaml.dump(
        data,
        default_flow_style=False,
        sort_keys=False,
        allow_unicode=True,
    )
from_yaml classmethod
from_yaml(path)

Load job configuration from a YAML file.

Source code in src/marianne/core/config/job.py
@classmethod
def from_yaml(cls, path: Path) -> JobConfig:
    """Load job configuration from a YAML file."""
    with open(path) as f:
        data = yaml.safe_load(f)
    if not isinstance(data, dict):
        raise ValueError(
            "The score file is empty or invalid. "
            "A Marianne score requires at minimum: name, sheet, and prompt sections. "
            "See 'mzt validate --help' or the score writing guide for examples."
        )
    # Pre-resolve relative workspace relative to the score file's parent
    # directory, not the current process CWD (#109).  This is critical when
    # the daemon loads a score whose path differs from the daemon's CWD.
    if "workspace" in data:
        ws = Path(str(data["workspace"]))
        if not ws.is_absolute():
            data["workspace"] = str((path.resolve().parent / ws).resolve())
    return cls.model_validate(data)
from_yaml_string classmethod
from_yaml_string(yaml_str)

Load job configuration from a YAML string.

Source code in src/marianne/core/config/job.py
@classmethod
def from_yaml_string(cls, yaml_str: str) -> JobConfig:
    """Load job configuration from a YAML string."""
    data = yaml.safe_load(yaml_str)
    if not isinstance(data, dict):
        raise ValueError(
            "The score content is empty or invalid. "
            "A Marianne score requires at minimum: name, sheet, and prompt sections."
        )
    return cls.model_validate(data)
get_state_path
get_state_path()

Get the resolved state path.

Source code in src/marianne/core/config/job.py
def get_state_path(self) -> Path:
    """Get the resolved state path."""
    if self.state_path:
        return self.state_path
    if self.state_backend == "json":
        return self.workspace / ".marianne-state.json"
    return self.workspace / ".marianne-state.db"
get_outcome_store_path
get_outcome_store_path()

Get the resolved outcome store path for learning.

Source code in src/marianne/core/config/job.py
def get_outcome_store_path(self) -> Path:
    """Get the resolved outcome store path for learning."""
    if self.learning.outcome_store_path:
        return self.learning.outcome_store_path
    if self.learning.outcome_store_type == "json":
        return self.workspace / ".marianne-outcomes.json"
    return self.workspace / ".marianne-outcomes.db"

NotificationConfig

Bases: BaseModel

Configuration for a notification channel.

PromptConfig

Bases: BaseModel

Configuration for prompt templating.

Functions
at_least_one_template
at_least_one_template()

Warn when no template source is provided (falls back to default prompt).

Source code in src/marianne/core/config/job.py
@model_validator(mode="after")
def at_least_one_template(self) -> PromptConfig:
    """Warn when no template source is provided (falls back to default prompt)."""
    if self.template is not None and self.template_file is not None:
        raise ValueError(
            "PromptConfig accepts 'template' or 'template_file', not both"
        )
    if self.template is None and self.template_file is None:
        warnings.warn(
            "PromptConfig has neither 'template' nor 'template_file'. "
            "The default preamble prompt will be used.",
            UserWarning,
            stacklevel=2,
        )
    return self

RateLimitConfig

Bases: BaseModel

Configuration for rate limit detection and handling.

RetryConfig

Bases: BaseModel

Configuration for retry behavior including partial completion recovery.

SheetConfig

Bases: BaseModel

Configuration for sheet processing.

In Marianne's musical theme, a composition is divided into sheets, each containing a portion of the work to be performed.

Fan-out support: When fan_out is specified, stages are expanded into concrete sheets at parse time. For example, total_items=7, fan_out={2: 3} produces 9 concrete sheets (stage 2 instantiated 3 times). After expansion, total_items and dependencies reflect expanded values, and fan_out is cleared to {} to prevent re-expansion on resume.

Attributes
total_sheets property
total_sheets

Calculate total number of sheets.

total_stages property
total_stages

Return the original stage count.

After fan-out expansion, total_items reflects expanded sheet count. total_stages preserves the original logical stage count from fan_out_stage_map. When no fan-out was used, total_stages == total_sheets (identity).

Functions
strip_computed_fields classmethod
strip_computed_fields(data)

Strip computed properties that users may include in YAML.

total_sheets is computed from size/total_items, not configurable. Accept it silently for backward compatibility — rejecting it would break existing scores that include it.

Source code in src/marianne/core/config/job.py
@model_validator(mode="before")
@classmethod
def strip_computed_fields(cls, data: Any) -> Any:
    """Strip computed properties that users may include in YAML.

    total_sheets is computed from size/total_items, not configurable.
    Accept it silently for backward compatibility — rejecting it would
    break existing scores that include it.
    """
    if isinstance(data, dict) and "total_sheets" in data:
        data.pop("total_sheets")
    return data
validate_per_sheet_instruments classmethod
validate_per_sheet_instruments(v)

Validate per-sheet instrument assignments.

Source code in src/marianne/core/config/job.py
@field_validator("per_sheet_instruments")
@classmethod
def validate_per_sheet_instruments(
    cls, v: dict[int, str],
) -> dict[int, str]:
    """Validate per-sheet instrument assignments."""
    for sheet_num, instrument in v.items():
        if not isinstance(sheet_num, int) or sheet_num < 1:
            raise ValueError(
                f"Per-sheet instrument key must be a positive integer, "
                f"got {sheet_num}"
            )
        if not instrument:
            raise ValueError(
                f"Per-sheet instrument name for sheet {sheet_num} "
                f"must not be empty"
            )
    return v
validate_per_sheet_fallbacks classmethod
validate_per_sheet_fallbacks(v)

Validate per-sheet fallback chain keys are positive integers.

Source code in src/marianne/core/config/job.py
@field_validator("per_sheet_fallbacks")
@classmethod
def validate_per_sheet_fallbacks(
    cls, v: dict[int, list[str]],
) -> dict[int, list[str]]:
    """Validate per-sheet fallback chain keys are positive integers."""
    for sheet_num in v:
        if not isinstance(sheet_num, int) or sheet_num < 1:
            raise ValueError(
                f"Per-sheet fallback key must be a positive integer, "
                f"got {sheet_num}"
            )
    return v
validate_instrument_map classmethod
validate_instrument_map(v)

Validate instrument_map: no duplicate sheets, valid names.

Source code in src/marianne/core/config/job.py
@field_validator("instrument_map")
@classmethod
def validate_instrument_map(
    cls, v: dict[str, list[int]],
) -> dict[str, list[int]]:
    """Validate instrument_map: no duplicate sheets, valid names."""
    seen_sheets: dict[int, str] = {}
    for instrument, sheets in v.items():
        if not instrument:
            raise ValueError(
                "Instrument name in instrument_map must not be empty"
            )
        for sheet_num in sheets:
            if not isinstance(sheet_num, int) or sheet_num < 1:
                raise ValueError(
                    f"Sheet number in instrument_map must be a positive "
                    f"integer, got {sheet_num} for instrument '{instrument}'"
                )
            if sheet_num in seen_sheets:
                raise ValueError(
                    f"Sheet {sheet_num} assigned to multiple instruments "
                    f"in instrument_map: '{seen_sheets[sheet_num]}' and "
                    f"'{instrument}'"
                )
            seen_sheets[sheet_num] = instrument
    return v
get_fan_out_metadata
get_fan_out_metadata(sheet_num)

Get fan-out metadata for a specific sheet.

Parameters:

Name Type Description Default
sheet_num int

Concrete sheet number (1-indexed).

required

Returns:

Type Description
FanOutMetadata

FanOutMetadata with stage, instance, and fan_count.

FanOutMetadata

When no fan-out is configured, returns identity metadata

FanOutMetadata

(stage=sheet_num, instance=1, fan_count=1).

Source code in src/marianne/core/config/job.py
def get_fan_out_metadata(self, sheet_num: int) -> FanOutMetadata:  # noqa: F821
    """Get fan-out metadata for a specific sheet.

    Args:
        sheet_num: Concrete sheet number (1-indexed).

    Returns:
        FanOutMetadata with stage, instance, and fan_count.
        When no fan-out is configured, returns identity metadata
        (stage=sheet_num, instance=1, fan_count=1).
    """
    from marianne.core.fan_out import FanOutMetadata

    if self.fan_out_stage_map and sheet_num in self.fan_out_stage_map:
        meta = self.fan_out_stage_map[sheet_num]
        return FanOutMetadata(
            stage=meta["stage"],
            instance=meta["instance"],
            fan_count=meta["fan_count"],
        )
    return FanOutMetadata(stage=sheet_num, instance=1, fan_count=1)
validate_fan_out classmethod
validate_fan_out(v)

Validate fan_out field values.

Source code in src/marianne/core/config/job.py
@field_validator("fan_out")
@classmethod
def validate_fan_out(cls, v: dict[int, int]) -> dict[int, int]:
    """Validate fan_out field values."""
    for stage, count in v.items():
        if not isinstance(stage, int) or stage < 1:
            raise ValueError(
                f"Fan-out stage must be positive integer, got {stage}"
            )
        if not isinstance(count, int) or count < 1:
            raise ValueError(
                f"Fan-out count for stage {stage} must be >= 1, got {count}"
            )
    return v
validate_dependencies classmethod
validate_dependencies(v, info)

Validate dependency declarations.

Note: Full validation (range checks, cycle detection) happens when the DependencyDAG is built at runtime, since total_sheets isn't available during field validation.

Source code in src/marianne/core/config/job.py
@field_validator("dependencies")
@classmethod
def validate_dependencies(
    cls, v: dict[int, list[int]], info: ValidationInfo
) -> dict[int, list[int]]:
    """Validate dependency declarations.

    Note: Full validation (range checks, cycle detection) happens when
    the DependencyDAG is built at runtime, since total_sheets isn't
    available during field validation.
    """
    for sheet_num, deps in v.items():
        if not isinstance(sheet_num, int) or sheet_num < 1:
            raise ValueError(f"Sheet number must be positive integer, got {sheet_num}")
        if not isinstance(deps, list):
            raise ValueError(f"Dependencies for sheet {sheet_num} must be a list")
        for dep in deps:
            if not isinstance(dep, int) or dep < 1:
                raise ValueError(
                    f"Dependency must be positive integer, got {dep} for sheet {sheet_num}"
                )
            if dep == sheet_num:
                raise ValueError(f"Sheet {sheet_num} cannot depend on itself")
    return v
expand_fan_out_config
expand_fan_out_config()

Expand fan_out declarations into concrete sheet assignments.

This runs after field validators. When fan_out is non-empty: 1. Validates constraints (size=1, start_item=1) 2. Calls expand_fan_out() to compute concrete sheet assignments 3. Overwrites total_items and dependencies with expanded values 4. Stores metadata in fan_out_stage_map for resume support 5. Clears fan_out={} to prevent re-expansion on resume

Source code in src/marianne/core/config/job.py
@model_validator(mode="after")
def expand_fan_out_config(self) -> SheetConfig:
    """Expand fan_out declarations into concrete sheet assignments.

    This runs after field validators. When fan_out is non-empty:
    1. Validates constraints (size=1, start_item=1)
    2. Calls expand_fan_out() to compute concrete sheet assignments
    3. Overwrites total_items and dependencies with expanded values
    4. Stores metadata in fan_out_stage_map for resume support
    5. Clears fan_out={} to prevent re-expansion on resume
    """
    if not self.fan_out:
        return self

    # Enforce constraints for fan-out
    if self.size != 1:
        raise ValueError(
            f"fan_out requires size=1, got size={self.size}. "
            "Each stage must map to exactly one sheet for fan-out to work."
        )
    if self.start_item != 1:
        raise ValueError(
            f"fan_out requires start_item=1, got start_item={self.start_item}. "
            "Fan-out stages are 1-indexed from the beginning."
        )

    from marianne.core.fan_out import expand_fan_out

    expansion = expand_fan_out(
        total_stages=self.total_items,
        fan_out=self.fan_out,
        stage_dependencies=self.dependencies,
    )

    # Overwrite with expanded values
    self.total_items = expansion.total_sheets
    self.dependencies = expansion.expanded_dependencies

    # Expand skip_when: stage-keyed → sheet-keyed
    if self.skip_when:
        expanded_skip_when: dict[int, str] = {}
        for stage, expr in self.skip_when.items():
            for sheet_num in expansion.stage_sheets.get(stage, [stage]):
                expanded_skip_when[sheet_num] = expr
        self.skip_when = expanded_skip_when

    # Expand skip_when_command: stage-keyed → sheet-keyed
    if self.skip_when_command:
        expanded_skip_when_command: dict[int, SkipWhenCommand] = {}
        for stage, cmd in self.skip_when_command.items():
            for sheet_num in expansion.stage_sheets.get(stage, [stage]):
                expanded_skip_when_command[sheet_num] = cmd
        self.skip_when_command = expanded_skip_when_command

    # Store serializable metadata for resume
    self.fan_out_stage_map = {
        sheet_num: {
            "stage": meta.stage,
            "instance": meta.instance,
            "fan_count": meta.fan_count,
        }
        for sheet_num, meta in expansion.sheet_metadata.items()
    }

    # Clear fan_out to prevent re-expansion on resume
    self.fan_out = {}

    return self
validate_dependency_range
validate_dependency_range()

Validate that dependency sheet numbers are within the valid range.

Runs after fan-out expansion so total_sheets reflects the final count.

Source code in src/marianne/core/config/job.py
@model_validator(mode="after")
def validate_dependency_range(self) -> SheetConfig:
    """Validate that dependency sheet numbers are within the valid range.

    Runs after fan-out expansion so total_sheets reflects the final count.
    """
    if not self.dependencies:
        return self
    max_sheet = self.total_sheets
    for sheet_num, deps in self.dependencies.items():
        if sheet_num < 1 or sheet_num > max_sheet:
            raise ValueError(
                f"Dependency key sheet {sheet_num} is out of range "
                f"(valid: 1-{max_sheet})"
            )
        for dep in deps:
            if dep < 1 or dep > max_sheet:
                raise ValueError(
                    f"Sheet {sheet_num} depends on sheet {dep}, "
                    f"which is out of range (valid: 1-{max_sheet})"
                )
    return self

ValidationRule

Bases: BaseModel

A single validation rule for checking sheet outputs.

Supports staged execution via the stage field. Validations are run in stage order (1, 2, 3...). If any validation in a stage fails, higher stages are skipped (fail-fast behavior).

Typical stage layout: - Stage 1: Syntax & compilation (cargo check, cargo fmt --check) - Stage 2: Testing (cargo test, pytest) - Stage 3: Code quality (clippy -D warnings, ruff check) - Stage 4: Security (cargo audit, npm audit)

ClassifiedError dataclass

ClassifiedError(category, message, error_code=UNKNOWN, original_error=None, exit_code=None, exit_signal=None, exit_reason=None, retriable=True, suggested_wait_seconds=None, error_info=None)

An error with its classification and metadata.

ClassifiedError combines high-level category (for retry logic) with specific error codes (for diagnostics and logging). The error_code provides stable identifiers for programmatic handling while category determines retry behavior.

Attributes
error_info class-attribute instance-attribute
error_info = None

Optional structured metadata for this error.

is_signal_kill property
is_signal_kill

True if process was killed by a signal.

signal_name property
signal_name

Human-readable signal name if killed by signal.

code property
code

Get the error code string value (e.g., 'E001').

severity property
severity

Get the severity level for this error.

ErrorCategory

Bases: str, Enum

Categories of errors with different retry behaviors.

Attributes
RATE_LIMIT class-attribute instance-attribute
RATE_LIMIT = 'rate_limit'

Retriable with long wait - API/service is rate limiting.

TRANSIENT class-attribute instance-attribute
TRANSIENT = 'transient'

Retriable with backoff - temporary network/service issues.

VALIDATION class-attribute instance-attribute
VALIDATION = 'validation'

Retriable - Claude ran but didn't produce expected output.

AUTH class-attribute instance-attribute
AUTH = 'auth'

Fatal - authentication/authorization failure, needs user intervention.

NETWORK class-attribute instance-attribute
NETWORK = 'network'

Retriable with backoff - network connectivity issues.

TIMEOUT class-attribute instance-attribute
TIMEOUT = 'timeout'

Retriable - operation timed out.

SIGNAL class-attribute instance-attribute
SIGNAL = 'signal'

Process killed by signal - may be retriable depending on signal.

FATAL class-attribute instance-attribute
FATAL = 'fatal'

Non-retriable - stop job immediately.

CONFIGURATION class-attribute instance-attribute
CONFIGURATION = 'configuration'

Non-retriable - configuration error needs user intervention (e.g., MCP setup).

PREFLIGHT class-attribute instance-attribute
PREFLIGHT = 'preflight'

Pre-execution check failure — config or environment not ready.

ESCALATION class-attribute instance-attribute
ESCALATION = 'escalation'

Escalation-based abort — grounding or escalation policy triggered.

ErrorClassifier

ErrorClassifier(rate_limit_patterns=None, auth_patterns=None, network_patterns=None)

Classifies errors based on patterns and exit codes.

Pattern matching follows the approach from run-sheet-review.sh which checks output for rate limit indicators.

Initialize classifier with detection patterns.

Parameters:

Name Type Description Default
rate_limit_patterns list[str] | None

Regex patterns indicating rate limiting

None
auth_patterns list[str] | None

Regex patterns indicating auth failures

None
network_patterns list[str] | None

Regex patterns indicating network issues

None
Source code in src/marianne/core/errors/classifier.py
def __init__(
    self,
    rate_limit_patterns: list[str] | None = None,
    auth_patterns: list[str] | None = None,
    network_patterns: list[str] | None = None,
):
    """Initialize classifier with detection patterns.

    Args:
        rate_limit_patterns: Regex patterns indicating rate limiting
        auth_patterns: Regex patterns indicating auth failures
        network_patterns: Regex patterns indicating network issues
    """
    self.rate_limit_patterns = _compile_patterns(
        rate_limit_patterns or _DEFAULT_RATE_LIMIT_PATTERNS
    )
    self.auth_patterns = _compile_patterns(auth_patterns or _DEFAULT_AUTH_PATTERNS)
    self.network_patterns = _compile_patterns(network_patterns or _DEFAULT_NETWORK_PATTERNS)
    self.dns_patterns = _compile_patterns(_DEFAULT_DNS_PATTERNS)
    self.ssl_patterns = _compile_patterns(_DEFAULT_SSL_PATTERNS)
    self.capacity_patterns = _compile_patterns(_DEFAULT_CAPACITY_PATTERNS)
    self.quota_exhaustion_patterns = _compile_patterns(_DEFAULT_QUOTA_EXHAUSTION_PATTERNS)
    self.reset_time_patterns = _compile_patterns(_DEFAULT_RESET_TIME_PATTERNS)
    self.mcp_patterns = _compile_patterns(_DEFAULT_MCP_PATTERNS)
    self.cli_mode_patterns = _compile_patterns(_DEFAULT_CLI_MODE_PATTERNS)
    self.enoent_patterns = _compile_patterns(_DEFAULT_ENOENT_PATTERNS)
    self.stale_patterns = _compile_patterns(_DEFAULT_STALE_PATTERNS)

    # Pre-computed combined regex patterns for _matches_any().
    # Each pattern list is merged into a single alternation regex so that
    # matching is a single .search() call per category.
    self._combined_cache: dict[int, re.Pattern[str]] = {}
    for attr_name in (
        "rate_limit_patterns", "auth_patterns", "network_patterns",
        "dns_patterns", "ssl_patterns", "capacity_patterns",
        "quota_exhaustion_patterns", "mcp_patterns",
        "cli_mode_patterns", "enoent_patterns", "stale_patterns",
    ):
        patterns = getattr(self, attr_name)
        if patterns:
            alternation = "|".join(f"(?:{p.pattern})" for p in patterns)
            self._combined_cache[id(patterns)] = re.compile(
                alternation, re.IGNORECASE
            )
Functions
parse_reset_time
parse_reset_time(text)

Parse reset time from message and return seconds until reset.

Supports patterns like: - "resets at 9pm" -> seconds until 9pm (or next day if past) - "resets at 21:00" -> seconds until 21:00 - "resets in 3 hours" -> 3 * 3600 seconds - "resets in 30 minutes" -> 30 * 60 seconds

Parameters:

Name Type Description Default
text str

Error message that may contain reset time info.

required

Returns:

Type Description
float | None

Seconds until reset, or None if no reset time found.

float | None

Returns minimum of RESET_TIME_MINIMUM_WAIT_SECONDS to avoid immediate retries.

Source code in src/marianne/core/errors/classifier.py
def parse_reset_time(self, text: str) -> float | None:
    """Parse reset time from message and return seconds until reset.

    Supports patterns like:
    - "resets at 9pm" -> seconds until 9pm (or next day if past)
    - "resets at 21:00" -> seconds until 21:00
    - "resets in 3 hours" -> 3 * 3600 seconds
    - "resets in 30 minutes" -> 30 * 60 seconds

    Args:
        text: Error message that may contain reset time info.

    Returns:
        Seconds until reset, or None if no reset time found.
        Returns minimum of RESET_TIME_MINIMUM_WAIT_SECONDS to avoid immediate retries.
    """

    for pattern in self.reset_time_patterns:
        match = pattern.search(text)
        if not match:
            continue

        groups = match.groups()

        # Pattern: "resets in X hours/minutes"
        if (
            len(groups) == 2
            and groups[1]
            and groups[1].lower() in ("hour", "hr", "minute", "min")
        ):
            amount = int(groups[0])
            unit = groups[1].lower()
            seconds: float = amount * 3600 if unit in ("hour", "hr") else amount * 60
            return self._clamp_wait(seconds)

        # Pattern: "resets at X:XX" (24-hour time)
        if len(groups) == 2 and groups[1] and groups[1].isdigit():
            hour = int(groups[0])
            minute = int(groups[1])
            now = datetime.now()
            reset_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if reset_time <= now:
                reset_time += timedelta(days=1)  # Next day
            seconds = (reset_time - now).total_seconds()
            return self._clamp_wait(seconds)

        # Pattern: "resets at Xpm/Xam"
        if len(groups) == 2 and groups[1] and groups[1].lower() in ("am", "pm"):
            hour = int(groups[0])
            meridiem = groups[1].lower()
            if meridiem == "pm" and hour != 12:
                hour += 12
            elif meridiem == "am" and hour == 12:
                hour = 0
            now = datetime.now()
            reset_time = now.replace(hour=hour, minute=0, second=0, microsecond=0)
            if reset_time <= now:
                reset_time += timedelta(days=1)  # Next day
            seconds = (reset_time - now).total_seconds()
            return self._clamp_wait(seconds)

    # No pattern matched, return default wait
    return None
extract_rate_limit_wait
extract_rate_limit_wait(text)

Extract wait duration from rate limit error text.

Supports common patterns from Anthropic, Claude Code, and generic APIs: - "retry after N seconds/minutes/hours" - "try again in N seconds/minutes/hours" - "wait N seconds/minutes/hours" - "Retry-After: N" (header value) - "resets in N hours/minutes" (delegates to parse_reset_time)

Parameters:

Name Type Description Default
text str

Error message or combined stdout/stderr.

required

Returns:

Type Description
float | None

Seconds to wait, clamped to [MIN, MAX], or None if no pattern matches.

Source code in src/marianne/core/errors/classifier.py
def extract_rate_limit_wait(self, text: str) -> float | None:
    """Extract wait duration from rate limit error text.

    Supports common patterns from Anthropic, Claude Code, and generic APIs:
    - "retry after N seconds/minutes/hours"
    - "try again in N seconds/minutes/hours"
    - "wait N seconds/minutes/hours"
    - "Retry-After: N" (header value)
    - "resets in N hours/minutes" (delegates to parse_reset_time)

    Args:
        text: Error message or combined stdout/stderr.

    Returns:
        Seconds to wait, clamped to [MIN, MAX], or None if no pattern matches.
    """
    if not text:
        return None

    # Defer to existing parse_reset_time for "resets in/at" patterns
    reset_time = self.parse_reset_time(text)
    if reset_time is not None:
        return reset_time

    import re as _re

    patterns: list[tuple[_re.Pattern[str], float]] = [
        # "retry after N seconds/second"
        (_re.compile(r"retry\s+after\s+(\d+)\s*s(?:econds?)?", _re.IGNORECASE), 1.0),
        # "retry after N minutes/minute"
        (_re.compile(r"retry\s+after\s+(\d+)\s*m(?:in(?:utes?)?)?", _re.IGNORECASE), 60.0),
        # "retry after N hours/hour"
        (_re.compile(r"retry\s+after\s+(\d+)\s*h(?:ours?)?", _re.IGNORECASE), 3600.0),
        # "try again in N seconds"
        (_re.compile(r"try\s+again\s+in\s+(\d+)\s*s(?:econds?)?", _re.IGNORECASE), 1.0),
        # "try again in N minutes"
        (_re.compile(r"try\s+again\s+in\s+(\d+)\s*m(?:in(?:utes?)?)?", _re.IGNORECASE), 60.0),
        # "try again in N hours"
        (_re.compile(r"try\s+again\s+in\s+(\d+)\s*h(?:ours?)?", _re.IGNORECASE), 3600.0),
        # "wait N seconds"
        (_re.compile(r"wait\s+(\d+)\s*s(?:econds?)?", _re.IGNORECASE), 1.0),
        # "wait N minutes"
        (_re.compile(r"wait\s+(\d+)\s*m(?:in(?:utes?)?)?", _re.IGNORECASE), 60.0),
        # "Retry-After: N" (header value, always seconds)
        (_re.compile(r"[Rr]etry-?After\s*:\s*(\d+)", _re.IGNORECASE), 1.0),
    ]

    for pattern, multiplier in patterns:
        match = pattern.search(text)
        if match:
            value = int(match.group(1))
            return self._clamp_wait(value * multiplier)

    return None
classify
classify(stdout='', stderr='', exit_code=None, exit_signal=None, exit_reason=None, exception=None, output_format=None)

Classify an error based on output, exit code, and signal.

Delegates to sub-classifiers in priority order: 1. Signal-based exits (_classify_signal) 2. Timeout exit reason 3. Pattern-matching on output (_classify_by_pattern) 4. Exit code analysis (_classify_by_exit_code) 5. Unknown fallback

Parameters:

Name Type Description Default
stdout str

Standard output from the command

''
stderr str

Standard error from the command

''
exit_code int | None

Process exit code (0 = success), None if killed by signal

None
exit_signal int | None

Signal number if killed by signal

None
exit_reason ExitReason | None

Why execution ended (completed, timeout, killed, error)

None
exception Exception | None

Optional exception that was raised

None
output_format str | None

Backend output format ("text", "json", "stream-json"). When "text", exit code 1 is classified as E209 (validation) instead of E009 (unknown).

None

Returns:

Type Description
ClassifiedError

ClassifiedError with category, error_code, and metadata

Source code in src/marianne/core/errors/classifier.py
def classify(
    self,
    stdout: str = "",
    stderr: str = "",
    exit_code: int | None = None,
    exit_signal: int | None = None,
    exit_reason: ExitReason | None = None,
    exception: Exception | None = None,
    output_format: str | None = None,
) -> ClassifiedError:
    """Classify an error based on output, exit code, and signal.

    Delegates to sub-classifiers in priority order:
    1. Signal-based exits (_classify_signal)
    2. Timeout exit reason
    3. Pattern-matching on output (_classify_by_pattern)
    4. Exit code analysis (_classify_by_exit_code)
    5. Unknown fallback

    Args:
        stdout: Standard output from the command
        stderr: Standard error from the command
        exit_code: Process exit code (0 = success), None if killed by signal
        exit_signal: Signal number if killed by signal
        exit_reason: Why execution ended (completed, timeout, killed, error)
        exception: Optional exception that was raised
        output_format: Backend output format ("text", "json", "stream-json").
            When "text", exit code 1 is classified as E209 (validation)
            instead of E009 (unknown).

    Returns:
        ClassifiedError with category, error_code, and metadata
    """
    combined = f"{stdout}\n{stderr}"
    if exception:
        combined += f"\n{str(exception)}"

    # 0. Negative exit codes indicate signal kills (e.g., -9 = SIGKILL)
    # Python's subprocess reports killed-by-signal as negative exit codes.
    if exit_code is not None and exit_code < 0:
        exit_signal = abs(exit_code)

    # 1. Signal-based exits
    if exit_signal is not None:
        result = self._classify_signal(
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            stdout=stdout,
            stderr=stderr,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=result.category.value,
            error_code=result.error_code.value,
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            retriable=result.retriable,
            suggested_wait=result.suggested_wait_seconds,
            message=result.message,
        )
        return result

    # 2. Timeout exit reason (even without signal)
    #    Differentiate stale detection (E006) from backend timeout (E001).
    #    Stale detection writes "Stale execution:" to stderr — F-097.
    if exit_reason == "timeout":
        is_stale = "stale execution" in combined.lower()
        error_code = (
            ErrorCode.EXECUTION_STALE if is_stale
            else ErrorCode.EXECUTION_TIMEOUT
        )
        message = (
            "Stale execution detected — no output activity"
            if is_stale
            else "Command timed out"
        )
        wait_seconds = 120.0 if is_stale else 60.0
        result = ClassifiedError(
            category=ErrorCategory.TIMEOUT,
            message=message,
            error_code=error_code,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=wait_seconds,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=result.category.value,
            error_code=result.error_code.value,
            exit_code=exit_code,
            exit_reason=exit_reason,
            retriable=result.retriable,
            message=result.message,
        )
        return result

    # 3. Pattern-matching on output text
    pattern_result = self._classify_by_pattern(
        combined, exit_code, exit_reason, exception,
    )
    if pattern_result is not None:
        return pattern_result

    # 4. Exit code analysis (with output for non-transient detection)
    exit_code_result = self._classify_by_exit_code(
        exit_code, exit_reason, exception, combined, output_format,
    )
    if exit_code_result is not None:
        return exit_code_result

    # 5. exit_code=None: process killed or disappeared without exit code.
    # Always retriable — this is never a deterministic user error.
    # Check stderr for OOM indicators to set appropriate wait time.
    if exit_code is None:
        # OOM/kill indicators in stderr → longer wait (memory needs to free)
        oom_indicators = ("killed", "out of memory", "oom", "cannot allocate")
        stderr_lower = stderr.lower() if stderr else ""
        is_oom = any(indicator in stderr_lower for indicator in oom_indicators)
        wait_seconds = 60.0 if is_oom else 10.0
        message = (
            "Process killed (possible OOM — retrying with longer wait)"
            if is_oom
            else "Process exited without exit code (possible signal race — retrying)"
        )

        result = ClassifiedError(
            category=ErrorCategory.TRANSIENT,
            message=message,
            error_code=ErrorCode.UNKNOWN,
            original_error=exception,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=wait_seconds,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=result.category.value,
            error_code=result.error_code.value,
            exit_code=exit_code,
            retriable=result.retriable,
            is_oom=is_oom,
            message=result.message,
        )
        return result

    # 6. Unknown fallback
    result = ClassifiedError(
        category=ErrorCategory.FATAL,
        message=f"Unknown error (exit_code={exit_code})",
        error_code=ErrorCode.UNKNOWN,
        original_error=exception,
        exit_code=exit_code,
        exit_signal=None,
        exit_reason=exit_reason,
        retriable=False,
    )
    _logger.warning(
        _EVT_ERROR_CLASSIFIED,
        category=result.category.value,
        error_code=result.error_code.value,
        exit_code=exit_code,
        retriable=result.retriable,
        message=result.message,
    )
    return result
classify_execution
classify_execution(stdout='', stderr='', exit_code=None, exit_signal=None, exit_reason=None, exception=None, output_format=None, *, input=None)

Classify execution errors using structured JSON parsing with fallback.

This is the new multi-error classification method that: 1. Parses structured JSON errors[] from CLI output (if present) 2. Classifies each error independently (no short-circuiting) 3. Analyzes exit code and signal for additional context 4. Selects root cause using priority-based scoring 5. Returns all errors with primary/secondary designation

This method returns ClassificationResult which provides access to all detected errors while maintaining backward compatibility through the primary attribute.

Supports two calling conventions
  1. Keyword args (legacy): classify_execution(stdout=..., stderr=..., ...)
  2. Bundled (preferred): classify_execution(input=ClassificationInput(...))

When input is supplied, its fields take precedence over individual keyword arguments.

Parameters:

Name Type Description Default
stdout str

Standard output from the command (may contain JSON).

''
stderr str

Standard error from the command.

''
exit_code int | None

Process exit code (0 = success), None if killed by signal.

None
exit_signal int | None

Signal number if killed by signal.

None
exit_reason ExitReason | None

Why execution ended (completed, timeout, killed, error).

None
exception Exception | None

Optional exception that was raised.

None
output_format str | None

Expected output format (e.g. "json").

None
input ClassificationInput | None

Bundled classification input (preferred over individual kwargs).

None

Returns:

Type Description
ClassificationResult

ClassificationResult with primary error, secondary errors, and metadata.

Example
result = classifier.classify_execution(stdout, stderr, exit_code)

# Access primary (root cause) error
if result.primary.category == ErrorCategory.RATE_LIMIT:
    wait_time = result.primary.suggested_wait_seconds

# Access all errors for debugging
for error in result.all_errors:
    logger.info(f"{error.error_code.value}: {error.message}")
Source code in src/marianne/core/errors/classifier.py
def classify_execution(
    self,
    stdout: str = "",
    stderr: str = "",
    exit_code: int | None = None,
    exit_signal: int | None = None,
    exit_reason: ExitReason | None = None,
    exception: Exception | None = None,
    output_format: str | None = None,
    *,
    input: ClassificationInput | None = None,
) -> ClassificationResult:
    """Classify execution errors using structured JSON parsing with fallback.

    This is the new multi-error classification method that:
    1. Parses structured JSON errors[] from CLI output (if present)
    2. Classifies each error independently (no short-circuiting)
    3. Analyzes exit code and signal for additional context
    4. Selects root cause using priority-based scoring
    5. Returns all errors with primary/secondary designation

    This method returns ClassificationResult which provides access to
    all detected errors while maintaining backward compatibility through
    the `primary` attribute.

    Supports two calling conventions:
        1. **Keyword args** (legacy): ``classify_execution(stdout=..., stderr=..., ...)``
        2. **Bundled** (preferred): ``classify_execution(input=ClassificationInput(...))``

    When *input* is supplied, its fields take precedence over
    individual keyword arguments.

    Args:
        stdout: Standard output from the command (may contain JSON).
        stderr: Standard error from the command.
        exit_code: Process exit code (0 = success), None if killed by signal.
        exit_signal: Signal number if killed by signal.
        exit_reason: Why execution ended (completed, timeout, killed, error).
        exception: Optional exception that was raised.
        output_format: Expected output format (e.g. "json").
        input: Bundled classification input (preferred over individual kwargs).

    Returns:
        ClassificationResult with primary error, secondary errors, and metadata.

    Example:
        ```python
        result = classifier.classify_execution(stdout, stderr, exit_code)

        # Access primary (root cause) error
        if result.primary.category == ErrorCategory.RATE_LIMIT:
            wait_time = result.primary.suggested_wait_seconds

        # Access all errors for debugging
        for error in result.all_errors:
            logger.info(f"{error.error_code.value}: {error.message}")
        ```
    """
    if input is not None:
        stdout = input.stdout
        stderr = input.stderr
        exit_code = input.exit_code
        exit_signal = input.exit_signal
        exit_reason = input.exit_reason
        exception = input.exception
        output_format = input.output_format
    all_errors: list[ClassifiedError] = []
    raw_errors: list[ParsedCliError] = []
    classification_method = "structured"

    # === PHASE 1: Parse Structured JSON ===
    # Pass both stdout and stderr - errors can appear in either stream
    json_errors = try_parse_json_errors(stdout, stderr)
    raw_errors = json_errors

    if json_errors:
        for parsed_error in json_errors:
            classified = classify_single_json_error(
                parsed_error,
                exit_code=exit_code,
                exit_reason=exit_reason,
            )
            all_errors.append(classified)

    # === PHASE 2: Exit Code / Signal Analysis ===
    if exit_signal is not None:
        signal_error = self._classify_signal(
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            stdout=stdout,
            stderr=stderr,
        )
        # Only add if not duplicating an existing error code
        if not any(e.error_code == signal_error.error_code for e in all_errors):
            all_errors.append(signal_error)
            if not json_errors:
                classification_method = "exit_code"

    elif exit_reason == "timeout":
        # Differentiate stale detection (E006) from backend timeout (E001).
        # Stale detection writes "Stale execution:" to stderr — F-097.
        combined_for_stale = f"{stdout}\n{stderr}".lower()
        is_stale = "stale execution" in combined_for_stale
        timeout_code = (
            ErrorCode.EXECUTION_STALE if is_stale
            else ErrorCode.EXECUTION_TIMEOUT
        )
        timeout_message = (
            "Stale execution detected — no output activity"
            if is_stale
            else "Command timed out"
        )
        timeout_wait = 120.0 if is_stale else 60.0
        timeout_error = ClassifiedError(
            category=ErrorCategory.TIMEOUT,
            message=timeout_message,
            error_code=timeout_code,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=timeout_wait,
        )
        if not any(e.error_code == timeout_code for e in all_errors):
            all_errors.append(timeout_error)
            if not json_errors:
                classification_method = "exit_code"

    elif exit_code is None and json_errors:
        # Process killed or disappeared without exit code, AND Phase 1
        # found JSON errors from partial output. Add a process-killed
        # error so select_root_cause can weigh it against JSON errors.
        # Without this, exit_code=None context is lost when Phase 1
        # finds errors (Phase 4 regex fallback is skipped).
        # When no JSON errors exist, Phase 4 calls classify() which
        # already handles exit_code=None correctly.
        stderr_lower = stderr.lower() if stderr else ""
        oom_indicators = ("killed", "out of memory", "oom", "cannot allocate")
        is_oom = any(indicator in stderr_lower for indicator in oom_indicators)
        wait_seconds = 60.0 if is_oom else 10.0
        message = (
            "Process killed (possible OOM — retrying with longer wait)"
            if is_oom
            else "Process exited without exit code "
            "(possible signal race — retrying)"
        )
        process_killed_error = ClassifiedError(
            category=ErrorCategory.TRANSIENT,
            message=message,
            error_code=ErrorCode.UNKNOWN,
            original_error=exception,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=wait_seconds,
        )
        all_errors.append(process_killed_error)

    # === PHASE 3: Exception Analysis ===
    if exception is not None:
        exc_str = str(exception).lower()
        # Try to classify based on exception message
        if "timeout" in exc_str:
            exc_error = ClassifiedError(
                category=ErrorCategory.TIMEOUT,
                message=str(exception),
                error_code=ErrorCode.EXECUTION_TIMEOUT,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=60.0,
            )
        elif "connection" in exc_str or "network" in exc_str:
            exc_error = ClassifiedError(
                category=ErrorCategory.NETWORK,
                message=str(exception),
                error_code=ErrorCode.NETWORK_CONNECTION_FAILED,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=30.0,
            )
        else:
            exc_error = ClassifiedError(
                category=ErrorCategory.TRANSIENT,
                message=str(exception),
                error_code=ErrorCode.UNKNOWN,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=30.0,
            )
        # Only add if we don't have the same error code already
        if not any(e.error_code == exc_error.error_code for e in all_errors):
            all_errors.append(exc_error)

    # === PHASE 4: Regex Fallback (only if no structured errors) ===
    if not all_errors:
        classification_method = "regex_fallback"
        fallback_error = self.classify(
            stdout=stdout,
            stderr=stderr,
            exit_code=exit_code,
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            output_format=output_format,
        )
        all_errors.append(fallback_error)

    # === PHASE 4.5: Rate Limit Override (always runs) ===
    # Rate limits in stdout/stderr must never be missed, even when Phase 1
    # found structured JSON errors that masked them. F-098: Claude CLI rate
    # limit messages appear in stdout but Phase 1 may produce generic errors
    # that prevent Phase 4 from firing.
    has_rate_limit_error = any(
        e.category == ErrorCategory.RATE_LIMIT for e in all_errors
    )
    if not has_rate_limit_error:
        combined_for_rate_limit = f"{stdout}\n{stderr}"
        if self._matches_any(combined_for_rate_limit, self.rate_limit_patterns):
            # Check for quota exhaustion (more specific) first
            if self._matches_any(
                combined_for_rate_limit, self.quota_exhaustion_patterns
            ):
                wait_seconds = (
                    self.parse_reset_time(combined_for_rate_limit)
                    or DEFAULT_QUOTA_WAIT_SECONDS
                )
                rate_limit_error = ClassifiedError(
                    category=ErrorCategory.RATE_LIMIT,
                    message="Token quota exhausted — detected in output",
                    error_code=ErrorCode.QUOTA_EXHAUSTED,
                    original_error=exception,
                    exit_code=exit_code,
                    exit_signal=None,
                    exit_reason=exit_reason,
                    retriable=True,
                    suggested_wait_seconds=wait_seconds,
                )
            else:
                error_code = (
                    ErrorCode.CAPACITY_EXCEEDED
                    if self._matches_any(
                        combined_for_rate_limit, self.capacity_patterns
                    )
                    else ErrorCode.RATE_LIMIT_API
                )
                rate_limit_error = ClassifiedError(
                    category=ErrorCategory.RATE_LIMIT,
                    message="Rate limit detected in output",
                    error_code=error_code,
                    original_error=exception,
                    exit_code=exit_code,
                    exit_signal=None,
                    exit_reason=exit_reason,
                    retriable=True,
                    suggested_wait_seconds=DEFAULT_RATE_LIMIT_WAIT_SECONDS,
                )
            all_errors.append(rate_limit_error)
            _logger.warning(
                "rate_limit_override",
                component="errors",
                error_code=rate_limit_error.error_code.value,
                message="Rate limit detected via Phase 4.5 override",
            )

    # === PHASE 5: Root Cause Selection ===
    root_cause, symptoms, confidence = select_root_cause(all_errors)

    # Log the classification result
    _logger.info(
        "execution_classified",
        method=classification_method,
        primary_code=root_cause.error_code.value,
        error_count=len(all_errors),
        confidence=confidence,
        all_codes=[e.error_code.value for e in all_errors],
    )

    return ClassificationResult(
        primary=root_cause,
        secondary=symptoms,
        raw_errors=raw_errors,
        confidence=confidence,
        classification_method=classification_method,
    )
from_config classmethod
from_config(rate_limit_patterns)

Create classifier from config patterns.

Source code in src/marianne/core/errors/classifier.py
@classmethod
def from_config(cls, rate_limit_patterns: list[str]) -> ErrorClassifier:
    """Create classifier from config patterns."""
    return cls(rate_limit_patterns=rate_limit_patterns)

ErrorCode

Bases: str, Enum

Structured error codes for comprehensive error classification.

Error codes are organized by category using numeric prefixes: - E0xx: Execution errors (timeouts, crashes, kills) - E1xx: Rate limit / capacity errors - E2xx: Validation errors - E3xx: Configuration errors - E4xx: State errors - E5xx: Backend errors - E6xx: Preflight errors

Error codes are stable identifiers that can be used for: - Programmatic error handling and routing - Log aggregation and alerting - Documentation and troubleshooting guides - Metrics and observability dashboards

Attributes
EXECUTION_TIMEOUT class-attribute instance-attribute
EXECUTION_TIMEOUT = 'E001'

Command execution exceeded timeout limit.

EXECUTION_KILLED class-attribute instance-attribute
EXECUTION_KILLED = 'E002'

Process was killed by a signal (external termination).

EXECUTION_CRASHED class-attribute instance-attribute
EXECUTION_CRASHED = 'E003'

Process crashed (segfault, bus error, abort, etc.).

EXECUTION_INTERRUPTED class-attribute instance-attribute
EXECUTION_INTERRUPTED = 'E004'

Process was interrupted by user (SIGINT/Ctrl+C).

EXECUTION_OOM class-attribute instance-attribute
EXECUTION_OOM = 'E005'

Process was killed due to out of memory condition.

EXECUTION_STALE class-attribute instance-attribute
EXECUTION_STALE = 'E006'

Execution killed by stale detection — no output for too long.

EXECUTION_UNKNOWN class-attribute instance-attribute
EXECUTION_UNKNOWN = 'E009'

Unknown execution error with non-zero exit code.

RATE_LIMIT_API class-attribute instance-attribute
RATE_LIMIT_API = 'E101'

API rate limit exceeded (429, quota, throttling).

RATE_LIMIT_CLI class-attribute instance-attribute
RATE_LIMIT_CLI = 'E102'

CLI-level rate limiting detected.

CAPACITY_EXCEEDED class-attribute instance-attribute
CAPACITY_EXCEEDED = 'E103'

Service capacity exceeded (overloaded, try again later).

QUOTA_EXHAUSTED class-attribute instance-attribute
QUOTA_EXHAUSTED = 'E104'

Token/usage quota exhausted - wait until reset time.

VALIDATION_FILE_MISSING class-attribute instance-attribute
VALIDATION_FILE_MISSING = 'E201'

Expected output file does not exist.

VALIDATION_CONTENT_MISMATCH class-attribute instance-attribute
VALIDATION_CONTENT_MISMATCH = 'E202'

Output content does not match expected pattern.

VALIDATION_COMMAND_FAILED class-attribute instance-attribute
VALIDATION_COMMAND_FAILED = 'E203'

Validation command returned non-zero exit code.

VALIDATION_TIMEOUT class-attribute instance-attribute
VALIDATION_TIMEOUT = 'E204'

Validation check timed out.

VALIDATION_GENERIC class-attribute instance-attribute
VALIDATION_GENERIC = 'E209'

Generic validation failure (output validation needed).

CONFIG_INVALID class-attribute instance-attribute
CONFIG_INVALID = 'E301'

Configuration file is malformed or invalid.

CONFIG_MISSING_FIELD class-attribute instance-attribute
CONFIG_MISSING_FIELD = 'E302'

Required configuration field is missing.

CONFIG_PATH_NOT_FOUND class-attribute instance-attribute
CONFIG_PATH_NOT_FOUND = 'E303'

Configuration file path does not exist.

CONFIG_PARSE_ERROR class-attribute instance-attribute
CONFIG_PARSE_ERROR = 'E304'

Failed to parse configuration file (YAML/JSON syntax error).

CONFIG_MCP_ERROR class-attribute instance-attribute
CONFIG_MCP_ERROR = 'E305'

MCP server/plugin configuration error (missing env vars, invalid config).

CONFIG_CLI_MODE_ERROR class-attribute instance-attribute
CONFIG_CLI_MODE_ERROR = 'E306'

Claude CLI mode mismatch (e.g., streaming mode incompatible with operation).

STATE_CORRUPTION class-attribute instance-attribute
STATE_CORRUPTION = 'E401'

Checkpoint state file is corrupted or inconsistent.

STATE_LOAD_FAILED class-attribute instance-attribute
STATE_LOAD_FAILED = 'E402'

Failed to load checkpoint state from storage.

STATE_SAVE_FAILED class-attribute instance-attribute
STATE_SAVE_FAILED = 'E403'

Failed to save checkpoint state to storage.

STATE_VERSION_MISMATCH class-attribute instance-attribute
STATE_VERSION_MISMATCH = 'E404'

Checkpoint state version is incompatible.

BACKEND_CONNECTION class-attribute instance-attribute
BACKEND_CONNECTION = 'E501'

Failed to connect to backend service.

BACKEND_AUTH class-attribute instance-attribute
BACKEND_AUTH = 'E502'

Backend authentication or authorization failed.

BACKEND_RESPONSE class-attribute instance-attribute
BACKEND_RESPONSE = 'E503'

Invalid or unexpected response from backend.

BACKEND_TIMEOUT class-attribute instance-attribute
BACKEND_TIMEOUT = 'E504'

Backend request timed out.

BACKEND_NOT_FOUND class-attribute instance-attribute
BACKEND_NOT_FOUND = 'E505'

Backend executable or service not found.

PREFLIGHT_PATH_MISSING class-attribute instance-attribute
PREFLIGHT_PATH_MISSING = 'E601'

Required path does not exist (working_dir, referenced file).

PREFLIGHT_PROMPT_TOO_LARGE class-attribute instance-attribute
PREFLIGHT_PROMPT_TOO_LARGE = 'E602'

Prompt exceeds recommended token limit.

PREFLIGHT_WORKING_DIR_INVALID class-attribute instance-attribute
PREFLIGHT_WORKING_DIR_INVALID = 'E603'

Working directory is not accessible or not a directory.

PREFLIGHT_VALIDATION_SETUP class-attribute instance-attribute
PREFLIGHT_VALIDATION_SETUP = 'E604'

Validation target path or pattern is invalid.

NETWORK_CONNECTION_FAILED class-attribute instance-attribute
NETWORK_CONNECTION_FAILED = 'E901'

Network connection failed (refused, reset, unreachable).

NETWORK_DNS_ERROR class-attribute instance-attribute
NETWORK_DNS_ERROR = 'E902'

DNS resolution failed.

NETWORK_SSL_ERROR class-attribute instance-attribute
NETWORK_SSL_ERROR = 'E903'

SSL/TLS handshake or certificate error.

NETWORK_TIMEOUT class-attribute instance-attribute
NETWORK_TIMEOUT = 'E904'

Network operation timed out.

UNKNOWN class-attribute instance-attribute
UNKNOWN = 'E999'

Unclassified error - requires investigation.

category property
category

Get the category prefix (first digit) of this error code.

Returns:

Type Description
str

Category string like "execution", "rate_limit", "validation", etc.

is_retriable property
is_retriable

Check if this error code is generally retriable.

Returns:

Type Description
bool

True if errors with this code are typically retriable.

Functions
get_retry_behavior
get_retry_behavior()

Get precise retry behavior for this error code.

Returns error-code-specific delay and retry recommendations. Uses module-level _RETRY_BEHAVIORS constant to avoid rebuilding the lookup table on every call.

Returns:

Type Description
RetryBehavior

RetryBehavior with delay, retriability, and reason.

Source code in src/marianne/core/errors/codes.py
def get_retry_behavior(self) -> RetryBehavior:
    """Get precise retry behavior for this error code.

    Returns error-code-specific delay and retry recommendations.
    Uses module-level _RETRY_BEHAVIORS constant to avoid rebuilding
    the lookup table on every call.

    Returns:
        RetryBehavior with delay, retriability, and reason.
    """
    return _RETRY_BEHAVIORS.get(
        self,
        RetryBehavior(
            delay_seconds=30.0,
            is_retriable=self.is_retriable,
            reason=f"Default behavior for {self.value}",
        ),
    )
get_severity
get_severity()

Get the severity level for this error code.

Severity assignments: - CRITICAL: Fatal errors requiring immediate attention - ERROR: Most error codes (default) - WARNING: Degraded but potentially temporary conditions - INFO: Reserved for future diagnostic codes

Returns:

Type Description
Severity

Severity level for this error code.

Source code in src/marianne/core/errors/codes.py
def get_severity(self) -> Severity:
    """Get the severity level for this error code.

    Severity assignments:
    - CRITICAL: Fatal errors requiring immediate attention
    - ERROR: Most error codes (default)
    - WARNING: Degraded but potentially temporary conditions
    - INFO: Reserved for future diagnostic codes

    Returns:
        Severity level for this error code.
    """
    # Critical errors - job cannot continue
    critical_codes = {
        ErrorCode.EXECUTION_CRASHED,
        ErrorCode.EXECUTION_OOM,
        ErrorCode.STATE_CORRUPTION,
        ErrorCode.BACKEND_AUTH,
        ErrorCode.BACKEND_NOT_FOUND,
    }
    if self in critical_codes:
        return Severity.CRITICAL

    # Warning level - degraded but potentially temporary
    warning_codes = {
        ErrorCode.CAPACITY_EXCEEDED,
        ErrorCode.VALIDATION_TIMEOUT,
        ErrorCode.EXECUTION_STALE,
    }
    if self in warning_codes:
        return Severity.WARNING

    # Default to ERROR for most codes
    return Severity.ERROR

FatalError

Bases: Exception

Non-recoverable error that should stop the job.

GracefulShutdownError

Bases: Exception

Raised when Ctrl+C is pressed to trigger graceful shutdown.

This exception is caught by the runner to save state before exiting.

RateLimitExhaustedError

RateLimitExhaustedError(message, resume_after=None, backend_type='unknown', quota_exhaustion=False)

Bases: FatalError

Rate limit or quota exhaustion — job should PAUSE, not FAIL.

Subclasses FatalError for backward compatibility: existing except FatalError blocks still catch it, but more specific except RateLimitExhaustedError blocks intercept first when ordered before except FatalError.

Attributes:

Name Type Description
resume_after

When the rate limit resets (ISO datetime), or None.

backend_type

Which backend hit the limit (e.g., "claude-cli").

quota_exhaustion

True if daily/monthly quota is exhausted, False if it's a per-minute rate limit.

Source code in src/marianne/core/errors/exceptions.py
def __init__(
    self,
    message: str,
    resume_after: datetime | Any | None = None,
    backend_type: str = "unknown",
    quota_exhaustion: bool = False,
) -> None:
    super().__init__(message)
    self.resume_after = resume_after
    self.backend_type = backend_type
    self.quota_exhaustion = quota_exhaustion

JobCompletionSummary

Bases: BaseModel

Summary of a completed job run.

Pydantic v2 model tracking key metrics for display at job completion: - Sheet success/failure/skip counts - Validation pass rate - Cost tracking - Duration and retry statistics - Hook execution results

Attributes
success_rate property
success_rate

Calculate sheet success rate as percentage.

Skipped sheets are excluded from the denominator since they were never attempted (e.g., skip_when_command conditions met).

Functions
from_checkpoint classmethod
from_checkpoint(checkpoint)

Construct a summary from checkpoint state.

Computes rates from sheet states, sums costs and durations.

Parameters:

Name Type Description Default
checkpoint CheckpointState

The checkpoint state to summarize.

required

Returns:

Type Description
JobCompletionSummary

JobCompletionSummary with computed metrics.

Source code in src/marianne/core/models.py
@classmethod
def from_checkpoint(
    cls, checkpoint: CheckpointState
) -> JobCompletionSummary:
    """Construct a summary from checkpoint state.

    Computes rates from sheet states, sums costs and durations.

    Args:
        checkpoint: The checkpoint state to summarize.

    Returns:
        JobCompletionSummary with computed metrics.
    """
    completed = 0
    failed = 0
    skipped = 0
    total_cost = 0.0
    total_duration = 0.0
    validation_passed = 0
    validation_failed = 0
    successes_no_retry = 0

    for sheet_state in checkpoint.sheets.values():
        if sheet_state.status == SheetStatus.COMPLETED:
            completed += 1
            if sheet_state.success_without_retry:
                successes_no_retry += 1
        elif sheet_state.status == SheetStatus.FAILED:
            failed += 1
        elif sheet_state.status == SheetStatus.SKIPPED:
            skipped += 1

        if sheet_state.validation_passed is True:
            validation_passed += 1
        elif sheet_state.validation_passed is False:
            validation_failed += 1

        total_cost += sheet_state.total_cost_usd
        total_duration += sheet_state.total_duration_seconds

    total_validations = validation_passed + validation_failed
    val_rate = (
        (validation_passed / total_validations * 100)
        if total_validations > 0
        else 100.0
    )
    no_retry_rate = (
        (successes_no_retry / completed * 100) if completed > 0 else 0.0
    )

    return cls(
        job_id=checkpoint.job_id,
        job_name=checkpoint.job_name,
        total_sheets=checkpoint.total_sheets,
        completed_sheets=completed,
        failed_sheets=failed,
        skipped_sheets=skipped,
        total_cost_usd=total_cost,
        total_duration_seconds=total_duration,
        validation_pass_rate=val_rate,
        success_without_retry_rate=no_retry_rate,
        validation_pass_count=validation_passed,
        validation_fail_count=validation_failed,
        successes_without_retry=successes_no_retry,
        final_status=checkpoint.status,
        total_input_tokens=checkpoint.total_input_tokens,
        total_output_tokens=checkpoint.total_output_tokens,
        total_estimated_cost=checkpoint.total_estimated_cost,
        cost_limit_hit=checkpoint.cost_limit_reached,
    )
to_dict
to_dict()

Convert summary to dictionary for JSON output.

Source code in src/marianne/core/models.py
def to_dict(self) -> dict[str, Any]:
    """Convert summary to dictionary for JSON output."""
    return {
        "job_id": self.job_id,
        "job_name": self.job_name,
        "status": self.final_status.value,
        "duration_seconds": round(self.total_duration_seconds, 2),
        "duration_formatted": self._format_duration(
            self.total_duration_seconds
        ),
        "sheets": {
            "total": self.total_sheets,
            "completed": self.completed_sheets,
            "failed": self.failed_sheets,
            "skipped": self.skipped_sheets,
            "success_rate": round(self.success_rate, 1),
        },
        "validation": {
            "passed": self.validation_pass_count,
            "failed": self.validation_fail_count,
            "pass_rate": round(self.validation_pass_rate, 1),
        },
        "execution": {
            "total_retries": self.total_retries,
            "completion_attempts": self.total_completion_attempts,
            "rate_limit_waits": self.rate_limit_waits,
            "successes_without_retry": self.successes_without_retry,
            "success_without_retry_rate": round(
                self.success_without_retry_rate, 1
            ),
        },
    }

GroundingDecisionContext dataclass

GroundingDecisionContext(passed, message, confidence=1.0, should_escalate=False, recovery_guidance=None, hooks_executed=0)

Context from grounding hooks for completion mode decisions.

Encapsulates grounding results to inform decision-making about whether to retry, complete, or escalate.

Functions
from_results classmethod
from_results(results)

Build context from grounding results list.

Source code in src/marianne/core/summary.py
@classmethod
def from_results(cls, results: list[GroundingResult]) -> GroundingDecisionContext:
    """Build context from grounding results list."""
    if not results:
        return cls(passed=True, message="No grounding hooks executed", hooks_executed=0)

    passed = all(r.passed for r in results)
    confidences = [r.confidence for r in results]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 1.0
    should_escalate = any(r.should_escalate for r in results)

    failed = [r for r in results if not r.passed]
    recovery_guidance = None
    if failed:
        guidance_parts = [r.recovery_guidance for r in failed if r.recovery_guidance]
        if guidance_parts:
            recovery_guidance = "; ".join(guidance_parts)

    if passed:
        message = f"All {len(results)} grounding check(s) passed"
    else:
        failures = ", ".join(f"{r.hook_name}: {r.message}" for r in failed)
        message = f"{len(failed)}/{len(results)} grounding check(s) failed: {failures}"

    return cls(
        passed=passed,
        message=message,
        confidence=avg_confidence,
        should_escalate=should_escalate,
        recovery_guidance=recovery_guidance,
        hooks_executed=len(results),
    )
disabled classmethod
disabled()

Create context when grounding is disabled.

Source code in src/marianne/core/summary.py
@classmethod
def disabled(cls) -> GroundingDecisionContext:
    """Create context when grounding is disabled."""
    return cls(passed=True, message="Grounding not enabled", hooks_executed=0)

SheetExecutionMode

Bases: str, Enum

Mode of sheet execution.