retry_strategy

Adaptive retry strategy with intelligent pattern detection.

Analyzes error history to make smart retry decisions, detecting patterns like:

  • Rapid consecutive failures → longer exponential backoff
  • Same error code repeated → different strategy (may be persistent issue)
  • Rate limits → use rate limit delay from error classification
  • Transient errors → standard retry with jitter

Example usage

import asyncio

from marianne.execution.retry_strategy import (
    AdaptiveRetryStrategy,
    ErrorRecord,
    RetryRecommendation,
)

strategy = AdaptiveRetryStrategy()

# Record errors as they occur
error_history: list[ErrorRecord] = []
error_history.append(ErrorRecord.from_classified_error(classified_error))

# Get retry recommendation
recommendation = strategy.analyze(error_history)
if recommendation.should_retry:
    await asyncio.sleep(recommendation.delay_seconds)
    # Retry the operation
else:
    # Give up or escalate
    logger.error(f"Not retrying: {recommendation.reason}")

Classes

RetryBehavior

Bases: NamedTuple

Precise retry behavior recommendation for a specific error code.

Unlike ErrorCategory, which provides broad retry guidelines, RetryBehavior encodes error-code-specific knowledge about optimal retry strategies.

Attributes:

  • delay_seconds (float): Recommended delay before retrying (0 = no delay).
  • is_retriable (bool): Whether this error is generally retriable.
  • reason (str): Human-readable explanation for the retry behavior.

DelayOutcome dataclass

DelayOutcome(error_code, delay_seconds, succeeded_after, timestamp=(lambda: now(UTC))())

Record of a delay used and its outcome for learning.

Captures the relationship between delay duration and subsequent success/failure, enabling the system to learn optimal delays for each error type.

Attributes:

  • error_code (ErrorCode): The ErrorCode that triggered the retry.
  • delay_seconds (float): The delay that was actually used before retrying.
  • succeeded_after (bool): Whether the retry succeeded after this delay.
  • timestamp (datetime): When this delay was recorded.
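
The `timestamp=(lambda: now(UTC))()` in the signature above is how the documentation generator renders a default that is computed per instance. A minimal sketch of that behavior, assuming the field is declared with `default_factory` (the real declaration is not shown on this page). `DelayOutcomeSketch` is a hypothetical stand-in, and it uses a plain string where the library uses `ErrorCode`:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class DelayOutcomeSketch:
    """Hypothetical stand-in for DelayOutcome (not the library class)."""

    error_code: str  # assumption: the library uses an ErrorCode enum here
    delay_seconds: float
    succeeded_after: bool
    # default_factory runs once per instance, so every record gets its own
    # creation time instead of sharing a value fixed at import time.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


first = DelayOutcomeSketch("E101", 10.0, succeeded_after=False)
second = DelayOutcomeSketch("E101", 20.0, succeeded_after=True)
```

Because the timestamp is filled in automatically, callers normally pass only the first three fields, as `record_delay_outcome()` does further down this page.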

DelayHistory

DelayHistory(max_history=100)

Tracks historical delay outcomes for learning optimal delays.

Maintains a record of (error_code, delay, success) tuples to enable the system to learn which delays work best for each error type.

Thread-safe: Uses a threading.Lock to protect all mutable operations. Pruning maintains chronological order by sorting retained outcomes by timestamp after grouping by error code.

Initialize delay history.

Parameters:

Name Type Description Default
max_history int

Maximum number of outcomes to retain per error code.

100
Source code in src/marianne/execution/retry_strategy.py
def __init__(self, max_history: int = 100) -> None:
    """Initialize delay history.

    Args:
        max_history: Maximum number of outcomes to retain per error code.
    """
    self._history: list[DelayOutcome] = []
    self._max_history = max_history
    self._lock = threading.Lock()
Functions
record
record(outcome)

Record a delay outcome.

Thread-safe: Uses lock to protect append and pruning operations.

Parameters:

Name Type Description Default
outcome DelayOutcome

The delay outcome to record.

required

Raises:

Type Description
ValueError

If outcome is None or outcome.delay_seconds is negative.

Source code in src/marianne/execution/retry_strategy.py
def record(self, outcome: DelayOutcome) -> None:
    """Record a delay outcome.

    Thread-safe: Uses lock to protect append and pruning operations.

    Args:
        outcome: The delay outcome to record.

    Raises:
        ValueError: If outcome is None or outcome.delay_seconds is negative.
    """
    if outcome is None:
        raise ValueError("outcome cannot be None")
    if outcome.delay_seconds < 0:
        raise ValueError(f"delay_seconds must be >= 0, got {outcome.delay_seconds}")

    with self._lock:
        self._history.append(outcome)

        # Prune old history if needed (keep most recent per error code)
        if len(self._history) > self._max_history * 10:
            # Keep last N for each error code
            from collections import defaultdict

            by_code: defaultdict[ErrorCode, list[DelayOutcome]] = defaultdict(list)
            for delay_outcome in self._history:
                by_code[delay_outcome.error_code].append(delay_outcome)

            self._history = []
            for outcomes in by_code.values():
                self._history.extend(outcomes[-self._max_history :])

            # Restore chronological order after grouping by error code
            self._history.sort(key=lambda delay_outcome: delay_outcome.timestamp)
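
The pruning step in `record()` can be looked at in isolation. Note from the source above that pruning only runs once the whole list exceeds `max_history * 10`, so between prunes a single error code may temporarily hold more than `max_history` entries. The sketch below reproduces only the group, trim, and re-sort logic on toy data; the tuples and the `MAX_HISTORY` name are illustrative stand-ins, with a sequence number playing the role of the timestamp:

```python
from collections import defaultdict

MAX_HISTORY = 2  # deliberately small so the effect is visible

# (error_code, sequence_number) pairs in arrival order.
history = [
    ("E001", 1),
    ("E101", 2),
    ("E001", 3),
    ("E001", 4),
    ("E101", 5),
    ("E001", 6),
]

# Group outcomes by error code, preserving arrival order within each group.
by_code = defaultdict(list)
for code, seq in history:
    by_code[code].append((code, seq))

# Keep only the newest MAX_HISTORY entries per code.
pruned = []
for outcomes in by_code.values():
    pruned.extend(outcomes[-MAX_HISTORY:])

# Grouping interleaves codes out of order, so restore arrival order.
pruned.sort(key=lambda item: item[1])
```

After this runs, `pruned` holds the two newest `E001` entries and both `E101` entries, back in arrival order.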
query_for_error_code
query_for_error_code(code)

Query outcomes for a specific error code.

Parameters:

Name Type Description Default
code ErrorCode

The error code to query.

required

Returns:

Type Description
list[DelayOutcome]

List of DelayOutcome for this error code.

Source code in src/marianne/execution/retry_strategy.py
def query_for_error_code(self, code: ErrorCode) -> list[DelayOutcome]:
    """Query outcomes for a specific error code.

    Args:
        code: The error code to query.

    Returns:
        List of DelayOutcome for this error code.
    """
    return [
        delay_outcome for delay_outcome in self._history
        if delay_outcome.error_code == code
    ]
get_average_successful_delay
get_average_successful_delay(code)

Get average delay that led to success for an error code.

Parameters:

Name Type Description Default
code ErrorCode

The error code to query.

required

Returns:

Type Description
float | None

Average successful delay in seconds, or None if no successful samples.

Source code in src/marianne/execution/retry_strategy.py
def get_average_successful_delay(self, code: ErrorCode) -> float | None:
    """Get average delay that led to success for an error code.

    Args:
        code: The error code to query.

    Returns:
        Average successful delay in seconds, or None if no successful samples.
    """
    successful = [
        delay_outcome for delay_outcome in self._history
        if delay_outcome.error_code == code and delay_outcome.succeeded_after
    ]
    if not successful:
        return None
    return sum(d.delay_seconds for d in successful) / len(successful)
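
As a small worked example of the averaging above: only delays whose retry succeeded contribute, and an empty set of successes yields `None` rather than zero. The `(delay, succeeded)` tuples and the helper name are illustrative stand-ins for `DelayOutcome` records of a single error code:

```python
from typing import Optional


def average_successful(outcomes: list[tuple[float, bool]]) -> Optional[float]:
    """Mean of the delays that were followed by a successful retry."""
    successful = [delay for delay, succeeded in outcomes if succeeded]
    if not successful:
        return None  # no evidence yet; callers must handle this case
    return sum(successful) / len(successful)


# The failed 10-second delay is ignored; (20 + 40) / 2 = 30.
mixed = average_successful([(10.0, False), (20.0, True), (40.0, True)])
only_failures = average_successful([(10.0, False)])
```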
get_sample_count
get_sample_count(code)

Get number of samples for an error code.

Parameters:

Name Type Description Default
code ErrorCode

The error code to query.

required

Returns:

Type Description
int

Number of delay outcomes recorded for this code.

Source code in src/marianne/execution/retry_strategy.py
def get_sample_count(self, code: ErrorCode) -> int:
    """Get number of samples for an error code.

    Args:
        code: The error code to query.

    Returns:
        Number of delay outcomes recorded for this code.
    """
    return sum(
        1 for delay_outcome in self._history
        if delay_outcome.error_code == code
    )

RetryPattern

Bases: str, Enum

Detected error patterns that influence retry strategy.

Each pattern triggers a different retry behavior to maximize the chance of recovery while minimizing wasted attempts.

Attributes
NONE class-attribute instance-attribute
NONE = 'none'

No clear pattern detected - use default retry behavior.

RAPID_FAILURES class-attribute instance-attribute
RAPID_FAILURES = 'rapid_failures'

Multiple failures in quick succession - needs longer cooldown.

REPEATED_ERROR_CODE class-attribute instance-attribute
REPEATED_ERROR_CODE = 'repeated_error_code'

Same error code appearing repeatedly - may be persistent issue.

RATE_LIMITED class-attribute instance-attribute
RATE_LIMITED = 'rate_limited'

Rate limiting detected - use rate limit wait time.

CASCADING_FAILURES class-attribute instance-attribute
CASCADING_FAILURES = 'cascading_failures'

Errors are getting worse/different - system may be degrading.

INTERMITTENT class-attribute instance-attribute
INTERMITTENT = 'intermittent'

Errors are spread out with successes in between - normal transient.

RECOVERY_IN_PROGRESS class-attribute instance-attribute
RECOVERY_IN_PROGRESS = 'recovery_in_progress'

Recent success after failures - system may be recovering.
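
Because `RetryPattern` derives from both `str` and `Enum`, its members compare equal to their string values and serialize as plain strings. The sketch below shows this with a reduced, hypothetical copy of the enum (two members only), not the library class itself:

```python
import json
from enum import Enum


class RetryPatternSketch(str, Enum):
    """Reduced stand-in for RetryPattern with two of its members."""

    NONE = "none"
    RATE_LIMITED = "rate_limited"


# Members compare equal to their raw string value.
matches_string = RetryPatternSketch.RATE_LIMITED == "rate_limited"

# Looking up by value returns the singleton member.
looked_up = RetryPatternSketch("rate_limited")

# json.dumps treats the member as the str it subclasses.
encoded = json.dumps({"pattern": RetryPatternSketch.RATE_LIMITED})
```

This is why the `to_dict()` methods on this page can emit `detected_pattern.value` and consumers can compare it against literal strings.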

ErrorRecord dataclass

ErrorRecord(timestamp, error_code, category, message, exit_code=None, exit_signal=None, retriable=True, suggested_wait=None, sheet_num=None, attempt_num=1, monotonic_time=monotonic(), root_cause_confidence=None, secondary_error_count=0)

Record of a single error occurrence for pattern analysis.

Captures all relevant information about an error to enable intelligent pattern detection across multiple errors.

Attributes:

  • timestamp (datetime): When the error occurred (UTC).
  • error_code (ErrorCode): Structured error code (e.g., E001, E101).
  • category (ErrorCategory): High-level error category (rate_limit, transient, etc.).
  • message (str): Human-readable error description.
  • exit_code (int | None): Process exit code if applicable.
  • exit_signal (int | None): Signal number if killed by signal.
  • retriable (bool): Whether this specific error is retriable.
  • suggested_wait (float | None): Classifier's suggested wait time in seconds.
  • sheet_num (int | None): Sheet number where error occurred.
  • attempt_num (int): Which attempt number this was (1-indexed).
  • monotonic_time (float): Monotonic timestamp for precise timing calculations.
  • root_cause_confidence (float | None): Confidence in root cause identification (0.0-1.0).
  • secondary_error_count (int): Number of secondary errors detected.

Functions
from_classified_error classmethod
from_classified_error(error, sheet_num=None, attempt_num=1)

Create an ErrorRecord from a ClassifiedError.

This is the primary factory method for creating ErrorRecords in the retry flow.

Parameters:

Name Type Description Default
error ClassifiedError

ClassifiedError from the error classifier.

required
sheet_num int | None

Optional sheet number for context.

None
attempt_num int

Which retry attempt this represents.

1

Returns:

Type Description
ErrorRecord

ErrorRecord populated from the classified error.

Source code in src/marianne/execution/retry_strategy.py
@classmethod
def from_classified_error(
    cls,
    error: ClassifiedError,
    sheet_num: int | None = None,
    attempt_num: int = 1,
) -> ErrorRecord:
    """Create an ErrorRecord from a ClassifiedError.

    This is the primary factory method for creating ErrorRecords
    in the retry flow.

    Args:
        error: ClassifiedError from the error classifier.
        sheet_num: Optional sheet number for context.
        attempt_num: Which retry attempt this represents.

    Returns:
        ErrorRecord populated from the classified error.
    """
    return cls(
        timestamp=datetime.now(UTC),
        error_code=error.error_code,
        category=error.category,
        message=error.message,
        exit_code=error.exit_code,
        exit_signal=error.exit_signal,
        retriable=error.retriable,
        suggested_wait=error.suggested_wait_seconds,
        sheet_num=sheet_num,
        attempt_num=attempt_num,
    )
from_classification_result classmethod
from_classification_result(result, sheet_num=None, attempt_num=1)

Create an ErrorRecord from a ClassificationResult.

This factory method captures root cause information from the multi-error classification, including confidence in root cause identification and the count of secondary errors. This enables the retry strategy to consider root cause confidence when making retry decisions.

Parameters:

Name Type Description Default
result ClassificationResult

ClassificationResult from classify_execution().

required
sheet_num int | None

Optional sheet number for context.

None
attempt_num int

Which retry attempt this represents.

1

Returns:

Type Description
ErrorRecord

ErrorRecord with root cause confidence and secondary error count.

Raises:

Type Description
ValueError

If confidence is not in valid range [0.0, 1.0].

Source code in src/marianne/execution/retry_strategy.py
@classmethod
def from_classification_result(
    cls,
    result: ClassificationResult,
    sheet_num: int | None = None,
    attempt_num: int = 1,
) -> ErrorRecord:
    """Create an ErrorRecord from a ClassificationResult.

    This factory method captures root cause information from the multi-error
    classification, including confidence in root cause identification and
    the count of secondary errors. This enables the retry strategy to
    consider root cause confidence when making retry decisions.

    Args:
        result: ClassificationResult from classify_execution().
        sheet_num: Optional sheet number for context.
        attempt_num: Which retry attempt this represents.

    Returns:
        ErrorRecord with root cause confidence and secondary error count.

    Raises:
        ValueError: If confidence is not in valid range [0.0, 1.0].
    """
    # Validate confidence is in valid range (defensive check)
    if not 0.0 <= result.confidence <= 1.0:
        raise ValueError(
            f"root_cause_confidence must be 0.0-1.0, got {result.confidence}"
        )

    primary = result.primary
    return cls(
        timestamp=datetime.now(UTC),
        error_code=primary.error_code,
        category=primary.category,
        message=primary.message,
        exit_code=primary.exit_code,
        exit_signal=primary.exit_signal,
        retriable=primary.retriable,
        suggested_wait=primary.suggested_wait_seconds,
        sheet_num=sheet_num,
        attempt_num=attempt_num,
        root_cause_confidence=result.confidence,
        secondary_error_count=len(result.secondary),
    )
to_dict
to_dict()

Convert to dictionary for logging/serialization.

Returns:

Type Description
dict[str, object]

Dictionary representation with all fields.

Source code in src/marianne/execution/retry_strategy.py
def to_dict(self) -> dict[str, object]:
    """Convert to dictionary for logging/serialization.

    Returns:
        Dictionary representation with all fields.
    """
    return {
        "timestamp": self.timestamp.isoformat(),
        "error_code": self.error_code.value,
        "category": self.category.value,
        "message": self.message,
        "exit_code": self.exit_code,
        "exit_signal": self.exit_signal,
        "retriable": self.retriable,
        "suggested_wait": self.suggested_wait,
        SHEET_NUM_KEY: self.sheet_num,
        "attempt_num": self.attempt_num,
        "root_cause_confidence": (
            round(self.root_cause_confidence, 3)
            if self.root_cause_confidence is not None
            else None
        ),
        "secondary_error_count": self.secondary_error_count,
    }

RetryRecommendation dataclass

RetryRecommendation(should_retry, delay_seconds, reason, confidence, detected_pattern=NONE, strategy_used='default', root_cause_confidence=None)

Recommendation from the adaptive retry strategy.

Encapsulates the decision of whether to retry, how long to wait, and the reasoning behind the decision for observability.

Attributes:

  • should_retry (bool): Whether a retry should be attempted.
  • delay_seconds (float): Recommended delay before retrying.
  • reason (str): Human-readable explanation of the decision.
  • confidence (float): Confidence in this recommendation (0.0-1.0).
  • detected_pattern (RetryPattern): The pattern that influenced this decision.
  • strategy_used (str): Name of the strategy/heuristic that was applied.
  • root_cause_confidence (float | None): Confidence in root cause identification (0.0-1.0, None if N/A).

Functions
__post_init__
__post_init__()

Validate confidence is in valid range.

Source code in src/marianne/execution/retry_strategy.py
def __post_init__(self) -> None:
    """Validate confidence is in valid range."""
    if not 0.0 <= self.confidence <= 1.0:
        raise ValueError(f"confidence must be 0.0-1.0, got {self.confidence}")
    if self.delay_seconds < 0:
        raise ValueError(f"delay_seconds must be >= 0, got {self.delay_seconds}")
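
The validation above runs at construction time, so an invalid recommendation can never be created and passed on. A minimal sketch of that behavior, using a hypothetical reduced dataclass (`RecommendationSketch`) that repeats the two checks shown in the source; `try_build` is an illustrative helper, not part of the library:

```python
from dataclasses import dataclass


@dataclass
class RecommendationSketch:
    """Reduced stand-in for RetryRecommendation (three fields only)."""

    should_retry: bool
    delay_seconds: float
    confidence: float

    def __post_init__(self) -> None:
        # dataclasses call __post_init__ right after the generated __init__.
        if not 0.0 <= self.confidence <= 1.0:
            raise ValueError(f"confidence must be 0.0-1.0, got {self.confidence}")
        if self.delay_seconds < 0:
            raise ValueError(f"delay_seconds must be >= 0, got {self.delay_seconds}")


def try_build(**kwargs) -> str:
    """Return 'ok' or the validation message, for demonstration only."""
    try:
        RecommendationSketch(**kwargs)
    except ValueError as exc:
        return str(exc)
    return "ok"


valid = try_build(should_retry=True, delay_seconds=5.0, confidence=0.8)
too_confident = try_build(should_retry=True, delay_seconds=5.0, confidence=1.5)
negative_delay = try_build(should_retry=True, delay_seconds=-1.0, confidence=0.8)
```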
to_dict
to_dict()

Convert to dictionary for logging/serialization.

Returns:

Type Description
dict[str, object]

Dictionary representation with all fields.

Source code in src/marianne/execution/retry_strategy.py
def to_dict(self) -> dict[str, object]:
    """Convert to dictionary for logging/serialization.

    Returns:
        Dictionary representation with all fields.
    """
    return {
        "should_retry": self.should_retry,
        "delay_seconds": round(self.delay_seconds, 2),
        "reason": self.reason,
        "confidence": round(self.confidence, 3),
        "detected_pattern": self.detected_pattern.value,
        "strategy_used": self.strategy_used,
        "root_cause_confidence": (
            round(self.root_cause_confidence, 3)
            if self.root_cause_confidence is not None
            else None
        ),
    }

RetryStrategyConfig dataclass

RetryStrategyConfig(base_delay=10.0, max_delay=API_RATE_LIMIT, exponential_base=2.0, rapid_failure_window=60.0, rapid_failure_threshold=3, rapid_failure_multiplier=2.0, repeated_error_threshold=2, repeated_error_strategy_change_threshold=3, min_confidence=0.3, jitter_factor=0.25)

Configuration for the adaptive retry strategy.

All timing values are in seconds. Thresholds are tuned for typical Claude CLI execution patterns.

Attributes:

  • base_delay (float): Starting delay for exponential backoff.
  • max_delay (float): Maximum delay cap.
  • exponential_base (float): Multiplier for exponential backoff.
  • rapid_failure_window (float): Window (seconds) to detect rapid failures.
  • rapid_failure_threshold (int): Number of failures in window to trigger.
  • rapid_failure_multiplier (float): Extra delay multiplier for rapid failures.
  • repeated_error_threshold (int): Same error code count before flagging.
  • repeated_error_strategy_change_threshold (int): Count before strategy change.
  • min_confidence (float): Minimum confidence for retry recommendation.
  • jitter_factor (float): Random jitter to add (0.0-1.0 of delay).
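
To make the timing attributes concrete, here is one common way such values combine into a delay: capped exponential backoff with additive jitter. This is a sketch under stated assumptions, not the library's formula, which is not shown on this page. The function name, the exact jitter rule, and the `max_delay` value of 3600.0 (the real default is the `API_RATE_LIMIT` constant, whose value is not given here) are all assumptions:

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 10.0,
    exponential_base: float = 2.0,
    max_delay: float = 3600.0,  # assumption: stand-in for API_RATE_LIMIT
    jitter_factor: float = 0.25,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay before retry number `attempt` (1-indexed)."""
    if rng is None:
        rng = random.Random()
    # Grow geometrically from base_delay, then cap at max_delay.
    capped = min(base_delay * exponential_base ** (attempt - 1), max_delay)
    # Add up to jitter_factor * delay so that parallel jobs do not retry in step.
    jitter = capped * jitter_factor * rng.random()
    return min(capped + jitter, max_delay)


# With jitter disabled the schedule is deterministic: 10, 20, 40, 80 seconds.
schedule = [backoff_delay(n, jitter_factor=0.0) for n in range(1, 5)]
```

With the default `jitter_factor` of 0.25, the second attempt would fall somewhere between 20 and 25 seconds under this rule.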

Functions
__post_init__
__post_init__()

Validate configuration values.

Source code in src/marianne/execution/retry_strategy.py
def __post_init__(self) -> None:
    """Validate configuration values."""
    if self.base_delay <= 0:
        raise ValueError("base_delay must be positive")
    if self.max_delay < self.base_delay:
        raise ValueError("max_delay must be >= base_delay")
    if self.exponential_base <= 1:
        raise ValueError("exponential_base must be > 1")
    if self.rapid_failure_window <= 0:
        raise ValueError("rapid_failure_window must be positive")
    if self.rapid_failure_threshold < 1:
        raise ValueError("rapid_failure_threshold must be >= 1")
    if not 0.0 <= self.jitter_factor <= 1.0:
        raise ValueError("jitter_factor must be between 0.0 and 1.0")
    if not 0.0 <= self.min_confidence <= 1.0:
        raise ValueError("min_confidence must be between 0.0 and 1.0")
    if self.repeated_error_threshold < 1:
        raise ValueError("repeated_error_threshold must be >= 1")
    if self.repeated_error_strategy_change_threshold < 1:
        raise ValueError("repeated_error_strategy_change_threshold must be >= 1")

LearnedDelayCircuitBreaker

LearnedDelayCircuitBreaker()

Protects learned delays from bad outcomes via a 3-strike rule.

Tracks consecutive failures per error code when using learned delays. After 3+ consecutive failures, the breaker "opens" and the system reverts to static delays for that error code.

State is intentionally ephemeral (not persisted) — a fresh AdaptiveRetryStrategy gets a clean breaker so it can re-evaluate.

Source code in src/marianne/execution/retry_strategy.py
def __init__(self) -> None:
    self._failures: dict[ErrorCode, int] = {}
    self._enabled: dict[ErrorCode, bool] = {}
Functions
is_enabled
is_enabled(error_code)

Check if learned delays are enabled for this error code.

Source code in src/marianne/execution/retry_strategy.py
def is_enabled(self, error_code: ErrorCode) -> bool:
    """Check if learned delays are enabled for this error code."""
    return self._enabled.get(error_code, True)
record_outcome
record_outcome(error_code, succeeded)

Record a retry outcome; trip breaker after consecutive failures.

Source code in src/marianne/execution/retry_strategy.py
def record_outcome(self, error_code: ErrorCode, succeeded: bool) -> None:
    """Record a retry outcome; trip breaker after consecutive failures."""
    if not self.is_enabled(error_code):
        return

    if succeeded:
        self._failures[error_code] = 0
        return

    failures = self._failures.get(error_code, 0) + 1
    self._failures[error_code] = failures

    if failures > self.FAILURE_THRESHOLD:
        self._enabled[error_code] = False
        _logger.warning(
            "circuit_breaker.triggered",
            error_code=error_code.value,
            consecutive_failures=failures,
            message="Reverting to static delay for this error code",
        )
reset
reset(error_code)

Reset breaker for an error code, re-enabling learned delays.

Source code in src/marianne/execution/retry_strategy.py
def reset(self, error_code: ErrorCode) -> None:
    """Reset breaker for an error code, re-enabling learned delays."""
    self._enabled[error_code] = True
    self._failures[error_code] = 0
    _logger.info(
        "circuit_breaker.reset",
        error_code=error_code.value,
        message="Circuit breaker reset, learned delays re-enabled",
    )
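
The breaker's life cycle is easier to follow as a trace. The sketch below repeats the logic shown above in a hypothetical stand-in class (logging removed, string codes instead of `ErrorCode`). The value of `FAILURE_THRESHOLD` is not shown on this page; 2 is an assumption chosen so that, with the `>` comparison, the third consecutive failure trips the breaker, matching the "3-strike" description:

```python
class BreakerSketch:
    """Stand-in for LearnedDelayCircuitBreaker, without logging."""

    FAILURE_THRESHOLD = 2  # assumption: the real constant is not shown

    def __init__(self) -> None:
        self._failures: dict[str, int] = {}
        self._enabled: dict[str, bool] = {}

    def is_enabled(self, code: str) -> bool:
        return self._enabled.get(code, True)

    def record_outcome(self, code: str, succeeded: bool) -> None:
        if not self.is_enabled(code):
            return  # an open breaker ignores outcomes until reset()
        if succeeded:
            self._failures[code] = 0  # a success clears the streak
            return
        failures = self._failures.get(code, 0) + 1
        self._failures[code] = failures
        if failures > self.FAILURE_THRESHOLD:
            self._enabled[code] = False

    def reset(self, code: str) -> None:
        self._enabled[code] = True
        self._failures[code] = 0


breaker = BreakerSketch()
states = []
# Two failures, a success that clears the streak, then three failures in a row.
for succeeded in [False, False, True, False, False, False]:
    breaker.record_outcome("E101", succeeded)
    states.append(breaker.is_enabled("E101"))

# A later success does not reopen learned delays; only reset() does.
breaker.record_outcome("E101", True)
still_open = not breaker.is_enabled("E101")
breaker.reset("E101")
```

Note that the breaker is tracked per error code, so tripping it for one code leaves learned delays enabled for all others.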

AdaptiveRetryStrategy

AdaptiveRetryStrategy(config=None, delay_history=None, global_learning_store=None)

Intelligent retry strategy that analyzes error patterns.

The strategy examines error history to detect patterns and make informed retry decisions. Key features:

  1. Rapid Failure Detection: If multiple errors occur in a short window, applies longer backoff to avoid overwhelming the system.

  2. Repeated Error Detection: If the same error code appears repeatedly, may recommend different strategies or lower confidence.

  3. Rate Limit Handling: Uses suggested wait times from rate limit errors, with additional buffer.

  4. Cascading Failure Detection: If errors are getting different/worse, may recommend stopping to prevent further damage.

  5. Recovery Detection: If recent attempts succeeded after failures, uses shorter delays to capitalize on recovery.

  6. Delay Learning with Circuit Breaker: When a DelayHistory is provided, the strategy learns optimal delays from past outcomes. A circuit breaker protects against bad learned delays by reverting to static delays after 3 consecutive failures.

Circuit Breaker State Design

The circuit breaker state (_learned_delay_failures, _use_learned_delay) is intentionally ephemeral and NOT persisted. This is a deliberate design choice with the following trade-offs:

Benefits:

  • After restart, the system gets a "fresh start" to try learned delays
  • Avoids persisting potentially stale circuit breaker state
  • Simple implementation without additional state management

Trade-offs:

  • After restart, may retry with a previously-failed learned delay once
  • Circuit breaker will re-trigger after 3 failures if the learned delay is still problematic

The DelayHistory itself CAN be persisted (it's just delay outcomes), but the circuit breaker resets on each AdaptiveRetryStrategy instantiation. Use reset_circuit_breaker() to manually reset circuit breaker state for a specific error code during runtime.

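Thread safety: analyze() derives its result from the input history alone. The circuit breaker and any attached DelayHistory are mutable, however: record_delay_outcome() and reset_circuit_breaker() update them, and the breaker's dictionaries are not protected by a lock in the source shown on this page.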

Example

strategy = AdaptiveRetryStrategy()

# Analyze error history
recommendation = strategy.analyze(error_history)

# Log the decision
logger.info(
    "retry_decision",
    should_retry=recommendation.should_retry,
    delay=recommendation.delay_seconds,
    pattern=recommendation.detected_pattern.value,
    reason=recommendation.reason,
)

Initialize the adaptive retry strategy.

Parameters:

  • config (RetryStrategyConfig | None, default None): Optional configuration. Uses defaults if not provided.
  • delay_history (DelayHistory | None, default None): Optional delay history for learning. If not provided, learning features are disabled (purely static delays).
  • global_learning_store (GlobalLearningStore | None, default None): Optional global learning store for cross-workspace learned delays (Evolution #3: Learned Wait Time Injection). If provided, blend_historical_delay() queries the global store for cross-workspace learned delays when the in-memory history is insufficient.
Source code in src/marianne/execution/retry_strategy.py
def __init__(
    self,
    config: RetryStrategyConfig | None = None,
    delay_history: DelayHistory | None = None,
    global_learning_store: GlobalLearningStore | None = None,
) -> None:
    """Initialize the adaptive retry strategy.

    Args:
        config: Optional configuration. Uses defaults if not provided.
        delay_history: Optional delay history for learning. If not provided,
            learning features are disabled (purely static delays).
        global_learning_store: Optional global learning store for cross-workspace
            learned delays (Evolution #3: Learned Wait Time Injection).
            If provided, blend_historical_delay() will query global store
            for cross-workspace learned delays when in-memory history is
            insufficient.
    """
    self.config = config or RetryStrategyConfig()
    self._delay_history = delay_history
    self._global_store = global_learning_store

    # Circuit breaker: protects against bad learned delays.
    # See class docstring "Circuit Breaker State Design" for rationale.
    self._circuit_breaker = LearnedDelayCircuitBreaker()

    # Backward-compatible aliases for any direct access in tests
    self._learned_delay_failures = self._circuit_breaker._failures
    self._use_learned_delay = self._circuit_breaker._enabled
Functions
analyze
analyze(error_history, max_retries=None)

Analyze error history and recommend retry behavior.

This is the main entry point for the adaptive retry strategy. It examines the error history to detect patterns and returns a recommendation with reasoning.

Parameters:

Name Type Description Default
error_history list[ErrorRecord]

List of ErrorRecords in chronological order.

required
max_retries int | None

Optional maximum retries to consider (for confidence).

None

Returns:

Type Description
RetryRecommendation

RetryRecommendation with decision, delay, and reasoning.

Source code in src/marianne/execution/retry_strategy.py
def analyze(
    self,
    error_history: list[ErrorRecord],
    max_retries: int | None = None,
) -> RetryRecommendation:
    """Analyze error history and recommend retry behavior.

    This is the main entry point for the adaptive retry strategy.
    It examines the error history to detect patterns and returns
    a recommendation with reasoning.

    Args:
        error_history: List of ErrorRecords in chronological order.
        max_retries: Optional maximum retries to consider (for confidence).

    Returns:
        RetryRecommendation with decision, delay, and reasoning.
    """
    if not error_history:
        # No errors - this shouldn't happen, but handle gracefully
        return RetryRecommendation(
            should_retry=True,
            delay_seconds=self.config.base_delay,
            reason="No error history - using default retry",
            confidence=0.5,
            detected_pattern=RetryPattern.NONE,
            strategy_used="default",
        )

    # Get the most recent error
    latest_error = error_history[-1]
    attempt_count = len(error_history)

    # Check for non-retriable error first
    if not latest_error.retriable:
        return self._recommend_no_retry(
            latest_error,
            "Error is not retriable",
            confidence=0.95,
            pattern=RetryPattern.NONE,
        )

    # Detect patterns in the error history
    pattern = self._detect_pattern(error_history)

    # Get recommendation based on pattern
    recommendation = self._recommend_for_pattern(
        pattern=pattern,
        error_history=error_history,
        latest_error=latest_error,
        attempt_count=attempt_count,
        max_retries=max_retries,
    )

    # Propagate root cause confidence from latest error to recommendation
    recommendation.root_cause_confidence = latest_error.root_cause_confidence

    # Log the decision including root cause confidence
    _logger.info(
        "retry_strategy.decision",
        should_retry=recommendation.should_retry,
        delay_seconds=round(recommendation.delay_seconds, 2),
        confidence=round(recommendation.confidence, 3),
        detected_pattern=pattern.value,
        strategy_used=recommendation.strategy_used,
        attempt_count=attempt_count,
        latest_error_code=latest_error.error_code.value,
        reason=recommendation.reason,
        root_cause_confidence=(
            round(latest_error.root_cause_confidence, 3)
            if latest_error.root_cause_confidence is not None
            else None
        ),
        secondary_error_count=latest_error.secondary_error_count,
    )

    return recommendation
blend_historical_delay
blend_historical_delay(error_code, static_delay)

Blend learned delay with static delay for an error code.

Priority order:

  1. Circuit breaker override → static
  2. In-memory delay history (job-specific learning)
  3. Global learning store (cross-workspace learned delays)
  4. Static delay (fallback)

Parameters:

Name Type Description Default
error_code ErrorCode

The error code to get delay for.

required
static_delay float

The static delay from ErrorCode.get_retry_behavior().

required

Returns:

Type Description
tuple[float, str]

Tuple of (blended_delay, strategy_name).

Source code in src/marianne/execution/retry_strategy.py
def blend_historical_delay(
    self,
    error_code: ErrorCode,
    static_delay: float,
) -> tuple[float, str]:
    """Blend learned delay with static delay for an error code.

    Priority order:
    1. Circuit breaker override → static
    2. In-memory delay history (job-specific learning)
    3. Global learning store (cross-workspace learned delays)
    4. Static delay (fallback)

    Args:
        error_code: The error code to get delay for.
        static_delay: The static delay from ErrorCode.get_retry_behavior().

    Returns:
        Tuple of (blended_delay, strategy_name).
    """
    if not self._circuit_breaker.is_enabled(error_code):
        return static_delay, "static_circuit_breaker"

    result = self._try_inmemory_delay(error_code, static_delay)
    if result is not None:
        return result

    result = self._try_global_delay(error_code, static_delay)
    if result is not None:
        return result

    # Fallback: distinguish bootstrap phase from no-history
    if self._delay_history is not None:
        return static_delay, "static_bootstrap"
    return static_delay, "static"
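
The priority order above is a first-match fallback chain: each source either returns a `(delay, strategy_name)` pair or `None` to pass to the next one. The sketch below models only that control flow. The internals of `_try_inmemory_delay` and `_try_global_delay` are not shown on this page, so the two lookup functions, the delay of 42.0, and the name `"global_store_sketch"` are made up for illustration:

```python
from typing import Callable, Optional

DelayResult = tuple[float, str]
Lookup = Callable[[str, float], Optional[DelayResult]]


def blend_sketch(
    error_code: str,
    static_delay: float,
    *,
    breaker_enabled: bool,
    lookups: list[Lookup],
    has_history: bool,
) -> DelayResult:
    # 1. An open circuit breaker bypasses every learned source.
    if not breaker_enabled:
        return static_delay, "static_circuit_breaker"
    # 2-3. Try each learned source in priority order; first hit wins.
    for lookup in lookups:
        result = lookup(error_code, static_delay)
        if result is not None:
            return result
    # 4. Fall back to the static delay, labelling the bootstrap phase.
    return static_delay, ("static_bootstrap" if has_history else "static")


def no_samples(error_code: str, static_delay: float) -> Optional[DelayResult]:
    """Hypothetical in-memory lookup with too few samples to answer."""
    return None


def global_hit(error_code: str, static_delay: float) -> Optional[DelayResult]:
    """Hypothetical global-store lookup returning a made-up learned delay."""
    return 42.0, "global_store_sketch"


from_global = blend_sketch(
    "E101", 30.0, breaker_enabled=True,
    lookups=[no_samples, global_hit], has_history=True,
)
```

The returned strategy name lets callers log which source produced the delay, which is how the real method distinguishes `"static_bootstrap"` (history attached but not yet useful) from `"static"` (no history at all).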
record_delay_outcome
record_delay_outcome(error_code, delay_used, succeeded)

Record the outcome of a retry delay for learning.

Should be called after each retry attempt to update the delay history. Also updates circuit breaker state.

Parameters:

Name Type Description Default
error_code ErrorCode

The error code that was being retried.

required
delay_used float

The delay in seconds that was used.

required
succeeded bool

Whether the retry succeeded after this delay.

required
Source code in src/marianne/execution/retry_strategy.py
def record_delay_outcome(
    self,
    error_code: ErrorCode,
    delay_used: float,
    succeeded: bool,
) -> None:
    """Record the outcome of a retry delay for learning.

    Should be called after each retry attempt to update the delay history.
    Also updates circuit breaker state.

    Args:
        error_code: The error code that was being retried.
        delay_used: The delay in seconds that was used.
        succeeded: Whether the retry succeeded after this delay.
    """
    if self._delay_history is None:
        return

    # Record the outcome
    outcome = DelayOutcome(
        error_code=error_code,
        delay_seconds=delay_used,
        succeeded_after=succeeded,
    )
    self._delay_history.record(outcome)

    # Update circuit breaker state
    self._circuit_breaker.record_outcome(error_code, succeeded)
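
One way to wire `analyze()` and `record_delay_outcome()` into a retry loop is sketched below. It is illustrative only: `FakeStrategy` and `FakeRecommendation` are hypothetical stand-ins exposing just the two methods used, the error history holds plain strings instead of `ErrorRecord` objects, and `sleep` is injected so the example runs instantly:

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeRecommendation:
    should_retry: bool
    delay_seconds: float


class FakeStrategy:
    """Hypothetical stand-in for AdaptiveRetryStrategy."""

    def __init__(self, max_attempts: int) -> None:
        self.max_attempts = max_attempts
        self.recorded: list[tuple[str, float, bool]] = []

    def analyze(self, error_history: list[str]) -> FakeRecommendation:
        return FakeRecommendation(len(error_history) < self.max_attempts, 5.0)

    def record_delay_outcome(self, error_code: str, delay_used: float, succeeded: bool) -> None:
        self.recorded.append((error_code, delay_used, succeeded))


def run_with_retries(
    operation: Callable[[], Optional[str]],  # returns None on success, else an error code
    strategy: FakeStrategy,
    sleep: Callable[[float], None],
) -> bool:
    error_history: list[str] = []
    pending: Optional[tuple[str, float]] = None  # retry whose outcome is not yet known
    while True:
        error_code = operation()
        if pending is not None:
            # Report how the previous delay worked out, success or failure.
            strategy.record_delay_outcome(pending[0], pending[1], error_code is None)
        if error_code is None:
            return True
        error_history.append(error_code)
        recommendation = strategy.analyze(error_history)
        if not recommendation.should_retry:
            return False
        sleep(recommendation.delay_seconds)
        pending = (error_code, recommendation.delay_seconds)


results = iter(["E101", "E101", None])  # fail twice, then succeed
strategy = FakeStrategy(max_attempts=5)
slept: list[float] = []
succeeded = run_with_retries(lambda: next(results), strategy, slept.append)
```

The point of the `pending` variable is that an outcome can only be recorded after the next attempt finishes, so the delay and its result are reported together.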
reset_circuit_breaker
reset_circuit_breaker(error_code)

Reset circuit breaker for an error code, re-enabling learned delays.

Call this method when you want to give learned delays another chance after the circuit breaker has tripped. Common scenarios:

  • After manual intervention that fixed the underlying issue
  • After a cooling-off period with successful static delays
  • At the start of a new batch/job where conditions may have changed

Note: The circuit breaker state is ephemeral (not persisted), so it automatically resets when a new AdaptiveRetryStrategy is instantiated. This method is for resetting during runtime without reinstantiation.

Parameters:

Name Type Description Default
error_code ErrorCode

The error code to reset circuit breaker for.

required
Example

# After manual fix, give learned delays another chance
strategy.reset_circuit_breaker(ErrorCode.E101)

Source code in src/marianne/execution/retry_strategy.py
def reset_circuit_breaker(self, error_code: ErrorCode) -> None:
    """Reset circuit breaker for an error code, re-enabling learned delays.

    Call this method when you want to give learned delays another chance
    after the circuit breaker has tripped. Common scenarios:

    - After manual intervention that fixed the underlying issue
    - After a cooling-off period with successful static delays
    - At the start of a new batch/job where conditions may have changed

    Note: The circuit breaker state is ephemeral (not persisted), so it
    automatically resets when a new AdaptiveRetryStrategy is instantiated.
    This method is for resetting during runtime without reinstantiation.

    Args:
        error_code: The error code to reset circuit breaker for.

    Example:
        # After manual fix, give learned delays another chance
        strategy.reset_circuit_breaker(ErrorCode.E101)
    """
    self._circuit_breaker.reset(error_code)

Functions