Skip to content

Index

execution

Execution layer for Marianne jobs.

Contains validation, retry logic, circuit breaker, and adaptive retry strategy.

Classes

FatalError

Bases: Exception

Non-recoverable error that should stop the job.

SheetExecutionMode

Bases: str, Enum

Mode of sheet execution.

CircuitBreaker

CircuitBreaker(failure_threshold=5, recovery_timeout=300.0, name='default')

Circuit breaker for preventing cascading failures.

The circuit breaker monitors execution success/failure and automatically blocks further requests when a failure threshold is exceeded. This prevents overwhelming a failing service and gives it time to recover.

Async-safe: All state modifications are protected by an asyncio.Lock.

Attributes:

Name Type Description
failure_threshold int

Number of consecutive failures before opening circuit.

recovery_timeout float

Seconds to wait before testing recovery (OPEN -> HALF_OPEN).

state CircuitState

Current circuit state.

stats CircuitBreakerStats

Statistics about circuit breaker behavior.

Initialize circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Number of consecutive failures before opening the circuit. Default is 5.

5
recovery_timeout float

Seconds to wait in OPEN state before transitioning to HALF_OPEN to test recovery. Default is 300 (5 minutes).

300.0
name str

Name for this circuit breaker (used in logging).

'default'
Source code in src/marianne/execution/circuit_breaker.py
def __init__(
    self,
    failure_threshold: int = 5,
    recovery_timeout: float = 300.0,
    name: str = "default",
) -> None:
    """Initialize circuit breaker.

    The breaker starts CLOSED with a zero failure count, no recorded
    failure time, and a fresh statistics object.

    Args:
        failure_threshold: Number of consecutive failures before opening
            the circuit. Default is 5.
        recovery_timeout: Seconds to wait in OPEN state before transitioning
            to HALF_OPEN to test recovery. Default is 300 (5 minutes).
        name: Name for this circuit breaker (used in logging).

    Raises:
        ValueError: If failure_threshold is below 1, or if recovery_timeout
            is not a positive number (zero, negative, or NaN).
    """
    if failure_threshold < 1:
        raise ValueError("failure_threshold must be at least 1")
    # Written as "not > 0" instead of "<= 0" so that NaN is rejected as
    # well: every comparison against NaN is False, so "nan <= 0" would let
    # it through and later elapsed-time comparisons could never succeed.
    if not recovery_timeout > 0:
        raise ValueError("recovery_timeout must be positive")

    self._failure_threshold = failure_threshold
    self._recovery_timeout = recovery_timeout
    self._name = name

    # State (protected by lock)
    self._state = CircuitState.CLOSED
    self._failure_count = 0
    self._last_failure_time: float | None = None
    self._stats = CircuitBreakerStats()

    # Async safety — asyncio.Lock is the correct primitive for
    # async code (threading.Lock blocks the event loop).
    self._lock = asyncio.Lock()

    _logger.debug(
        "circuit_breaker.initialized",
        name=name,
        failure_threshold=failure_threshold,
        recovery_timeout=recovery_timeout,
    )
Attributes
failure_threshold property
failure_threshold

Number of consecutive failures before opening circuit.

recovery_timeout property
recovery_timeout

Seconds to wait before testing recovery.

name property
name

Name of this circuit breaker.

Functions
get_state async
get_state()

Get the current circuit state.

This method handles automatic state transitions: - If OPEN and recovery_timeout has elapsed, transitions to HALF_OPEN.

Returns:

Type Description
CircuitState

Current CircuitState.

Source code in src/marianne/execution/circuit_breaker.py
async def get_state(self) -> CircuitState:
    """Return the circuit state, applying any pending recovery transition.

    While holding the lock, the breaker first runs its
    OPEN -> HALF_OPEN check (``_maybe_transition_to_half_open``) and then
    reads the state, so the value returned reflects that check.

    Returns:
        The CircuitState observed under the lock.
    """
    async with self._lock:
        # Give the breaker a chance to leave OPEN before reporting.
        self._maybe_transition_to_half_open()
        observed = self._state
    return observed
can_execute async
can_execute()

Check if a request can be executed.

Returns True if: - Circuit is CLOSED (normal operation) - Circuit is HALF_OPEN (testing recovery) - Circuit is OPEN but recovery_timeout has elapsed (transitions to HALF_OPEN)

Returns False if: - Circuit is OPEN and recovery_timeout hasn't elapsed

Returns:

Type Description
bool

True if the request should be allowed, False if it should be blocked.

Source code in src/marianne/execution/circuit_breaker.py
async def can_execute(self) -> bool:
    """Report whether the breaker currently admits a request.

    The OPEN -> HALF_OPEN check runs first, so an OPEN circuit whose
    recovery timeout has elapsed is reported as admitting.

    Note: this method only inspects the state; it does not itself limit
    how many callers are admitted while the circuit is HALF_OPEN.

    Returns:
        True when the state is CLOSED or HALF_OPEN after the transition
        check, False otherwise (i.e. the circuit is still OPEN).
    """
    admitting_states = (CircuitState.CLOSED, CircuitState.HALF_OPEN)
    async with self._lock:
        self._maybe_transition_to_half_open()
        return self._state in admitting_states
record_success async
record_success()

Record a successful operation.

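Effects by state: - CLOSED: Resets consecutive failure count - HALF_OPEN: Transitions to CLOSED (recovery confirmed) - OPEN: No state change (shouldn't happen - request blocked). In every state the success total is incremented and the consecutive-failure counters are reset to 0.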

Source code in src/marianne/execution/circuit_breaker.py
async def record_success(self) -> None:
    """Record a successful operation.

    Regardless of state, the success total is incremented and both
    consecutive-failure counters are reset to zero. On top of that:

    - HALF_OPEN: the circuit is closed (recovery confirmed) and the
      transition is logged at info level.
    - CLOSED: the success is logged at debug level.
    - OPEN: no transition and no log entry.
    """
    async with self._lock:
        stats = self._stats
        stats.total_successes += 1
        stats.consecutive_failures = 0
        self._failure_count = 0

        state_now = self._state
        if state_now == CircuitState.HALF_OPEN:
            # The probe request succeeded, so normal operation resumes.
            _logger.info(
                "circuit_breaker.state_changed",
                name=self._name,
                from_state=CircuitState.HALF_OPEN.value,
                to_state=CircuitState.CLOSED.value,
                reason="recovery_confirmed",
            )
            self._set_state(CircuitState.CLOSED)
            return

        if state_now == CircuitState.CLOSED:
            _logger.debug(
                "circuit_breaker.success_recorded",
                name=self._name,
                state=state_now.value,
            )
record_failure async
record_failure()

Record a failed operation.

Effects by state: - CLOSED: Increments failure count, may transition to OPEN - HALF_OPEN: Transitions to OPEN (recovery failed) - OPEN: No state change (shouldn't happen - request blocked). In every state the failure counters and the last-failure timestamp are updated.

Source code in src/marianne/execution/circuit_breaker.py
async def record_failure(self) -> None:
    """Record a failed operation.

    Regardless of state, the failure totals, the consecutive-failure
    counters and the last-failure timestamps (monotonic clock) are
    updated. On top of that:

    - HALF_OPEN: the circuit is re-opened (recovery test failed).
    - CLOSED: the circuit is opened once the failure count reaches
      ``failure_threshold``; otherwise the failure is only logged.
    - OPEN: no transition and no log entry. Note that the last-failure
      time is still refreshed, and ``time_until_retry()`` measures its
      remaining wait from that timestamp.
    """
    async with self._lock:
        failed_at = time.monotonic()
        stats = self._stats
        stats.total_failures += 1
        stats.consecutive_failures += 1
        stats.last_failure_at = failed_at
        self._failure_count += 1
        self._last_failure_time = failed_at

        state_now = self._state
        if state_now == CircuitState.HALF_OPEN:
            # The probe request failed, so go back to blocking.
            _logger.info(
                "circuit_breaker.state_changed",
                name=self._name,
                from_state=CircuitState.HALF_OPEN.value,
                to_state=CircuitState.OPEN.value,
                reason="recovery_test_failed",
            )
            self._set_state(CircuitState.OPEN)
            return

        if state_now != CircuitState.CLOSED:
            return

        if self._failure_count < self._failure_threshold:
            # Still under the limit: note the failure and stay CLOSED.
            _logger.debug(
                "circuit_breaker.failure_recorded",
                name=self._name,
                state=state_now.value,
                failure_count=self._failure_count,
                failure_threshold=self._failure_threshold,
            )
            return

        # Limit reached: trip the breaker.
        _logger.info(
            "circuit_breaker.state_changed",
            name=self._name,
            from_state=CircuitState.CLOSED.value,
            to_state=CircuitState.OPEN.value,
            reason="failure_threshold_exceeded",
            failure_count=self._failure_count,
            failure_threshold=self._failure_threshold,
        )
        self._set_state(CircuitState.OPEN)
time_until_retry async
time_until_retry()

Get time remaining until retry is allowed.

Returns:

Type Description
float | None

Seconds until the circuit transitions to HALF_OPEN, or None if the circuit is not OPEN.

Source code in src/marianne/execution/circuit_breaker.py
async def time_until_retry(self) -> float | None:
    """Get time remaining until retry is allowed.

    The wait is measured on the monotonic clock from the last recorded
    failure time and is never negative.

    Returns:
        Seconds left of ``recovery_timeout`` (clamped at 0.0), or None
        when the circuit is not OPEN or no failure time has been recorded.
    """
    async with self._lock:
        last_failure = self._last_failure_time
        if self._state != CircuitState.OPEN or last_failure is None:
            return None

        waited = time.monotonic() - last_failure
        return max(0.0, self._recovery_timeout - waited)
record_cost async
record_cost(input_tokens, output_tokens, estimated_cost)

Record token usage and estimated cost from an execution.

Updates running totals for cost tracking. Call this after each successful or failed execution that consumed tokens.

Parameters:

Name Type Description Default
input_tokens int

Number of input tokens consumed.

required
output_tokens int

Number of output tokens consumed.

required
estimated_cost float

Estimated cost in USD for this execution.

required
Source code in src/marianne/execution/circuit_breaker.py
async def record_cost(
    self,
    input_tokens: int,
    output_tokens: int,
    estimated_cost: float,
) -> None:
    """Add one execution's token usage and cost to the running totals.

    The three totals on the statistics object are incremented under the
    lock, then a debug entry is logged with both the per-call values and
    the updated totals (costs rounded for the log only).

    Args:
        input_tokens: Number of input tokens consumed.
        output_tokens: Number of output tokens consumed.
        estimated_cost: Estimated cost in USD for this execution.
    """
    async with self._lock:
        totals = self._stats
        totals.total_input_tokens += input_tokens
        totals.total_output_tokens += output_tokens
        totals.total_estimated_cost += estimated_cost

        _logger.debug(
            "circuit_breaker.cost_recorded",
            name=self._name,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            estimated_cost=round(estimated_cost, 6),
            total_input_tokens=totals.total_input_tokens,
            total_output_tokens=totals.total_output_tokens,
            total_estimated_cost=round(totals.total_estimated_cost, 4),
        )
check_cost_threshold async
check_cost_threshold(max_cost)

Check if total estimated cost exceeds a threshold.

Parameters:

Name Type Description Default
max_cost float

Maximum allowed cost in USD.

required

Returns:

Type Description
bool

True if threshold is exceeded (should stop), False otherwise.

Source code in src/marianne/execution/circuit_breaker.py
async def check_cost_threshold(self, max_cost: float) -> bool:
    """Check if total estimated cost exceeds a threshold.

    The comparison is strict: a total exactly equal to ``max_cost`` does
    not count as exceeded. A warning is logged when the threshold is
    exceeded.

    Args:
        max_cost: Maximum allowed cost in USD.

    Returns:
        True if threshold is exceeded (should stop), False otherwise.
    """
    async with self._lock:
        spent = self._stats.total_estimated_cost
        over_budget = spent > max_cost
        if not over_budget:
            return over_budget

        _logger.warning(
            "circuit_breaker.cost_threshold_exceeded",
            name=self._name,
            total_estimated_cost=round(spent, 4),
            max_cost=max_cost,
        )
        return over_budget
get_stats async
get_stats()

Get current statistics.

Returns:

Type Description
CircuitBreakerStats

Copy of current CircuitBreakerStats.

Source code in src/marianne/execution/circuit_breaker.py
async def get_stats(self) -> CircuitBreakerStats:
    """Get current statistics.

    The snapshot is taken under the lock and is a separate object, so
    callers cannot modify the breaker's own statistics through it.

    Returns:
        Copy of current CircuitBreakerStats.
    """
    # Imported locally so this method does not depend on the module's
    # top-level import block.
    from dataclasses import replace

    async with self._lock:
        # replace() with no overrides builds a new instance from every
        # dataclass field, so fields added to CircuitBreakerStats later
        # are copied automatically instead of being silently dropped by
        # a hand-maintained field list.
        return replace(self._stats)
reset async
reset()

Reset the circuit breaker to initial state.

This resets: - State to CLOSED - Failure counts to 0 - Last failure time to None

Statistics are NOT reset (use get_stats() to view history).

Source code in src/marianne/execution/circuit_breaker.py
async def reset(self) -> None:
    """Return the circuit breaker to its initial operating state.

    Under the lock this sets the state to CLOSED, zeroes the failure
    count and the consecutive-failure statistic, and clears the last
    failure time. Cumulative statistics are kept.

    If the circuit was not already CLOSED, the close is also counted in
    ``times_closed``, the state-change timestamp is refreshed, and the
    reset is logged.
    """
    async with self._lock:
        previous = self._state
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = None
        self._stats.consecutive_failures = 0

        if previous == CircuitState.CLOSED:
            # Nothing actually changed state; skip bookkeeping and logging.
            return

        self._stats.times_closed += 1
        self._stats.last_state_change_at = time.monotonic()
        _logger.info(
            "circuit_breaker.reset",
            name=self._name,
            from_state=previous.value,
        )
force_open async
force_open()

Force the circuit to OPEN state.

Useful for manual intervention or testing.

Source code in src/marianne/execution/circuit_breaker.py
async def force_open(self) -> None:
    """Force the circuit to OPEN state.

    Useful for manual intervention or testing. Does nothing when the
    circuit is already OPEN; otherwise the state is switched, the last
    failure time is set to the current monotonic time, and the change is
    logged.
    """
    async with self._lock:
        previous = self._state
        if previous == CircuitState.OPEN:
            return

        self._set_state(CircuitState.OPEN)
        # Stamp "now" as the last failure time for the forced open.
        self._last_failure_time = time.monotonic()
        _logger.info(
            "circuit_breaker.force_opened",
            name=self._name,
            from_state=previous.value,
        )
force_close async
force_close()

Force the circuit to CLOSED state.

Useful for manual intervention or testing. Also resets failure counts.

Source code in src/marianne/execution/circuit_breaker.py
async def force_close(self) -> None:
    """Force the circuit to CLOSED state.

    Useful for manual intervention or testing. The failure count and the
    consecutive-failure statistic are zeroed; the last failure time is
    left as it was.

    If the circuit was not already CLOSED, the close is also counted in
    ``times_closed``, the state-change timestamp is refreshed, and the
    change is logged.
    """
    async with self._lock:
        previous = self._state
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._stats.consecutive_failures = 0

        if previous == CircuitState.CLOSED:
            # Already closed: counters were reset, nothing else to record.
            return

        self._stats.times_closed += 1
        self._stats.last_state_change_at = time.monotonic()
        _logger.info(
            "circuit_breaker.force_closed",
            name=self._name,
            from_state=previous.value,
        )
__repr__
__repr__()

Get string representation of circuit breaker.

Source code in src/marianne/execution/circuit_breaker.py
def __repr__(self) -> str:
    """Return a debug string with the name, state value and failure ratio."""
    template = "CircuitBreaker(name={!r}, state={}, failures={}/{})"
    return template.format(
        self._name,
        self._state.value,
        self._failure_count,
        self._failure_threshold,
    )

CircuitBreakerStats dataclass

CircuitBreakerStats(total_successes=0, total_failures=0, times_opened=0, times_half_opened=0, times_closed=0, last_failure_at=None, last_state_change_at=None, consecutive_failures=0, total_input_tokens=0, total_output_tokens=0, total_estimated_cost=0.0)

Statistics for circuit breaker monitoring.

Provides visibility into the circuit breaker's behavior for observability and debugging, including cost tracking.

Attributes
total_successes class-attribute instance-attribute
total_successes = 0

Total number of successful operations recorded.

total_failures class-attribute instance-attribute
total_failures = 0

Total number of failed operations recorded.

times_opened class-attribute instance-attribute
times_opened = 0

Number of times the circuit has transitioned to OPEN state.

times_half_opened class-attribute instance-attribute
times_half_opened = 0

Number of times the circuit has transitioned to HALF_OPEN state.

times_closed class-attribute instance-attribute
times_closed = 0

Number of times the circuit has transitioned to CLOSED from another state.

last_failure_at class-attribute instance-attribute
last_failure_at = None

Timestamp of the most recent failure (monotonic time).

last_state_change_at class-attribute instance-attribute
last_state_change_at = None

Timestamp of the most recent state transition.

consecutive_failures class-attribute instance-attribute
consecutive_failures = 0

Current count of consecutive failures (resets on success).

total_input_tokens class-attribute instance-attribute
total_input_tokens = 0

Total input tokens consumed across all executions.

total_output_tokens class-attribute instance-attribute
total_output_tokens = 0

Total output tokens consumed across all executions.

total_estimated_cost class-attribute instance-attribute
total_estimated_cost = 0.0

Total estimated cost in USD across all executions.

Functions
to_dict
to_dict()

Convert stats to dictionary for logging/serialization.

Returns:

Type Description
dict[str, Any]

Dictionary representation of all statistics.

Source code in src/marianne/execution/circuit_breaker.py
def to_dict(self) -> dict[str, Any]:
    """Convert stats to dictionary for logging/serialization.

    Values are copied as-is from the attributes of the same name; keys
    appear in the fixed order listed below.

    Returns:
        Dictionary representation of all statistics.
    """
    exported_fields = (
        "total_successes",
        "total_failures",
        "times_opened",
        "times_half_opened",
        "times_closed",
        "consecutive_failures",
        "last_failure_at",
        "last_state_change_at",
        "total_input_tokens",
        "total_output_tokens",
        "total_estimated_cost",
    )
    return {field_name: getattr(self, field_name) for field_name in exported_fields}

CircuitState

Bases: str, Enum

State of the circuit breaker.

The circuit breaker transitions between these states based on success/failure patterns:

  • CLOSED: Normal operation. Failures are tracked but requests are allowed.
  • OPEN: Blocking mode. Requests are rejected until recovery_timeout elapses.
  • HALF_OPEN: Testing mode. A single request is allowed to test recovery.
Attributes
CLOSED class-attribute instance-attribute
CLOSED = 'closed'

Normal operation - requests are allowed and failures are tracked.

OPEN class-attribute instance-attribute
OPEN = 'open'

Blocking calls - requests are rejected, waiting for recovery timeout.

HALF_OPEN class-attribute instance-attribute
HALF_OPEN = 'half_open'

Testing recovery - one request is allowed to test if service recovered.

AdaptiveRetryStrategy

AdaptiveRetryStrategy(config=None, delay_history=None, global_learning_store=None)

Intelligent retry strategy that analyzes error patterns.

The strategy examines error history to detect patterns and make informed retry decisions. Key features:

  1. Rapid Failure Detection: If multiple errors occur in a short window, applies longer backoff to avoid overwhelming the system.

  2. Repeated Error Detection: If the same error code appears repeatedly, may recommend different strategies or lower confidence.

  3. Rate Limit Handling: Uses suggested wait times from rate limit errors, with additional buffer.

  4. Cascading Failure Detection: If errors are getting different/worse, may recommend stopping to prevent further damage.

  5. Recovery Detection: If recent attempts succeeded after failures, uses shorter delays to capitalize on recovery.

  6. Delay Learning with Circuit Breaker: When a DelayHistory is provided, the strategy learns optimal delays from past outcomes. A circuit breaker protects against bad learned delays by reverting to static delays after 3 consecutive failures.

Circuit Breaker State Design

The circuit breaker state (_learned_delay_failures, _use_learned_delay) is intentionally ephemeral and NOT persisted. This is a deliberate design choice with the following trade-offs:

Benefits: - After restart, the system gets a "fresh start" to try learned delays - Avoids persisting potentially stale circuit breaker state - Simple implementation without additional state management

Trade-offs: - After restart, may retry with a previously-failed learned delay once - Circuit breaker will re-trigger after 3 failures if the learned delay is still problematic

The DelayHistory itself CAN be persisted (it's just delay outcomes), but the circuit breaker resets on each AdaptiveRetryStrategy instantiation. Use reset_circuit_breaker() to manually reset circuit breaker state for a specific error code during runtime.

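Thread safety: analyze() derives its recommendation from the error history passed in, but the strategy instance itself holds mutable state (the optional DelayHistory and the learned-delay circuit breaker, both updated by record_delay_outcome()), so do not assume a shared instance is safe to use from multiple threads without synchronization.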

Example

strategy = AdaptiveRetryStrategy()

Analyze error history

recommendation = strategy.analyze(error_history)

Log the decision

logger.info( "retry_decision", should_retry=recommendation.should_retry, delay=recommendation.delay_seconds, pattern=recommendation.detected_pattern.value, reason=recommendation.reason, )

Initialize the adaptive retry strategy.

Parameters:

Name Type Description Default
config RetryStrategyConfig | None

Optional configuration. Uses defaults if not provided.

None
delay_history DelayHistory | None

Optional delay history for learning. If not provided, learning features are disabled (purely static delays).

None
global_learning_store GlobalLearningStore | None

Optional global learning store for cross-workspace learned delays (Evolution #3: Learned Wait Time Injection). If provided, blend_historical_delay() will query global store for cross-workspace learned delays when in-memory history is insufficient.

None
Source code in src/marianne/execution/retry_strategy.py
def __init__(
    self,
    config: RetryStrategyConfig | None = None,
    delay_history: DelayHistory | None = None,
    global_learning_store: GlobalLearningStore | None = None,
) -> None:
    """Set up the strategy with its configuration and learning sources.

    Args:
        config: Optional configuration. A default RetryStrategyConfig is
            created when none is given.
        delay_history: Optional delay history for learning. If not provided,
            learning features are disabled (purely static delays).
        global_learning_store: Optional global learning store for
            cross-workspace learned delays (Evolution #3: Learned Wait
            Time Injection). If provided, blend_historical_delay() will
            query it when in-memory history is insufficient.
    """
    self.config = config or RetryStrategyConfig()
    self._delay_history = delay_history
    self._global_store = global_learning_store

    # A fresh breaker per strategy instance guards against bad learned
    # delays; nothing is restored from earlier instances here.
    breaker = LearnedDelayCircuitBreaker()
    self._circuit_breaker = breaker

    # Backward-compatible aliases for any direct access in tests.
    # NOTE(review): these capture the breaker's attribute objects at
    # construction time; they follow later changes only if the breaker
    # mutates those objects in place rather than rebinding — confirm.
    self._learned_delay_failures = breaker._failures
    self._use_learned_delay = breaker._enabled
Functions
analyze
analyze(error_history, max_retries=None)

Analyze error history and recommend retry behavior.

This is the main entry point for the adaptive retry strategy. It examines the error history to detect patterns and returns a recommendation with reasoning.

Parameters:

Name Type Description Default
error_history list[ErrorRecord]

List of ErrorRecords in chronological order.

required
max_retries int | None

Optional maximum retries to consider (for confidence).

None

Returns:

Type Description
RetryRecommendation

RetryRecommendation with decision, delay, and reasoning.

Source code in src/marianne/execution/retry_strategy.py
def analyze(
    self,
    error_history: list[ErrorRecord],
    max_retries: int | None = None,
) -> RetryRecommendation:
    """Turn an error history into a retry recommendation.

    Main entry point of the strategy. The flow is:

    1. Empty history: recommend a default retry at ``config.base_delay``.
    2. Latest error not retriable: recommend no retry.
    3. Otherwise: detect a pattern across the history, build the
       recommendation for that pattern, copy the latest error's root
       cause confidence onto it, and log the decision.

    Args:
        error_history: List of ErrorRecords in chronological order.
        max_retries: Optional maximum retries to consider (for confidence).

    Returns:
        RetryRecommendation with decision, delay, and reasoning.
    """
    if not error_history:
        # Not expected in practice, but answer with a neutral default.
        return RetryRecommendation(
            should_retry=True,
            delay_seconds=self.config.base_delay,
            reason="No error history - using default retry",
            confidence=0.5,
            detected_pattern=RetryPattern.NONE,
            strategy_used="default",
        )

    # History is chronological, so the newest error is last and the
    # number of entries is the number of attempts so far.
    last_failure = error_history[-1]
    attempts_so_far = len(error_history)

    # A non-retriable error short-circuits pattern analysis entirely.
    if not last_failure.retriable:
        return self._recommend_no_retry(
            last_failure,
            "Error is not retriable",
            confidence=0.95,
            pattern=RetryPattern.NONE,
        )

    detected = self._detect_pattern(error_history)
    verdict = self._recommend_for_pattern(
        pattern=detected,
        error_history=error_history,
        latest_error=last_failure,
        attempt_count=attempts_so_far,
        max_retries=max_retries,
    )

    # Carry the latest error's root cause confidence on the result.
    root_confidence = last_failure.root_cause_confidence
    verdict.root_cause_confidence = root_confidence

    # Rounded only for the log line; None stays None.
    logged_root_confidence = (
        None if root_confidence is None else round(root_confidence, 3)
    )
    _logger.info(
        "retry_strategy.decision",
        should_retry=verdict.should_retry,
        delay_seconds=round(verdict.delay_seconds, 2),
        confidence=round(verdict.confidence, 3),
        detected_pattern=detected.value,
        strategy_used=verdict.strategy_used,
        attempt_count=attempts_so_far,
        latest_error_code=last_failure.error_code.value,
        reason=verdict.reason,
        root_cause_confidence=logged_root_confidence,
        secondary_error_count=last_failure.secondary_error_count,
    )

    return verdict
blend_historical_delay
blend_historical_delay(error_code, static_delay)

Blend learned delay with static delay for an error code.

Priority order: 1. Circuit breaker override → static 2. In-memory delay history (job-specific learning) 3. Global learning store (cross-workspace learned delays) 4. Static delay (fallback)

Parameters:

Name Type Description Default
error_code ErrorCode

The error code to get delay for.

required
static_delay float

The static delay from ErrorCode.get_retry_behavior().

required

Returns:

Type Description
tuple[float, str]

Tuple of (blended_delay, strategy_name).

Source code in src/marianne/execution/retry_strategy.py
def blend_historical_delay(
    self,
    error_code: ErrorCode,
    static_delay: float,
) -> tuple[float, str]:
    """Choose the delay for an error code, preferring learned values.

    Sources are consulted in this order, and the first one that yields a
    result wins:

    1. Circuit breaker disabled for this code -> static delay
       (``"static_circuit_breaker"``).
    2. In-memory delay history (job-specific learning).
    3. Global learning store (cross-workspace learned delays).
    4. Static delay: ``"static_bootstrap"`` when a delay history exists
       but produced nothing, ``"static"`` when there is no history.

    Args:
        error_code: The error code to get delay for.
        static_delay: The static delay from ErrorCode.get_retry_behavior().

    Returns:
        Tuple of (blended_delay, strategy_name).
    """
    if not self._circuit_breaker.is_enabled(error_code):
        return static_delay, "static_circuit_breaker"

    if (learned := self._try_inmemory_delay(error_code, static_delay)) is not None:
        return learned

    if (learned := self._try_global_delay(error_code, static_delay)) is not None:
        return learned

    # No learned value available: label the fallback by whether a history
    # object exists (bootstrap phase) or learning is off altogether.
    fallback_name = "static_bootstrap" if self._delay_history is not None else "static"
    return static_delay, fallback_name
record_delay_outcome
record_delay_outcome(error_code, delay_used, succeeded)

Record the outcome of a retry delay for learning.

Should be called after each retry attempt to update the delay history. Also updates circuit breaker state.

Parameters:

Name Type Description Default
error_code ErrorCode

The error code that was being retried.

required
delay_used float

The delay in seconds that was used.

required
succeeded bool

Whether the retry succeeded after this delay.

required
Source code in src/marianne/execution/retry_strategy.py
def record_delay_outcome(
    self,
    error_code: ErrorCode,
    delay_used: float,
    succeeded: bool,
) -> None:
    """Feed the result of one retry delay back into the learning state.

    Call after each retry attempt. When no delay history was configured
    this is a no-op: neither the history nor the circuit breaker is
    touched. Otherwise the outcome is appended to the history and the
    circuit breaker is told whether the attempt succeeded.

    Args:
        error_code: The error code that was being retried.
        delay_used: The delay in seconds that was used.
        succeeded: Whether the retry succeeded after this delay.
    """
    history = self._delay_history
    if history is None:
        return

    history.record(
        DelayOutcome(
            error_code=error_code,
            delay_seconds=delay_used,
            succeeded_after=succeeded,
        )
    )

    # Keep the learned-delay circuit breaker in step with the history.
    self._circuit_breaker.record_outcome(error_code, succeeded)
reset_circuit_breaker
reset_circuit_breaker(error_code)

Reset circuit breaker for an error code, re-enabling learned delays.

Call this method when you want to give learned delays another chance after the circuit breaker has tripped. Common scenarios:

  • After manual intervention that fixed the underlying issue
  • After a cooling-off period with successful static delays
  • At the start of a new batch/job where conditions may have changed

Note: The circuit breaker state is ephemeral (not persisted), so it automatically resets when a new AdaptiveRetryStrategy is instantiated. This method is for resetting during runtime without reinstantiation.

Parameters:

Name Type Description Default
error_code ErrorCode

The error code to reset circuit breaker for.

required
Example
After manual fix, give learned delays another chance

strategy.reset_circuit_breaker(ErrorCode.E101)

Source code in src/marianne/execution/retry_strategy.py
def reset_circuit_breaker(self, error_code: ErrorCode) -> None:
    """Clear the circuit breaker entry for one error code.

    Delegates to the internal circuit breaker's ``reset`` so that learned
    delays get another chance for ``error_code``. Typical moments to call
    this:

    - a manual intervention fixed the underlying issue
    - a cooling-off period with successful static delays has passed
    - a new batch/job starts and conditions may have changed

    Note: Circuit breaker state is ephemeral (not persisted), so a freshly
    instantiated AdaptiveRetryStrategy starts reset. This method exists for
    resetting at runtime without creating a new strategy.

    Args:
        error_code: The error code whose circuit breaker should be reset.

    Example:
        # After manual fix, give learned delays another chance
        strategy.reset_circuit_breaker(ErrorCode.E101)
    """
    breaker = self._circuit_breaker
    breaker.reset(error_code)

ErrorRecord dataclass

ErrorRecord(timestamp, error_code, category, message, exit_code=None, exit_signal=None, retriable=True, suggested_wait=None, sheet_num=None, attempt_num=1, monotonic_time=monotonic(), root_cause_confidence=None, secondary_error_count=0)

Record of a single error occurrence for pattern analysis.

Captures all relevant information about an error to enable intelligent pattern detection across multiple errors.

Attributes:

Name Type Description
timestamp datetime

When the error occurred (UTC).

error_code ErrorCode

Structured error code (e.g., E001, E101).

category ErrorCategory

High-level error category (rate_limit, transient, etc.).

message str

Human-readable error description.

exit_code int | None

Process exit code if applicable.

exit_signal int | None

Signal number if killed by signal.

retriable bool

Whether this specific error is retriable.

suggested_wait float | None

Classifier's suggested wait time in seconds.

sheet_num int | None

Sheet number where error occurred.

attempt_num int

Which attempt number this was (1-indexed).

monotonic_time float

Monotonic timestamp for precise timing calculations.

root_cause_confidence float | None

Confidence in root cause identification (0.0-1.0).

secondary_error_count int

Number of secondary errors detected.

Functions
from_classified_error classmethod
from_classified_error(error, sheet_num=None, attempt_num=1)

Create an ErrorRecord from a ClassifiedError.

This is the primary factory method for creating ErrorRecords in the retry flow.

Parameters:

Name Type Description Default
error ClassifiedError

ClassifiedError from the error classifier.

required
sheet_num int | None

Optional sheet number for context.

None
attempt_num int

Which retry attempt this represents.

1

Returns:

Type Description
ErrorRecord

ErrorRecord populated from the classified error.

Source code in src/marianne/execution/retry_strategy.py
@classmethod
def from_classified_error(
    cls,
    error: ClassifiedError,
    sheet_num: int | None = None,
    attempt_num: int = 1,
) -> ErrorRecord:
    """Create an ErrorRecord from a ClassifiedError.

    This is the primary factory method for creating ErrorRecords
    in the retry flow.

    Args:
        error: ClassifiedError from the error classifier.
        sheet_num: Optional sheet number for context.
        attempt_num: Which retry attempt this represents.

    Returns:
        ErrorRecord populated from the classified error.
    """
    return cls(
        timestamp=datetime.now(UTC),
        error_code=error.error_code,
        category=error.category,
        message=error.message,
        exit_code=error.exit_code,
        exit_signal=error.exit_signal,
        retriable=error.retriable,
        suggested_wait=error.suggested_wait_seconds,
        sheet_num=sheet_num,
        attempt_num=attempt_num,
    )
from_classification_result classmethod
from_classification_result(result, sheet_num=None, attempt_num=1)

Create an ErrorRecord from a ClassificationResult.

This factory method captures root cause information from the multi-error classification, including confidence in root cause identification and the count of secondary errors. This enables the retry strategy to consider root cause confidence when making retry decisions.

Parameters:

Name Type Description Default
result ClassificationResult

ClassificationResult from classify_execution().

required
sheet_num int | None

Optional sheet number for context.

None
attempt_num int

Which retry attempt this represents.

1

Returns:

Type Description
ErrorRecord

ErrorRecord with root cause confidence and secondary error count.

Raises:

Type Description
ValueError

If confidence is not in valid range [0.0, 1.0].

Source code in src/marianne/execution/retry_strategy.py
@classmethod
def from_classification_result(
    cls,
    result: ClassificationResult,
    sheet_num: int | None = None,
    attempt_num: int = 1,
) -> ErrorRecord:
    """Create an ErrorRecord from a ClassificationResult.

    This factory method captures root cause information from the multi-error
    classification, including confidence in root cause identification and
    the count of secondary errors. This enables the retry strategy to
    consider root cause confidence when making retry decisions.

    Args:
        result: ClassificationResult from classify_execution().
        sheet_num: Optional sheet number for context.
        attempt_num: Which retry attempt this represents.

    Returns:
        ErrorRecord with root cause confidence and secondary error count.

    Raises:
        ValueError: If confidence is not in valid range [0.0, 1.0].
    """
    # Validate confidence is in valid range (defensive check)
    if not 0.0 <= result.confidence <= 1.0:
        raise ValueError(
            f"root_cause_confidence must be 0.0-1.0, got {result.confidence}"
        )

    primary = result.primary
    return cls(
        timestamp=datetime.now(UTC),
        error_code=primary.error_code,
        category=primary.category,
        message=primary.message,
        exit_code=primary.exit_code,
        exit_signal=primary.exit_signal,
        retriable=primary.retriable,
        suggested_wait=primary.suggested_wait_seconds,
        sheet_num=sheet_num,
        attempt_num=attempt_num,
        root_cause_confidence=result.confidence,
        secondary_error_count=len(result.secondary),
    )
to_dict
to_dict()

Convert to dictionary for logging/serialization.

Returns:

Type Description
dict[str, object]

Dictionary representation of the record's fields (monotonic_time is not included).

Source code in src/marianne/execution/retry_strategy.py
def to_dict(self) -> dict[str, object]:
    """Serialize this error record into a plain dictionary.

    The timestamp is emitted as an ISO-8601 string, the error code and
    category as their enum values, and the root cause confidence rounded
    to three decimals (or None when unset). ``monotonic_time`` is not part
    of the output.

    Returns:
        Dictionary suitable for logging/serialization.
    """
    confidence = self.root_cause_confidence
    rounded_confidence = None if confidence is None else round(confidence, 3)

    payload: dict[str, object] = {
        "timestamp": self.timestamp.isoformat(),
        "error_code": self.error_code.value,
        "category": self.category.value,
        "message": self.message,
        "exit_code": self.exit_code,
        "exit_signal": self.exit_signal,
        "retriable": self.retriable,
        "suggested_wait": self.suggested_wait,
        SHEET_NUM_KEY: self.sheet_num,
        "attempt_num": self.attempt_num,
        "root_cause_confidence": rounded_confidence,
        "secondary_error_count": self.secondary_error_count,
    }
    return payload

RetryPattern

Bases: str, Enum

Detected error patterns that influence retry strategy.

Each pattern triggers a different retry behavior to maximize the chance of recovery while minimizing wasted attempts.

Attributes
NONE class-attribute instance-attribute
NONE = 'none'

No clear pattern detected - use default retry behavior.

RAPID_FAILURES class-attribute instance-attribute
RAPID_FAILURES = 'rapid_failures'

Multiple failures in quick succession - needs longer cooldown.

REPEATED_ERROR_CODE class-attribute instance-attribute
REPEATED_ERROR_CODE = 'repeated_error_code'

Same error code appearing repeatedly - may be persistent issue.

RATE_LIMITED class-attribute instance-attribute
RATE_LIMITED = 'rate_limited'

Rate limiting detected - use rate limit wait time.

CASCADING_FAILURES class-attribute instance-attribute
CASCADING_FAILURES = 'cascading_failures'

Errors are getting worse/different - system may be degrading.

INTERMITTENT class-attribute instance-attribute
INTERMITTENT = 'intermittent'

Errors are spread out with successes in between - normal transient.

RECOVERY_IN_PROGRESS class-attribute instance-attribute
RECOVERY_IN_PROGRESS = 'recovery_in_progress'

Recent success after failures - system may be recovering.

RetryRecommendation dataclass

RetryRecommendation(should_retry, delay_seconds, reason, confidence, detected_pattern=NONE, strategy_used='default', root_cause_confidence=None)

Recommendation from the adaptive retry strategy.

Encapsulates the decision of whether to retry, how long to wait, and the reasoning behind the decision for observability.

Attributes:

Name Type Description
should_retry bool

Whether a retry should be attempted.

delay_seconds float

Recommended delay before retrying.

reason str

Human-readable explanation of the decision.

confidence float

Confidence in this recommendation (0.0-1.0).

detected_pattern RetryPattern

The pattern that influenced this decision.

strategy_used str

Name of the strategy/heuristic that was applied.

root_cause_confidence float | None

Confidence in root cause identification (0.0-1.0, None if N/A).

Functions
__post_init__
__post_init__()

Validate that confidence is within [0.0, 1.0] and that delay_seconds is non-negative.

Source code in src/marianne/execution/retry_strategy.py
def __post_init__(self) -> None:
    """Reject out-of-range recommendation values.

    Raises:
        ValueError: If ``confidence`` is outside [0.0, 1.0] or
            ``delay_seconds`` is negative.
    """
    confidence = self.confidence
    # Chained comparison on purpose: it is also False for NaN.
    if not (0.0 <= confidence <= 1.0):
        raise ValueError(f"confidence must be 0.0-1.0, got {confidence}")

    delay = self.delay_seconds
    if delay < 0:
        raise ValueError(f"delay_seconds must be >= 0, got {delay}")
to_dict
to_dict()

Convert to dictionary for logging/serialization.

Returns:

Type Description
dict[str, object]

Dictionary representation with all fields.

Source code in src/marianne/execution/retry_strategy.py
def to_dict(self) -> dict[str, object]:
    """Serialize this recommendation into a plain dictionary.

    ``delay_seconds`` is rounded to two decimals, both confidence values to
    three (root cause confidence stays None when unset), and the detected
    pattern is emitted as its enum value.

    Returns:
        Dictionary suitable for logging/serialization.
    """
    root_confidence = self.root_cause_confidence
    rounded_root = None if root_confidence is None else round(root_confidence, 3)

    payload: dict[str, object] = {
        "should_retry": self.should_retry,
        "delay_seconds": round(self.delay_seconds, 2),
        "reason": self.reason,
        "confidence": round(self.confidence, 3),
        "detected_pattern": self.detected_pattern.value,
        "strategy_used": self.strategy_used,
        "root_cause_confidence": rounded_root,
    }
    return payload

RetryStrategyConfig dataclass

RetryStrategyConfig(base_delay=10.0, max_delay=API_RATE_LIMIT, exponential_base=2.0, rapid_failure_window=60.0, rapid_failure_threshold=3, rapid_failure_multiplier=2.0, repeated_error_threshold=2, repeated_error_strategy_change_threshold=3, min_confidence=0.3, jitter_factor=0.25)

Configuration for the adaptive retry strategy.

All timing values are in seconds. Thresholds are tuned for typical Claude CLI execution patterns.

Attributes:

Name Type Description
base_delay float

Starting delay for exponential backoff.

max_delay float

Maximum delay cap.

exponential_base float

Multiplier for exponential backoff.

rapid_failure_window float

Window (seconds) to detect rapid failures.

rapid_failure_threshold int

Number of failures in window to trigger.

rapid_failure_multiplier float

Extra delay multiplier for rapid failures.

repeated_error_threshold int

Same error code count before flagging.

repeated_error_strategy_change_threshold int

Count before strategy change.

min_confidence float

Minimum confidence for retry recommendation.

jitter_factor float

Random jitter to add (0.0-1.0 of delay).

Functions
__post_init__
__post_init__()

Validate configuration values.

Source code in src/marianne/execution/retry_strategy.py
def __post_init__(self) -> None:
    """Validate configuration values.

    Checks run in a fixed order and the first violated rule wins.

    Raises:
        ValueError: If any timing value, threshold, or ratio is outside
            its allowed range (the message names the offending field).
    """
    # Each entry is (predicate that is True when the value is INVALID,
    # error message). Predicates are lazy so later fields are only read
    # once every earlier check has passed.
    violations = (
        (lambda: self.base_delay <= 0, "base_delay must be positive"),
        (lambda: self.max_delay < self.base_delay, "max_delay must be >= base_delay"),
        (lambda: self.exponential_base <= 1, "exponential_base must be > 1"),
        (lambda: self.rapid_failure_window <= 0, "rapid_failure_window must be positive"),
        (lambda: self.rapid_failure_threshold < 1, "rapid_failure_threshold must be >= 1"),
        (
            lambda: not (0.0 <= self.jitter_factor <= 1.0),
            "jitter_factor must be between 0.0 and 1.0",
        ),
        (
            lambda: not (0.0 <= self.min_confidence <= 1.0),
            "min_confidence must be between 0.0 and 1.0",
        ),
        (lambda: self.repeated_error_threshold < 1, "repeated_error_threshold must be >= 1"),
        (
            lambda: self.repeated_error_strategy_change_threshold < 1,
            "repeated_error_strategy_change_threshold must be >= 1",
        ),
    )
    for is_violated, message in violations:
        if is_violated():
            raise ValueError(message)

FileModificationTracker

FileModificationTracker()

Tracks file mtimes before sheet execution for file_modified checks.

Source code in src/marianne/execution/validation/models.py
def __init__(self) -> None:
    """Start with an empty mtime table."""
    # Keyed by resolved path string; values are mtimes recorded by snapshot().
    self._mtimes: dict[str, float] = dict()
Functions
snapshot
snapshot(paths)

Capture mtimes of files before sheet execution.

Source code in src/marianne/execution/validation/models.py
def snapshot(self, paths: list[Path]) -> None:
    """Capture mtimes of files before sheet execution.

    Each path is resolved and its current mtime stored under the resolved
    path string. Paths that cannot be stat'ed (typically because the file
    does not exist yet) are recorded with an mtime of 0.0.

    Args:
        paths: Files whose modification times should be remembered.
    """
    for path in paths:
        resolved = path.resolve()
        try:
            # Single stat() instead of exists()+stat(): a file removed
            # between the two calls must not abort the whole snapshot.
            mtime = resolved.stat().st_mtime
        except (OSError, ValueError):
            # Same failure set was_modified() tolerates; treat as "absent".
            mtime = 0.0
        self._mtimes[str(resolved)] = mtime
was_modified
was_modified(path)

Check if file was modified (or created) after snapshot.

Source code in src/marianne/execution/validation/models.py
def was_modified(self, path: Path) -> bool:
    """Report whether ``path`` is newer than its snapshotted mtime.

    A path without a snapshot entry is compared against 0.0. A path that
    cannot be stat'ed is reported as not modified.

    Args:
        path: File to check.

    Returns:
        True if the file's current mtime is strictly greater than the
        recorded one, otherwise False.
    """
    target = path.resolve()
    try:
        stat_result = target.stat()
    except (OSError, ValueError):
        return False

    baseline = self._mtimes.get(str(target), 0.0)
    return stat_result.st_mtime > baseline
get_original_mtime
get_original_mtime(path)

Get the original mtime from snapshot.

Source code in src/marianne/execution/validation/models.py
def get_original_mtime(self, path: Path) -> float | None:
    """Look up the mtime recorded for ``path`` at snapshot time.

    Args:
        path: File to look up; it is resolved before the lookup.

    Returns:
        The recorded mtime, or None if the path has no snapshot entry.
    """
    return self._mtimes.get(str(path.resolve()))
clear
clear()

Clear all tracked mtimes.

Source code in src/marianne/execution/validation/models.py
def clear(self) -> None:
    """Forget every recorded mtime."""
    # Empty the existing dict in place rather than rebinding the attribute.
    recorded = self._mtimes
    recorded.clear()

SheetValidationResult dataclass

SheetValidationResult(sheet_num, results, rules_checked=0)

Aggregate result of all validations for a sheet.

Attributes
all_passed property
all_passed

Check if all validations passed.

passed_count property
passed_count

Count of passed validations.

failed_count property
failed_count

Count of failed validations (excluding skipped).

skipped_count property
skipped_count

Count of skipped validations (due to staged fail-fast).

executed_count property
executed_count

Count of validations that actually executed (not skipped).

pass_percentage property
pass_percentage

Percentage of validations that passed.

executed_pass_percentage property
executed_pass_percentage

Percentage of EXECUTED validations that passed.

majority_passed property
majority_passed

Returns True if >50% of validations passed.

aggregate_confidence property
aggregate_confidence

Calculate weighted aggregate confidence across all validation results.

Functions
get_passed_rules
get_passed_rules()

Get rules that passed.

Source code in src/marianne/execution/validation/models.py
def get_passed_rules(self) -> list[ValidationRule]:
    """Return the rules of every result whose ``passed`` flag is truthy.

    Returns:
        Rules in the same order as their results.
    """
    passed_rules: list[ValidationRule] = []
    for outcome in self.results:
        if outcome.passed:
            passed_rules.append(outcome.rule)
    return passed_rules
get_failed_rules
get_failed_rules()

Get rules that failed.

Source code in src/marianne/execution/validation/models.py
def get_failed_rules(self) -> list[ValidationRule]:
    """Return the rules of every result whose ``passed`` flag is falsy.

    Returns:
        Rules in the same order as their results.
    """
    failed_rules: list[ValidationRule] = []
    for outcome in self.results:
        if outcome.passed:
            continue
        failed_rules.append(outcome.rule)
    return failed_rules
get_passed_results
get_passed_results()

Get results that passed.

Source code in src/marianne/execution/validation/models.py
def get_passed_results(self) -> list[ValidationResult]:
    """Return every result whose ``passed`` flag is truthy.

    Returns:
        Matching results in their original order.
    """
    return list(filter(lambda outcome: outcome.passed, self.results))
get_failed_results
get_failed_results()

Get results that failed.

Source code in src/marianne/execution/validation/models.py
def get_failed_results(self) -> list[ValidationResult]:
    """Return every result whose ``passed`` flag is falsy.

    Returns:
        Matching results in their original order.
    """
    failures: list[ValidationResult] = []
    for outcome in self.results:
        if not outcome.passed:
            failures.append(outcome)
    return failures
to_dict_list
to_dict_list()

Convert all results to serializable list.

Source code in src/marianne/execution/validation/models.py
def to_dict_list(self) -> list[ValidationDetailDict]:
    """Serialize every result via its own ``to_dict``.

    Returns:
        One dictionary per result, in result order.
    """
    serialized: list[ValidationDetailDict] = []
    for outcome in self.results:
        serialized.append(outcome.to_dict())
    return serialized
get_semantic_summary
get_semantic_summary()

Aggregate semantic information from failed validations.

Source code in src/marianne/execution/validation/models.py
def get_semantic_summary(self) -> dict[str, Any]:
    """Summarize failure categories across the failed validations.

    Only results that did not pass and carry a non-empty
    ``failure_category`` are counted.

    Returns:
        Dictionary with ``category_counts`` (category -> occurrences),
        ``dominant_category`` (most frequent category, first seen wins a
        tie, None when nothing was counted), ``has_semantic_info``
        (whether anything was counted) and ``total_failures`` (this
        object's ``failed_count``).
    """
    tally: dict[str, int] = {}
    for outcome in self.results:
        if outcome.passed or not outcome.failure_category:
            continue
        label = outcome.failure_category
        tally[label] = tally.get(label, 0) + 1

    # max() keeps the first key among equals, i.e. insertion order breaks ties.
    top_category: str | None = max(tally, key=tally.get) if tally else None

    return {
        "category_counts": tally,
        "dominant_category": top_category,
        "has_semantic_info": bool(tally),
        "total_failures": self.failed_count,
    }
get_actionable_hints
get_actionable_hints(limit=3)

Extract actionable hints from failed validations.

Source code in src/marianne/execution/validation/models.py
def get_actionable_hints(self, limit: int = 3) -> list[str]:
    """Extract actionable hints from failed validations.

    Collects the ``suggested_fix`` of each failed result, in result order.
    Hints longer than 100 characters are cut to 97 characters plus
    ``"..."``, and duplicates (compared after truncation) are dropped.

    Args:
        limit: Maximum number of hints to return. Zero or a negative
            value yields an empty list.

    Returns:
        Up to ``limit`` unique hint strings.
    """
    # Guard up front: the in-loop check only runs after a hint has been
    # appended, so without this a limit of 0 would still return one hint.
    if limit <= 0:
        return []

    hints: list[str] = []
    seen: set[str] = set()

    for result in self.results:
        if result.passed or not result.suggested_fix:
            continue

        hint = result.suggested_fix
        if len(hint) > 100:
            hint = hint[:97] + "..."

        if hint in seen:
            continue

        seen.add(hint)
        hints.append(hint)
        if len(hints) >= limit:
            break

    return hints

ValidationEngine

ValidationEngine(workspace, sheet_context)

Executes validation rules against sheet outputs.

Handles path template expansion and dispatches to type-specific validation methods.

Initialize validation engine.

Source code in src/marianne/execution/validation/engine.py
def __init__(self, workspace: Path, sheet_context: dict[str, Any]) -> None:
    """Set up an engine bound to one workspace and one sheet context.

    Args:
        workspace: Workspace directory; stored in resolved form.
        sheet_context: Variables describing the current sheet; stored
            as given (not copied).
    """
    resolved_workspace = workspace.resolve()
    self.workspace = resolved_workspace
    self.sheet_context = sheet_context
    # Fresh tracker used for file_modified checks.
    self._mtime_tracker = FileModificationTracker()
Functions
expand_path
expand_path(path_template)

Expand path template with sheet context variables.

Supports: {sheet_num}, {workspace}, {start_item}, {end_item}

Both workspace-relative and absolute paths are allowed. Agents work in backend.working_directory (typically the project root) and create files there — restricting validations to the workspace directory would prevent checking those files.

Source code in src/marianne/execution/validation/engine.py
def expand_path(self, path_template: str) -> Path:
    """Expand path template with sheet context variables.

    Supports: {sheet_num}, {workspace}, {start_item}, {end_item}

    The template is filled with ``str.format`` using a copy of the sheet
    context plus ``workspace`` (the engine's resolved workspace, which
    overrides any ``workspace`` entry in the context). The expanded string
    is then resolved to an absolute path.

    Both workspace-relative and absolute paths are allowed. Agents work
    in ``backend.working_directory`` (typically the project root) and
    create files there — restricting validations to the workspace
    directory would prevent checking those files.

    Args:
        path_template: Path containing named ``{placeholder}`` fields.

    Returns:
        The expanded, resolved path.

    Raises:
        ValueError: If the template uses a positional placeholder such
            as a bare ``{}``.
    """
    context = dict(self.sheet_context)
    context["workspace"] = str(self.workspace)

    try:
        expanded = path_template.format(**context)
    except IndexError as exc:
        # Only the first literal is an f-string. The hint below is a plain
        # string, so its braces must be single to display correctly.
        raise ValueError(
            f"Invalid path template '{path_template}': {exc}. "
            "Use named placeholders like {workspace}, not bare {}."
        ) from exc
    return Path(expanded).resolve()
snapshot_mtime_files
snapshot_mtime_files(rules)

Snapshot mtimes for all file_modified rules before sheet execution.

Source code in src/marianne/execution/validation/engine.py
def snapshot_mtime_files(self, rules: list[ValidationRule]) -> None:
    """Record mtimes for the targets of all ``file_modified`` rules.

    Rules of any other type, and ``file_modified`` rules with an empty
    path, are ignored. Each remaining rule's path template is expanded and
    the whole list is handed to the mtime tracker in one call.

    Args:
        rules: Validation rules to scan.
    """
    targets = []
    for rule in rules:
        if rule.type != "file_modified" or not rule.path:
            continue
        targets.append(self.expand_path(rule.path))
    self._mtime_tracker.snapshot(targets)
get_applicable_rules
get_applicable_rules(rules)

Get rules that apply to the current sheet context.

Source code in src/marianne/execution/validation/engine.py
def get_applicable_rules(
    self, rules: list[ValidationRule]
) -> list[ValidationRule]:
    """Filter ``rules`` down to those whose condition check succeeds.

    Args:
        rules: Candidate validation rules.

    Returns:
        Rules for which ``_check_condition(rule.condition)`` is truthy,
        in their original order.
    """
    applicable: list[ValidationRule] = []
    for rule in rules:
        if self._check_condition(rule.condition):
            applicable.append(rule)
    return applicable
run_validations async
run_validations(rules)

Execute all validation rules and return aggregate result.

Source code in src/marianne/execution/validation/engine.py
async def run_validations(self, rules: list[ValidationRule]) -> SheetValidationResult:
    """Run every applicable rule and bundle the outcomes.

    Rules are first filtered through ``get_applicable_rules`` and then
    awaited one at a time, in order.

    Args:
        rules: Validation rules to consider.

    Returns:
        SheetValidationResult for the sheet number found in the sheet
        context (0 when absent), holding one result per applicable rule.
    """
    selected = self.get_applicable_rules(rules)
    # Sequential on purpose: each validation is awaited before the next starts.
    outcomes: list[ValidationResult] = [
        await self._run_single_validation(rule) for rule in selected
    ]
    sheet_number = self.sheet_context.get(SHEET_NUM_KEY, 0)
    return SheetValidationResult(
        sheet_num=sheet_number,
        results=outcomes,
        rules_checked=len(selected),
    )
run_staged_validations async
run_staged_validations(rules)

Execute validations in stage order with fail-fast behavior.

Source code in src/marianne/execution/validation/engine.py
async def run_staged_validations(
    self, rules: list[ValidationRule]
) -> tuple[SheetValidationResult, int | None]:
    """Run applicable rules stage by stage, stopping after a failed stage.

    Applicable rules are grouped by their ``stage`` and the stages run in
    ascending order. Every rule of a stage is executed even if an earlier
    rule in that stage failed. Once a stage contains a failure, the
    remaining stages are handed to ``_mark_remaining_stages_skipped`` and
    no further stage runs.

    Args:
        rules: Validation rules to consider.

    Returns:
        A ``(result, failed_stage)`` pair, where ``failed_stage`` is the
        stage number that failed or None if every stage passed (including
        the case of no applicable rules).
    """
    selected = self.get_applicable_rules(rules)

    by_stage: dict[int, list[ValidationRule]] = defaultdict(list)
    for rule in selected:
        by_stage[rule.stage].append(rule)

    collected: list[ValidationResult] = []
    failed_stage: int | None = None

    # With no applicable rules there are no stages, so the loop is skipped
    # and an empty result with rules_checked == 0 is returned below.
    for stage_num in sorted(by_stage):
        stage_outcomes = [
            await self._run_single_validation(rule) for rule in by_stage[stage_num]
        ]
        collected.extend(stage_outcomes)

        if all(outcome.passed for outcome in stage_outcomes):
            continue

        failed_stage = stage_num
        self._mark_remaining_stages_skipped(by_stage, stage_num, collected)
        break

    summary = SheetValidationResult(
        sheet_num=self.sheet_context.get(SHEET_NUM_KEY, 0),
        results=collected,
        rules_checked=len(selected),
    )
    return summary, failed_stage

ValidationResult dataclass

ValidationResult(rule, passed, actual_value=None, expected_value=None, error_message=None, checked_at=utc_now(), check_duration_ms=0.0, confidence=1.0, confidence_factors=dict(), failure_reason=None, failure_category=None, suggested_fix=None, error_type=None)

Result of a single validation check.

Attributes
confidence class-attribute instance-attribute
confidence = 1.0

Confidence in this validation result (0.0-1.0). Default 1.0 = fully confident.

confidence_factors class-attribute instance-attribute
confidence_factors = field(default_factory=dict)

Factors affecting confidence, e.g., {'file_age': 0.9, 'pattern_specificity': 0.8}.

failure_reason class-attribute instance-attribute
failure_reason = None

Semantic explanation of why validation failed.

failure_category class-attribute instance-attribute
failure_category = None

Category of failure: 'missing', 'malformed', 'incomplete', 'stale', 'error'.

suggested_fix class-attribute instance-attribute
suggested_fix = None

Hint for how to fix the issue.

error_type class-attribute instance-attribute
error_type = None

Distinguishes validation failures from validation crashes. None or 'validation_failure' = output didn't meet the rule. 'internal_error' = the validation check itself crashed.

Functions
to_dict
to_dict()

Convert to serializable dictionary.

Source code in src/marianne/execution/validation/models.py
def to_dict(self) -> ValidationDetailDict:
    """Flatten this result, plus key fields of its rule, into a dictionary.

    ``checked_at`` is emitted as an ISO-8601 string; every other value is
    passed through unchanged.

    Returns:
        Serializable dictionary describing the rule and its outcome.
    """
    rule = self.rule
    # Rule-derived keys come first, followed by the outcome itself.
    rule_part = {
        "rule_type": rule.type,
        "description": rule.description,
        "path": rule.path,
        "pattern": rule.pattern,
    }
    outcome_part = {
        "passed": self.passed,
        "actual_value": self.actual_value,
        "expected_value": self.expected_value,
        "error_message": self.error_message,
        "checked_at": self.checked_at.isoformat(),
        "check_duration_ms": self.check_duration_ms,
        "confidence": self.confidence,
        "confidence_factors": self.confidence_factors,
        "failure_reason": self.failure_reason,
        "failure_category": self.failure_category,
        "suggested_fix": self.suggested_fix,
        "error_type": self.error_type,
    }
    return {**rule_part, **outcome_part}
format_failure_summary
format_failure_summary()

Format failure information for prompt injection.

Source code in src/marianne/execution/validation/models.py
def format_failure_summary(self) -> str:
    """Render a one-line failure description for prompt injection.

    The line is made of up to three space-separated pieces, each included
    only when the corresponding field is set: the upper-cased failure
    category in square brackets, the failure reason, and the suggested fix
    prefixed with ``Fix: ``.

    Returns:
        The assembled line, or an empty string if the validation passed
        or none of the fields is set.
    """
    if self.passed:
        return ""

    category = self.failure_category
    fix = self.suggested_fix
    pieces = (
        f"[{category.upper()}]" if category else "",
        self.failure_reason or "",
        f"Fix: {fix}" if fix else "",
    )
    # Drop the placeholders for unset fields before joining.
    return " ".join(piece for piece in pieces if piece)