execution
¶
Execution layer for Marianne jobs.
Contains validation, retry logic, circuit breaker, and adaptive retry strategy.
Classes¶
FatalError
¶
Bases: Exception
Non-recoverable error that should stop the job.
SheetExecutionMode
¶
Bases: str, Enum
Mode of sheet execution.
CircuitBreaker
¶
Circuit breaker for preventing cascading failures.
The circuit breaker monitors execution success/failure and automatically blocks further requests when a failure threshold is exceeded. This prevents overwhelming a failing service and gives it time to recover.
Async-safe: All state modifications are protected by an asyncio.Lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| failure_threshold | int | Number of consecutive failures before opening circuit. |
| recovery_timeout | float | Seconds to wait before testing recovery (OPEN -> HALF_OPEN). |
| state | CircuitState | Current circuit state. |
| stats | CircuitBreakerStats | Statistics about circuit breaker behavior. |
Initialize circuit breaker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| failure_threshold | int | Number of consecutive failures before opening the circuit. Default is 5. | 5 |
| recovery_timeout | float | Seconds to wait in OPEN state before transitioning to HALF_OPEN to test recovery. Default is 300 (5 minutes). | 300.0 |
| name | str | Name for this circuit breaker (used in logging). | 'default' |
Source code in src/marianne/execution/circuit_breaker.py
Attributes¶
failure_threshold
property
¶
Number of consecutive failures before opening circuit.
Functions¶
get_state
async
¶
Get the current circuit state.
This method handles automatic state transitions:
- If OPEN and recovery_timeout has elapsed, transitions to HALF_OPEN.
Returns:
| Type | Description |
|---|---|
| CircuitState | Current CircuitState. |
Source code in src/marianne/execution/circuit_breaker.py
can_execute
async
¶
Check if a request can be executed.
Returns True if:
- Circuit is CLOSED (normal operation)
- Circuit is HALF_OPEN (testing recovery)
- Circuit is OPEN but recovery_timeout has elapsed (transitions to HALF_OPEN)
Returns False if:
- Circuit is OPEN and recovery_timeout hasn't elapsed
Returns:
| Type | Description |
|---|---|
| bool | True if the request should be allowed, False if it should be blocked. |
Source code in src/marianne/execution/circuit_breaker.py
record_success
async
¶
Record a successful operation.
Effects by state:
- CLOSED: Resets consecutive failure count
- HALF_OPEN: Transitions to CLOSED (recovery confirmed)
- OPEN: No effect (shouldn't happen, because the request would have been blocked)
Source code in src/marianne/execution/circuit_breaker.py
record_failure
async
¶
Record a failed operation.
Effects by state:
- CLOSED: Increments failure count, may transition to OPEN
- HALF_OPEN: Transitions to OPEN (recovery failed)
- OPEN: No effect (shouldn't happen, because the request would have been blocked)
Source code in src/marianne/execution/circuit_breaker.py
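The transitions described for can_execute, record_success, and record_failure can be sketched as a small state machine. This is a minimal illustration, not Marianne's implementation: the class name `SketchBreaker`, the `State` enum, the private attribute names, and the injectable `clock` parameter are all assumptions made for the example.

```python
import asyncio
import time
from enum import Enum


class State(str, Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Illustrative stand-in for a circuit breaker (hypothetical, not the real class)."""

    def __init__(self, failure_threshold=5, recovery_timeout=300.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock  # injectable so a timeout can be simulated without sleeping
        self._lock = asyncio.Lock()  # guards every state modification
        self._state = State.CLOSED
        self._failures = 0
        self._opened_at = None

    async def can_execute(self):
        async with self._lock:
            if self._state is State.OPEN:
                if self._clock() - self._opened_at < self.recovery_timeout:
                    return False  # still cooling down
                self._state = State.HALF_OPEN  # timeout elapsed: allow a probe
            return True

    async def record_success(self):
        async with self._lock:
            if self._state is State.HALF_OPEN:
                self._state = State.CLOSED  # recovery confirmed
            if self._state is State.CLOSED:
                self._failures = 0

    async def record_failure(self):
        async with self._lock:
            if self._state is State.CLOSED:
                self._failures += 1
                if self._failures >= self.failure_threshold:
                    self._trip()
            elif self._state is State.HALF_OPEN:
                self._trip()  # recovery failed

    def _trip(self):
        self._state = State.OPEN
        self._opened_at = self._clock()
```

Passing a fake clock makes the OPEN -> HALF_OPEN transition testable without waiting for the real recovery timeout.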
time_until_retry
async
¶
Get time remaining until retry is allowed.
Returns:
| Type | Description |
|---|---|
| float \| None | Seconds until the circuit transitions to HALF_OPEN, or None if the circuit is not OPEN. |
Source code in src/marianne/execution/circuit_breaker.py
record_cost
async
¶
Record token usage and estimated cost from an execution.
Updates running totals for cost tracking. Call this after each successful or failed execution that consumed tokens.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_tokens | int | Number of input tokens consumed. | required |
| output_tokens | int | Number of output tokens consumed. | required |
| estimated_cost | float | Estimated cost in USD for this execution. | required |
Source code in src/marianne/execution/circuit_breaker.py
check_cost_threshold
async
¶
Check if total estimated cost exceeds a threshold.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_cost | float | Maximum allowed cost in USD. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if threshold is exceeded (should stop), False otherwise. |
Source code in src/marianne/execution/circuit_breaker.py
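The interplay of record_cost and check_cost_threshold can be illustrated with a synchronous sketch of running totals. The class name `CostTotals` is hypothetical, and reading "exceeds" as strictly greater than `max_cost` is an assumption; the real methods are async and live on the circuit breaker.

```python
from dataclasses import dataclass


@dataclass
class CostTotals:
    """Hypothetical running totals mirroring the cost fields of CircuitBreakerStats."""

    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_estimated_cost: float = 0.0

    def record_cost(self, input_tokens: int, output_tokens: int, estimated_cost: float) -> None:
        # Accumulate usage after each execution, successful or not.
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.total_estimated_cost += estimated_cost

    def check_cost_threshold(self, max_cost: float) -> bool:
        # Assumption: "exceeds" means strictly greater than max_cost.
        return self.total_estimated_cost > max_cost


totals = CostTotals()
totals.record_cost(input_tokens=1000, output_tokens=200, estimated_cost=0.25)
totals.record_cost(input_tokens=500, output_tokens=100, estimated_cost=0.5)
```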
get_stats
async
¶
Get current statistics.
Returns:
| Type | Description |
|---|---|
| CircuitBreakerStats | Copy of current CircuitBreakerStats. |
Source code in src/marianne/execution/circuit_breaker.py
reset
async
¶
Reset the circuit breaker to initial state.
This resets:
- State to CLOSED
- Failure counts to 0
- Last failure time to None
Statistics are NOT reset (use get_stats() to view history).
Source code in src/marianne/execution/circuit_breaker.py
force_open
async
¶
Force the circuit to OPEN state.
Useful for manual intervention or testing.
Source code in src/marianne/execution/circuit_breaker.py
force_close
async
¶
Force the circuit to CLOSED state.
Useful for manual intervention or testing. Also resets failure counts.
Source code in src/marianne/execution/circuit_breaker.py
__repr__
¶
Get string representation of circuit breaker.
CircuitBreakerStats
dataclass
¶
CircuitBreakerStats(total_successes=0, total_failures=0, times_opened=0, times_half_opened=0, times_closed=0, last_failure_at=None, last_state_change_at=None, consecutive_failures=0, total_input_tokens=0, total_output_tokens=0, total_estimated_cost=0.0)
Statistics for circuit breaker monitoring.
Provides visibility into the circuit breaker's behavior for observability and debugging, including cost tracking.
Attributes¶
total_successes
class-attribute
instance-attribute
¶
Total number of successful operations recorded.
total_failures
class-attribute
instance-attribute
¶
Total number of failed operations recorded.
times_opened
class-attribute
instance-attribute
¶
Number of times the circuit has transitioned to OPEN state.
times_half_opened
class-attribute
instance-attribute
¶
Number of times the circuit has transitioned to HALF_OPEN state.
times_closed
class-attribute
instance-attribute
¶
Number of times the circuit has transitioned to CLOSED from another state.
last_failure_at
class-attribute
instance-attribute
¶
Timestamp of the most recent failure (monotonic time).
last_state_change_at
class-attribute
instance-attribute
¶
Timestamp of the most recent state transition.
consecutive_failures
class-attribute
instance-attribute
¶
Current count of consecutive failures (resets on success).
total_input_tokens
class-attribute
instance-attribute
¶
Total input tokens consumed across all executions.
total_output_tokens
class-attribute
instance-attribute
¶
Total output tokens consumed across all executions.
total_estimated_cost
class-attribute
instance-attribute
¶
Total estimated cost in USD across all executions.
Functions¶
to_dict
¶
Convert stats to dictionary for logging/serialization.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation of all statistics. |
Source code in src/marianne/execution/circuit_breaker.py
CircuitState
¶
Bases: str, Enum
State of the circuit breaker.
The circuit breaker transitions between these states based on success/failure patterns:
- CLOSED: Normal operation. Failures are tracked but requests are allowed.
- OPEN: Blocking mode. Requests are rejected until recovery_timeout elapses.
- HALF_OPEN: Testing mode. A single request is allowed to test recovery.
Attributes¶
CLOSED
class-attribute
instance-attribute
¶
Normal operation - requests are allowed and failures are tracked.
OPEN
class-attribute
instance-attribute
¶
Blocking calls - requests are rejected, waiting for recovery timeout.
HALF_OPEN
class-attribute
instance-attribute
¶
Testing recovery - one request is allowed to test if service recovered.
AdaptiveRetryStrategy
¶
Intelligent retry strategy that analyzes error patterns.
The strategy examines error history to detect patterns and make informed retry decisions. Key features:
- Rapid Failure Detection: If multiple errors occur in a short window, applies longer backoff to avoid overwhelming the system.
- Repeated Error Detection: If the same error code appears repeatedly, may recommend different strategies or lower confidence.
- Rate Limit Handling: Uses suggested wait times from rate limit errors, with additional buffer.
- Cascading Failure Detection: If errors are getting different/worse, may recommend stopping to prevent further damage.
- Recovery Detection: If recent attempts succeeded after failures, uses shorter delays to capitalize on recovery.
- Delay Learning with Circuit Breaker: When a DelayHistory is provided, the strategy learns optimal delays from past outcomes. A circuit breaker protects against bad learned delays by reverting to static delays after 3 consecutive failures.
Circuit Breaker State Design
The circuit breaker state (_learned_delay_failures, _use_learned_delay) is intentionally ephemeral and NOT persisted. This is a deliberate design choice with the following trade-offs:
Benefits:
- After restart, the system gets a "fresh start" to try learned delays
- Avoids persisting potentially stale circuit breaker state
- Simple implementation without additional state management
Trade-offs:
- After restart, may retry with a previously-failed learned delay once
- Circuit breaker will re-trigger after 3 failures if the learned delay is still problematic
The DelayHistory itself CAN be persisted (it's just delay outcomes), but the circuit breaker resets on each AdaptiveRetryStrategy instantiation. Use reset_circuit_breaker() to manually reset circuit breaker state for a specific error code during runtime.
Thread-safe: No mutable state; all analysis is based on input history.
Example
    strategy = AdaptiveRetryStrategy()

    # Analyze error history
    recommendation = strategy.analyze(error_history)

    # Log the decision
    logger.info(
        "retry_decision",
        should_retry=recommendation.should_retry,
        delay=recommendation.delay_seconds,
        pattern=recommendation.detected_pattern.value,
        reason=recommendation.reason,
    )
Initialize the adaptive retry strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RetryStrategyConfig \| None | Optional configuration. Uses defaults if not provided. | None |
| delay_history | DelayHistory \| None | Optional delay history for learning. If not provided, learning features are disabled (purely static delays). | None |
| global_learning_store | GlobalLearningStore \| None | Optional global learning store for cross-workspace learned delays (Evolution #3: Learned Wait Time Injection). If provided, blend_historical_delay() queries the global store for cross-workspace learned delays when in-memory history is insufficient. | None |
Source code in src/marianne/execution/retry_strategy.py
Functions¶
analyze
¶
Analyze error history and recommend retry behavior.
This is the main entry point for the adaptive retry strategy. It examines the error history to detect patterns and returns a recommendation with reasoning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error_history | list[ErrorRecord] | List of ErrorRecords in chronological order. | required |
| max_retries | int \| None | Optional maximum retries to consider (for confidence). | None |
Returns:
| Type | Description |
|---|---|
| RetryRecommendation | RetryRecommendation with decision, delay, and reasoning. |
Source code in src/marianne/execution/retry_strategy.py
blend_historical_delay
¶
Blend learned delay with static delay for an error code.
Priority order:
1. Circuit breaker override → static
2. In-memory delay history (job-specific learning)
3. Global learning store (cross-workspace learned delays)
4. Static delay (fallback)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error_code | ErrorCode | The error code to get delay for. | required |
| static_delay | float | The static delay from ErrorCode.get_retry_behavior(). | required |
Returns:
| Type | Description |
|---|---|
| tuple[float, str] | Tuple of (blended_delay, strategy_name). |
Source code in src/marianne/execution/retry_strategy.py
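The priority order above can be sketched as a plain selection function. This is only an illustration of the ordering: the helper name `choose_delay`, its parameters, and the strategy-name strings are hypothetical, and the sketch picks a single source rather than reproducing whatever blending formula the real method applies.

```python
from typing import Optional, Tuple


def choose_delay(
    static_delay: float,
    breaker_tripped: bool,
    local_learned: Optional[float],
    global_learned: Optional[float],
) -> Tuple[float, str]:
    """Pick a delay by the documented priority order (hypothetical helper)."""
    if breaker_tripped:
        # 1. Circuit breaker override: ignore learned delays entirely.
        return static_delay, "static_circuit_breaker"
    if local_learned is not None:
        # 2. In-memory, job-specific history.
        return local_learned, "learned_local"
    if global_learned is not None:
        # 3. Cross-workspace global store.
        return global_learned, "learned_global"
    # 4. Fallback.
    return static_delay, "static"
```

Returning the strategy name alongside the delay keeps the decision observable in logs, which matches the (blended_delay, strategy_name) return shape documented above.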
record_delay_outcome
¶
Record the outcome of a retry delay for learning.
Should be called after each retry attempt to update the delay history. Also updates circuit breaker state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error_code | ErrorCode | The error code that was being retried. | required |
| delay_used | float | The delay in seconds that was used. | required |
| succeeded | bool | Whether the retry succeeded after this delay. | required |
Source code in src/marianne/execution/retry_strategy.py
reset_circuit_breaker
¶
Reset circuit breaker for an error code, re-enabling learned delays.
Call this method when you want to give learned delays another chance after the circuit breaker has tripped. Common scenarios:
- After manual intervention that fixed the underlying issue
- After a cooling-off period with successful static delays
- At the start of a new batch/job where conditions may have changed
Note: The circuit breaker state is ephemeral (not persisted), so it automatically resets when a new AdaptiveRetryStrategy is instantiated. This method is for resetting during runtime without reinstantiation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error_code | ErrorCode | The error code to reset circuit breaker for. | required |
Example
    # After manual fix, give learned delays another chance
    strategy.reset_circuit_breaker(ErrorCode.E101)
Source code in src/marianne/execution/retry_strategy.py
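A per-error-code guard of the kind described (trip after 3 consecutive failures, reset on demand) might look like the following sketch. The class name `LearnedDelayGuard`, the use of plain strings as error codes, and the choice to clear the failure count on any success are assumptions for illustration; the source only states the 3-failure threshold and that the state is ephemeral.

```python
from collections import defaultdict

TRIP_AFTER = 3  # consecutive failures before reverting to static delays


class LearnedDelayGuard:
    """Hypothetical in-memory guard; nothing here is persisted."""

    def __init__(self) -> None:
        self._failures = defaultdict(int)  # error code -> consecutive failures

    def record_delay_outcome(self, error_code: str, succeeded: bool) -> None:
        if succeeded:
            self._failures[error_code] = 0  # assumption: success clears the streak
        else:
            self._failures[error_code] += 1

    def use_learned_delay(self, error_code: str) -> bool:
        # Learned delays stay enabled until the streak reaches the threshold.
        return self._failures[error_code] < TRIP_AFTER

    def reset_circuit_breaker(self, error_code: str) -> None:
        # Forget the streak for this code only; other codes are unaffected.
        self._failures.pop(error_code, None)


guard = LearnedDelayGuard()
for _ in range(3):
    guard.record_delay_outcome("E101", succeeded=False)
tripped = not guard.use_learned_delay("E101")
guard.reset_circuit_breaker("E101")
```

Because the guard lives only in memory, constructing a new instance has the same effect as resetting every error code, which mirrors the "fresh start after restart" trade-off described for the strategy.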
ErrorRecord
dataclass
¶
ErrorRecord(timestamp, error_code, category, message, exit_code=None, exit_signal=None, retriable=True, suggested_wait=None, sheet_num=None, attempt_num=1, monotonic_time=monotonic(), root_cause_confidence=None, secondary_error_count=0)
Record of a single error occurrence for pattern analysis.
Captures all relevant information about an error to enable intelligent pattern detection across multiple errors.
Attributes:
| Name | Type | Description |
|---|---|---|
| timestamp | datetime | When the error occurred (UTC). |
| error_code | ErrorCode | Structured error code (e.g., E001, E101). |
| category | ErrorCategory | High-level error category (rate_limit, transient, etc.). |
| message | str | Human-readable error description. |
| exit_code | int \| None | Process exit code if applicable. |
| exit_signal | int \| None | Signal number if killed by signal. |
| retriable | bool | Whether this specific error is retriable. |
| suggested_wait | float \| None | Classifier's suggested wait time in seconds. |
| sheet_num | int \| None | Sheet number where error occurred. |
| attempt_num | int | Which attempt number this was (1-indexed). |
| monotonic_time | float | Monotonic timestamp for precise timing calculations. |
| root_cause_confidence | float \| None | Confidence in root cause identification (0.0-1.0). |
| secondary_error_count | int | Number of secondary errors detected. |
Functions¶
from_classified_error
classmethod
¶
Create an ErrorRecord from a ClassifiedError.
This is the primary factory method for creating ErrorRecords in the retry flow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error | ClassifiedError | ClassifiedError from the error classifier. | required |
| sheet_num | int \| None | Optional sheet number for context. | None |
| attempt_num | int | Which retry attempt this represents. | 1 |
Returns:
| Type | Description |
|---|---|
| ErrorRecord | ErrorRecord populated from the classified error. |
Source code in src/marianne/execution/retry_strategy.py
from_classification_result
classmethod
¶
Create an ErrorRecord from a ClassificationResult.
This factory method captures root cause information from the multi-error classification, including confidence in root cause identification and the count of secondary errors. This enables the retry strategy to consider root cause confidence when making retry decisions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| result | ClassificationResult | ClassificationResult from classify_execution(). | required |
| sheet_num | int \| None | Optional sheet number for context. | None |
| attempt_num | int | Which retry attempt this represents. | 1 |
Returns:
| Type | Description |
|---|---|
| ErrorRecord | ErrorRecord with root cause confidence and secondary error count. |
Raises:
| Type | Description |
|---|---|
| ValueError | If confidence is not in valid range [0.0, 1.0]. |
Source code in src/marianne/execution/retry_strategy.py
to_dict
¶
Convert to dictionary for logging/serialization.
Returns:
| Type | Description |
|---|---|
| dict[str, object] | Dictionary representation with all fields. |
Source code in src/marianne/execution/retry_strategy.py
RetryPattern
¶
Bases: str, Enum
Detected error patterns that influence retry strategy.
Each pattern triggers a different retry behavior to maximize the chance of recovery while minimizing wasted attempts.
Attributes¶
NONE
class-attribute
instance-attribute
¶
No clear pattern detected - use default retry behavior.
RAPID_FAILURES
class-attribute
instance-attribute
¶
Multiple failures in quick succession - needs longer cooldown.
REPEATED_ERROR_CODE
class-attribute
instance-attribute
¶
Same error code appearing repeatedly - may be persistent issue.
RATE_LIMITED
class-attribute
instance-attribute
¶
Rate limiting detected - use rate limit wait time.
CASCADING_FAILURES
class-attribute
instance-attribute
¶
Errors are getting worse/different - system may be degrading.
INTERMITTENT
class-attribute
instance-attribute
¶
Errors are spread out with successes in between - normal transient.
RECOVERY_IN_PROGRESS
class-attribute
instance-attribute
¶
Recent success after failures - system may be recovering.
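As an illustration of how a pattern such as RAPID_FAILURES could be detected, the sketch below counts failures inside a sliding window of monotonic timestamps. The function name `is_rapid_failure` and the choice to measure the window back from the most recent failure are assumptions; the default window (60.0 seconds) and threshold (3) are taken from the RetryStrategyConfig signature.

```python
from typing import Sequence


def is_rapid_failure(
    failure_times: Sequence[float],
    window: float = 60.0,
    threshold: int = 3,
) -> bool:
    """Return True if at least `threshold` failures fall within `window` seconds.

    `failure_times` are monotonic timestamps in chronological order.
    Assumption: the window is measured back from the most recent failure.
    """
    if not failure_times:
        return False
    latest = failure_times[-1]
    recent = [t for t in failure_times if latest - t <= window]
    return len(recent) >= threshold
```

Monotonic timestamps are used rather than wall-clock time so that system clock adjustments cannot shrink or stretch the window.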
RetryRecommendation
dataclass
¶
RetryRecommendation(should_retry, delay_seconds, reason, confidence, detected_pattern=NONE, strategy_used='default', root_cause_confidence=None)
Recommendation from the adaptive retry strategy.
Encapsulates the decision of whether to retry, how long to wait, and the reasoning behind the decision for observability.
Attributes:
| Name | Type | Description |
|---|---|---|
| should_retry | bool | Whether a retry should be attempted. |
| delay_seconds | float | Recommended delay before retrying. |
| reason | str | Human-readable explanation of the decision. |
| confidence | float | Confidence in this recommendation (0.0-1.0). |
| detected_pattern | RetryPattern | The pattern that influenced this decision. |
| strategy_used | str | Name of the strategy/heuristic that was applied. |
| root_cause_confidence | float \| None | Confidence in root cause identification (0.0-1.0, None if N/A). |
Functions¶
__post_init__
¶
Validate confidence is in valid range.
Source code in src/marianne/execution/retry_strategy.py
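Range validation in a dataclass `__post_init__` generally follows the pattern below. This is a reduced sketch with a hypothetical class name `Recommendation` and only four fields; raising ValueError is an assumption borrowed from the behavior documented for ErrorRecord.from_classification_result.

```python
from dataclasses import dataclass


@dataclass
class Recommendation:
    """Reduced, hypothetical stand-in for a retry recommendation."""

    should_retry: bool
    delay_seconds: float
    reason: str
    confidence: float

    def __post_init__(self) -> None:
        # Runs right after the generated __init__, so invalid objects never exist.
        if not 0.0 <= self.confidence <= 1.0:
            raise ValueError(f"confidence must be in [0.0, 1.0], got {self.confidence}")
```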
to_dict
¶
Convert to dictionary for logging/serialization.
Returns:
| Type | Description |
|---|---|
| dict[str, object] | Dictionary representation with all fields. |
Source code in src/marianne/execution/retry_strategy.py
RetryStrategyConfig
dataclass
¶
RetryStrategyConfig(base_delay=10.0, max_delay=API_RATE_LIMIT, exponential_base=2.0, rapid_failure_window=60.0, rapid_failure_threshold=3, rapid_failure_multiplier=2.0, repeated_error_threshold=2, repeated_error_strategy_change_threshold=3, min_confidence=0.3, jitter_factor=0.25)
Configuration for the adaptive retry strategy.
All timing values are in seconds. Thresholds are tuned for typical Claude CLI execution patterns.
Attributes:
| Name | Type | Description |
|---|---|---|
| base_delay | float | Starting delay for exponential backoff. |
| max_delay | float | Maximum delay cap. |
| exponential_base | float | Multiplier for exponential backoff. |
| rapid_failure_window | float | Window (seconds) to detect rapid failures. |
| rapid_failure_threshold | int | Number of failures in window to trigger. |
| rapid_failure_multiplier | float | Extra delay multiplier for rapid failures. |
| repeated_error_threshold | int | Same error code count before flagging. |
| repeated_error_strategy_change_threshold | int | Count before strategy change. |
| min_confidence | float | Minimum confidence for retry recommendation. |
| jitter_factor | float | Random jitter to add (0.0-1.0 of delay). |
Functions¶
__post_init__
¶
Validate configuration values.
Source code in src/marianne/execution/retry_strategy.py
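To show how base_delay, max_delay, exponential_base, and jitter_factor typically combine, here is a generic exponential-backoff sketch. The function name `backoff_delay` and the exact formula are assumptions, not the strategy's actual computation, and the `max_delay` default of 3600.0 is a placeholder because the value of API_RATE_LIMIT is not given here.

```python
import random
from typing import Optional


def backoff_delay(
    attempt_num: int,
    base_delay: float = 10.0,
    max_delay: float = 3600.0,  # placeholder; the documented default is API_RATE_LIMIT
    exponential_base: float = 2.0,
    jitter_factor: float = 0.25,
    rng: Optional[random.Random] = None,
) -> float:
    """Exponential backoff with additive jitter for a 1-indexed attempt number."""
    rng = rng or random.Random()
    # Grow the delay geometrically, then cap it.
    delay = min(base_delay * exponential_base ** (attempt_num - 1), max_delay)
    # Assumption: jitter adds between 0 and jitter_factor * delay on top of the cap.
    jitter = delay * jitter_factor * rng.random()
    return delay + jitter
```

Jitter spreads out retries from jobs that failed at the same moment, so they do not all hit the backend again simultaneously.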
FileModificationTracker
¶
Tracks file mtimes before sheet execution for file_modified checks.
Source code in src/marianne/execution/validation/models.py
Functions¶
snapshot
¶
Capture mtimes of files before sheet execution.
Source code in src/marianne/execution/validation/models.py
was_modified
¶
Check if file was modified (or created) after snapshot.
Source code in src/marianne/execution/validation/models.py
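The snapshot-then-compare idea behind file_modified checks can be sketched with `pathlib`. The class name `MtimeTracker` and its internals are hypothetical; in particular, treating a file that did not exist at snapshot time as "modified" follows the "(or created)" wording above, while the use of nanosecond mtimes is an assumption.

```python
import tempfile
from pathlib import Path
from typing import Dict, Iterable, Optional


class MtimeTracker:
    """Hypothetical tracker: remembers mtimes, then compares after execution."""

    def __init__(self) -> None:
        self._mtimes: Dict[Path, Optional[int]] = {}

    def snapshot(self, paths: Iterable[Path]) -> None:
        for path in paths:
            # None records that the file did not exist before execution.
            self._mtimes[path] = path.stat().st_mtime_ns if path.exists() else None

    def was_modified(self, path: Path) -> bool:
        if not path.exists():
            return False
        original = self._mtimes.get(path)
        # A file that appeared after the snapshot counts as modified.
        return original is None or path.stat().st_mtime_ns > original


with tempfile.TemporaryDirectory() as tmp:
    existing = Path(tmp) / "a.txt"
    created = Path(tmp) / "b.txt"
    existing.write_text("old")
    tracker = MtimeTracker()
    tracker.snapshot([existing, created])
    created.write_text("new")  # simulates a file produced during sheet execution
    unchanged = tracker.was_modified(existing)
    newly_created = tracker.was_modified(created)
```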
get_original_mtime
¶
SheetValidationResult
dataclass
¶
Aggregate result of all validations for a sheet.
Attributes¶
executed_pass_percentage
property
¶
Percentage of EXECUTED validations that passed.
aggregate_confidence
property
¶
Calculate weighted aggregate confidence across all validation results.
Functions¶
get_passed_rules
¶
get_failed_rules
¶
get_passed_results
¶
get_failed_results
¶
to_dict_list
¶
get_semantic_summary
¶
Aggregate semantic information from failed validations.
Source code in src/marianne/execution/validation/models.py
get_actionable_hints
¶
Extract actionable hints from failed validations.
Source code in src/marianne/execution/validation/models.py
ValidationEngine
¶
Executes validation rules against sheet outputs.
Handles path template expansion and dispatches to type-specific validation methods.
Initialize validation engine.
Source code in src/marianne/execution/validation/engine.py
Functions¶
expand_path
¶
Expand path template with sheet context variables.
Supports: {sheet_num}, {workspace}, {start_item}, {end_item}
Both workspace-relative and absolute paths are allowed. Agents work in backend.working_directory (typically the project root) and create files there, so restricting validations to the workspace directory would prevent checking those files.
Source code in src/marianne/execution/validation/engine.py
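Template expansion with the four supported variables can be approximated with `str.format`. This sketch assumes plain Python format-string semantics and uses a hypothetical function name `expand_path_sketch`; the engine's real method may resolve or normalize paths differently.

```python
def expand_path_sketch(
    template: str,
    *,
    sheet_num: int,
    workspace: str,
    start_item: int,
    end_item: int,
) -> str:
    """Substitute the documented placeholders into a path template."""
    return template.format(
        sheet_num=sheet_num,
        workspace=workspace,
        start_item=start_item,
        end_item=end_item,
    )


relative = expand_path_sketch(
    "{workspace}/out/sheet-{sheet_num}.md",
    sheet_num=3, workspace="/tmp/ws", start_item=21, end_item=30,
)
absolute = expand_path_sketch(
    "/srv/project/items-{start_item}-{end_item}.json",
    sheet_num=3, workspace="/tmp/ws", start_item=21, end_item=30,
)
```

Templates that do not mention a variable simply ignore it, which is why the absolute path in the second call works without referencing the workspace.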
snapshot_mtime_files
¶
Snapshot mtimes for all file_modified rules before sheet execution.
Source code in src/marianne/execution/validation/engine.py
get_applicable_rules
¶
Get rules that apply to the current sheet context.
run_validations
async
¶
Execute all validation rules and return aggregate result.
Source code in src/marianne/execution/validation/engine.py
run_staged_validations
async
¶
Execute validations in stage order with fail-fast behavior.
Source code in src/marianne/execution/validation/engine.py
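Stage-ordered, fail-fast execution can be sketched as follows. The tuple shape `(stage, name, check)`, the function name `run_staged`, and the rule that a stage runs all of its checks before deciding whether to stop are assumptions for illustration; the source only states that stages run in order with fail-fast behavior.

```python
import asyncio
from itertools import groupby
from typing import Awaitable, Callable, List, Tuple

# (stage number, rule name, async check returning pass/fail); hypothetical shape
Check = Tuple[int, str, Callable[[], Awaitable[bool]]]


async def run_staged(checks: List[Check]) -> List[Tuple[str, bool]]:
    """Run checks stage by stage; skip later stages once a stage has a failure."""
    results: List[Tuple[str, bool]] = []
    ordered = sorted(checks, key=lambda c: c[0])  # stable sort keeps rule order per stage
    for _stage, group in groupby(ordered, key=lambda c: c[0]):
        stage_results = [(name, await check()) for _, name, check in group]
        results.extend(stage_results)
        if not all(passed for _, passed in stage_results):
            break  # fail fast: later stages are not executed
    return results
```

Failing fast avoids spending time on expensive later-stage checks when a cheap early check has already shown the output is unusable.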
ValidationResult
dataclass
¶
ValidationResult(rule, passed, actual_value=None, expected_value=None, error_message=None, checked_at=utc_now(), check_duration_ms=0.0, confidence=1.0, confidence_factors=dict(), failure_reason=None, failure_category=None, suggested_fix=None, error_type=None)
Result of a single validation check.
Attributes¶
confidence
class-attribute
instance-attribute
¶
Confidence in this validation result (0.0-1.0). Default 1.0 = fully confident.
confidence_factors
class-attribute
instance-attribute
¶
Factors affecting confidence, e.g., {'file_age': 0.9, 'pattern_specificity': 0.8}.
failure_reason
class-attribute
instance-attribute
¶
Semantic explanation of why validation failed.
failure_category
class-attribute
instance-attribute
¶
Category of failure: 'missing', 'malformed', 'incomplete', 'stale', 'error'.
suggested_fix
class-attribute
instance-attribute
¶
Hint for how to fix the issue.
error_type
class-attribute
instance-attribute
¶
Distinguishes validation failures from validation crashes. None or 'validation_failure' = output didn't meet the rule. 'internal_error' = the validation check itself crashed.
Functions¶
to_dict
¶
Convert to serializable dictionary.
Source code in src/marianne/execution/validation/models.py
format_failure_summary
¶
Format failure information for prompt injection.