learning

Learning module for outcome recording, pattern detection, and judgment.

This module provides:

- Outcome recording and pattern detection (local workspace)
- Global learning store for cross-workspace pattern persistence
- Pattern weighting with recency/effectiveness decay
- Pattern aggregation for merging outcomes into global patterns
- Error learning hooks for adaptive wait times
- Migration from workspace-local to global store
- Judgment client for LLM-based decision making

Classes

AggregationResult

AggregationResult()

Result of a pattern aggregation operation.

Source code in src/marianne/learning/aggregator.py
def __init__(self) -> None:
    self.outcomes_recorded: int = 0
    self.patterns_detected: int = 0
    self.patterns_merged: int = 0
    self.priorities_updated: bool = False
    self.errors: list[str] = []

PatternAggregator

PatternAggregator(global_store, weighter=None)

Aggregates patterns from executions into the global store.

Implements the aggregation strategy defined in Movement III:

1. Record all sheet outcomes to the executions table
2. Run PatternDetector.detect_all() on new outcomes
3. Merge detected patterns with existing patterns in the global store
4. Update priority_score for all affected patterns
5. Record pattern_applications for any patterns that were applied
6. Update error_recoveries for any error learning

Conflict resolution strategy:

- Same pattern: merge by increasing counts and updating timestamps
- Different suggested_actions: keep the action with higher effectiveness

Initialize the pattern aggregator.

Parameters:

- global_store (GlobalLearningStore, required): The global learning store for persistence.
- weighter (PatternWeighter | None, default None): Pattern weighter for priority calculation.
Source code in src/marianne/learning/aggregator.py
def __init__(
    self,
    global_store: GlobalLearningStore,
    weighter: PatternWeighter | None = None,
) -> None:
    """Initialize the pattern aggregator.

    Args:
        global_store: The global learning store for persistence.
        weighter: Pattern weighter for priority calculation.
    """
    self.global_store = global_store
    self.weighter = weighter or PatternWeighter()
Functions
aggregate_outcomes
aggregate_outcomes(outcomes, workspace_path, model=None, instrument_name=None)

Aggregate a batch of outcomes into the global store.

This is the main entry point called after job completion.

Parameters:

- outcomes (list[SheetOutcome], required): List of sheet outcomes from the completed job.
- workspace_path (Path, required): Path to the workspace for hashing.
- model (str | None, default None): Optional model name used for execution.
- instrument_name (str | None, default None): Optional instrument/backend type for pattern scoping.

Returns:

- AggregationResult: Statistics about the aggregation.

Source code in src/marianne/learning/aggregator.py
def aggregate_outcomes(
    self,
    outcomes: list[SheetOutcome],
    workspace_path: Path,
    model: str | None = None,
    instrument_name: str | None = None,
) -> AggregationResult:
    """Aggregate a batch of outcomes into the global store.

    This is the main entry point called after job completion.

    Args:
        outcomes: List of sheet outcomes from the completed job.
        workspace_path: Path to the workspace for hashing.
        model: Optional model name used for execution.
        instrument_name: Optional instrument/backend type for pattern scoping.

    Returns:
        AggregationResult with statistics about the aggregation.
    """
    self._instrument_name = instrument_name
    result = AggregationResult()

    if not outcomes:
        return result

    # Step 1: Record all outcomes to executions table
    execution_ids = []
    for outcome in outcomes:
        exec_id = self.global_store.record_outcome(
            outcome=outcome,
            workspace_path=workspace_path,
            model=model,
        )
        execution_ids.append(exec_id)
        result.outcomes_recorded += 1

    # Step 2: Run pattern detection on new outcomes
    detector = PatternDetector(outcomes)
    detected_patterns = detector.detect_all()
    result.patterns_detected = len(detected_patterns)

    # Step 3: Merge detected patterns with global store
    for pattern in detected_patterns:
        pattern_id = self._merge_pattern(pattern)
        if pattern_id:
            result.patterns_merged += 1

    # Step 4: Update priority scores for affected patterns
    self._update_all_priorities()
    result.priorities_updated = True

    # Step 5: Record pattern applications for effectiveness tracking
    for outcome in outcomes:
        self._record_pattern_applications(outcome, execution_ids)

    _logger.info(
        "aggregation_complete",
        outcomes=result.outcomes_recorded,
        patterns_detected=result.patterns_detected,
        patterns_merged=result.patterns_merged,
    )

    return result
merge_with_conflict_resolution
merge_with_conflict_resolution(existing, new)

Merge a new pattern with an existing one using conflict resolution.

Resolution strategy from the design document:

- occurrence_count: sum
- effectiveness_score: weighted average by occurrence_count
- last_seen: max
- last_confirmed: max
- suggested_action: keep the action with higher effectiveness

Parameters:

- existing (PatternRecord, required): The existing pattern record.
- new (DetectedPattern, required): The newly detected pattern.

Returns:

- dict[str, object]: Dict of updated field values.

Source code in src/marianne/learning/aggregator.py
def merge_with_conflict_resolution(
    self,
    existing: PatternRecord,
    new: DetectedPattern,
) -> dict[str, object]:
    """Merge a new pattern with an existing one using conflict resolution.

    Resolution strategy from design document:
    - occurrence_count: sum
    - effectiveness_score: weighted average by occurrence_count
    - last_seen: max
    - last_confirmed: max
    - suggested_action: keep action with higher effectiveness

    Args:
        existing: The existing pattern record.
        new: The newly detected pattern.

    Returns:
        Dict of updated field values.
    """
    # Sum occurrence counts
    merged_count = existing.occurrence_count + new.frequency

    # Weighted average for effectiveness.
    # Both effectiveness_score and success_rate are on the same 0.0-1.0
    # scale, representing how often the pattern leads to success.
    total = existing.occurrence_count + new.frequency
    if total > 0:
        weighted_effectiveness = (
            existing.effectiveness_score * existing.occurrence_count
            + new.success_rate * new.frequency
        ) / total
    else:
        weighted_effectiveness = 0.5

    # Max for timestamps
    merged_last_seen = max(existing.last_seen, new.last_seen)

    # Determine suggested action — use merged effectiveness to decide.
    # If the merged effectiveness improved over the existing score,
    # the new pattern contributed positively, so adopt its guidance.
    merged_action: str | None
    if weighted_effectiveness > existing.effectiveness_score:
        merged_action = new.to_prompt_guidance()
    else:
        merged_action = existing.suggested_action

    # Format last_seen as ISO string
    last_seen_str = (
        merged_last_seen.isoformat()
        if isinstance(merged_last_seen, datetime)
        else str(merged_last_seen)
    )

    return {
        "occurrence_count": merged_count,
        "effectiveness_score": weighted_effectiveness,
        "last_seen": last_seen_str,
        "suggested_action": merged_action,
    }
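As a quick numeric check of the weighted-average rule above (the counts and scores here are illustrative, not taken from real data):

```python
# Illustrative check of the merge math: occurrence counts sum, and
# effectiveness is averaged weighted by each side's occurrence count.
existing_count, existing_eff = 8, 0.75   # hypothetical existing pattern
new_freq, new_success = 2, 0.5           # hypothetical detected pattern

merged_count = existing_count + new_freq
merged_eff = (
    existing_eff * existing_count + new_success * new_freq
) / merged_count

print(merged_count, merged_eff)  # 10 0.7
```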
prune_deprecated_patterns
prune_deprecated_patterns()

Deprecate patterns that fall below the effectiveness threshold.

Patterns are deprecated (not deleted) if:

- They have enough application data (>= 3 applications)
- Their effectiveness is below 0.3

Returns:

- int: Number of patterns deprecated.

Source code in src/marianne/learning/aggregator.py
def prune_deprecated_patterns(self) -> int:
    """Remove patterns that are below the effectiveness threshold.

    Patterns are deprecated (not deleted) if:
    - They have enough application data (>= 3)
    - Their effectiveness is below 0.3

    Returns:
        Number of patterns deprecated.
    """
    deprecated_count = 0
    patterns = self.global_store.get_patterns(min_priority=0.0, limit=1000)

    with self.global_store._get_connection() as conn:
        for pattern in patterns:
            if self.weighter.is_deprecated(
                pattern.led_to_success_count,
                pattern.led_to_failure_count,
            ):
                # Set priority to 0 to effectively deprecate
                conn.execute(
                    "UPDATE patterns SET priority_score = 0 WHERE id = ?",
                    (pattern.id,),
                )
                deprecated_count += 1

    if deprecated_count > 0:
        _logger.info("patterns_deprecated", count=deprecated_count)

    return deprecated_count
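The deprecation test itself lives in PatternWeighter.is_deprecated, whose body is not shown on this page. A plausible stand-in following the documented rule (at least 3 applications and effectiveness below 0.3); the function below is illustrative, not the real implementation:

```python
# Hypothetical stand-in for PatternWeighter.is_deprecated, following
# the documented rule: enough application data (>= 3) AND a success
# ratio below 0.3 marks the pattern as deprecated.
def is_deprecated(success_count: int, failure_count: int) -> bool:
    total = success_count + failure_count
    if total < 3:
        return False  # not enough application data to judge
    return success_count / total < 0.3

print(is_deprecated(0, 3))  # True: 3 samples, 0% success
print(is_deprecated(2, 2))  # False: 50% success
print(is_deprecated(0, 2))  # False: too few samples
```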

ErrorLearningConfig dataclass

ErrorLearningConfig(enabled=True, min_samples=3, learning_rate=0.3, max_wait_time=7200.0, min_wait_time=10.0, decay_factor=0.9)

Configuration for error learning.

Attributes:

- enabled (bool): Master switch for error learning.
- min_samples (int): Minimum recovery samples before using a learned delay.
- learning_rate (float): How much to weight new observations vs. existing ones.
- max_wait_time (float): Maximum wait time to suggest (cap on learning).
- min_wait_time (float): Minimum wait time to suggest (floor on learning).
- decay_factor (float): How much to decay old samples over time.
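The exact update rule is not shown on this page; one plausible reading of learning_rate is an exponential moving average over observed recovery waits, clamped to the configured floor and cap. A sketch under that assumption:

```python
# Sketch only: assumes an exponential-moving-average update, which is
# one common reading of "how much to weight new observations vs existing".
def update_learned_wait(
    learned: float,
    observed: float,
    learning_rate: float = 0.3,
    min_wait: float = 10.0,
    max_wait: float = 7200.0,
) -> float:
    blended = (1 - learning_rate) * learned + learning_rate * observed
    return max(min_wait, min(max_wait, blended))  # clamp to [floor, cap]

print(update_learned_wait(60.0, 120.0))  # 0.7*60 + 0.3*120 = 78.0
```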

ErrorLearningContext dataclass

ErrorLearningContext(error, job_id, sheet_num, workspace_path, model=None, timestamp=now(), suggested_wait=None, actual_wait=None, recovery_success=None)

Context for an error learning event.

Tracks the full context of an error for learning purposes.

Attributes
error_code property
error_code

Get the error code from either ClassifiedError or ClassificationResult.

category property
category

Get the error category.

ErrorLearningHooks

ErrorLearningHooks(global_store=None, config=None)

Learning hooks for ErrorClassifier integration.

Provides hooks that can be called at various points in error handling to record error patterns and learn from recovery attempts.

The hooks follow the design pattern of non-invasive integration:

- They can be optionally called by the runner
- If global_store is None, the hooks are no-ops
- All operations are logged for debugging

Usage

    hooks = ErrorLearningHooks(global_store)

    # When an error is classified
    adjusted = hooks.on_error_classified(context)
    if adjusted.suggested_wait_seconds:
        await asyncio.sleep(adjusted.suggested_wait_seconds)

    # After recovery attempt
    hooks.on_error_recovered(context, success=True)

Initialize error learning hooks.

Parameters:

- global_store (GlobalLearningStore | None, default None): Global learning store for persistence. If None, hooks are no-ops.
- config (ErrorLearningConfig | None, default None): Error learning configuration.
Source code in src/marianne/learning/error_hooks.py
def __init__(
    self,
    global_store: "GlobalLearningStore | None" = None,
    config: ErrorLearningConfig | None = None,
) -> None:
    """Initialize error learning hooks.

    Args:
        global_store: Global learning store for persistence.
                     If None, hooks are no-ops.
        config: Error learning configuration.
    """
    self._store = global_store
    self._config = config or ErrorLearningConfig()
    self._pending_contexts: dict[str, ErrorLearningContext] = {}
Attributes
enabled property
enabled

Check if error learning is enabled and store is available.

Functions
on_error_classified
on_error_classified(context)

Hook called when an error is classified.

Records the error occurrence and potentially adjusts the suggested wait time based on learned patterns.

Parameters:

- context (ErrorLearningContext, required): Full error context including job/sheet info.

Returns:

- ClassifiedError: The error with potentially adjusted suggested_wait_seconds.

Source code in src/marianne/learning/error_hooks.py
def on_error_classified(
    self,
    context: ErrorLearningContext,
) -> ClassifiedError:
    """Hook called when an error is classified.

    Records the error occurrence and potentially adjusts the suggested
    wait time based on learned patterns.

    Args:
        context: Full error context including job/sheet info.

    Returns:
        The error with potentially adjusted suggested_wait_seconds.
    """
    if not self.enabled:
        return self._get_classified_error(context)

    error = self._get_classified_error(context)

    # Record pattern for this error
    self._record_error_pattern(context)

    # Check if this is a rate limit error and we have learned data
    if error.category == ErrorCategory.RATE_LIMIT:
        adjusted_wait = self._get_learned_wait(context)
        if adjusted_wait is not None:
            _logger.info(
                f"Adjusted wait for {error.error_code.value}: "
                f"{error.suggested_wait_seconds}s -> {adjusted_wait}s (learned)"
            )
            # Create new error with adjusted wait
            return ClassifiedError(
                category=error.category,
                message=error.message,
                error_code=error.error_code,
                original_error=error.original_error,
                exit_code=error.exit_code,
                exit_signal=error.exit_signal,
                exit_reason=error.exit_reason,
                retriable=error.retriable,
                suggested_wait_seconds=adjusted_wait,
                error_info=error.error_info,
            )

    # Track this context for later recovery reporting
    context_key = self._get_context_key(context)
    self._pending_contexts[context_key] = context

    return error
on_error_recovered
on_error_recovered(context, success)

Hook called after a recovery attempt.

Records the actual wait time and whether recovery succeeded, updating the learned wait times for this error code.

Parameters:

- context (ErrorLearningContext, required): Error context with actual_wait filled in.
- success (bool, required): Whether the recovery attempt succeeded.
Source code in src/marianne/learning/error_hooks.py
def on_error_recovered(
    self,
    context: ErrorLearningContext,
    success: bool,
) -> None:
    """Hook called after a recovery attempt.

    Records the actual wait time and whether recovery succeeded,
    updating the learned wait times for this error code.

    Args:
        context: Error context with actual_wait filled in.
        success: Whether the recovery attempt succeeded.
    """
    if not self.enabled or self._store is None:
        return

    error = self._get_classified_error(context)

    # Record the recovery to the global store
    if context.actual_wait is not None:
        suggested_wait = context.suggested_wait or error.suggested_wait_seconds or 0
        self._store.record_error_recovery(
            error_code=error.error_code.value,
            suggested_wait=suggested_wait,
            actual_wait=context.actual_wait,
            recovery_success=success,
            model=context.model,
        )

        _logger.debug(
            f"Recorded error recovery: {error.error_code.value} "
            f"actual_wait={context.actual_wait}s success={success}"
        )

    # Clean up pending context
    context_key = self._get_context_key(context)
    self._pending_contexts.pop(context_key, None)
on_auth_failure
on_auth_failure(context)

Hook to analyze auth failures.

Uses historical data to determine if this auth failure is likely transient (worth retrying) or permanent (should fail immediately).

Parameters:

- context (ErrorLearningContext, required): Error context for the auth failure.

Returns:

- tuple[bool, str]: Tuple of (is_transient, reason). If is_transient is True, the error might recover after a delay.

Source code in src/marianne/learning/error_hooks.py
def on_auth_failure(
    self,
    context: ErrorLearningContext,
) -> tuple[bool, str]:
    """Hook to analyze auth failures.

    Uses historical data to determine if this auth failure is likely
    transient (worth retrying) or permanent (should fail immediately).

    Args:
        context: Error context for the auth failure.

    Returns:
        Tuple of (is_transient, reason).
        If is_transient is True, the error might recover after a delay.
    """
    if not self.enabled or self._store is None:
        return False, "No learning data available"

    error = self._get_classified_error(context)

    # Query past auth failures for this model/context
    # If we've seen successful recoveries, mark as transient
    with self._store._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT
                SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) as successes,
                COUNT(*) as total
            FROM error_recoveries
            WHERE error_code = ? AND model = ?
            """,
            (error.error_code.value, context.model),
        )
        row = cursor.fetchone()

        if row and row["total"] >= self._config.min_samples:
            success_rate = row["successes"] / row["total"]
            if success_rate > 0.3:  # >30% recovery rate suggests transient
                return True, f"Historical recovery rate: {success_rate:.0%}"

    return False, "Insufficient recovery history or low success rate"
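The decision rule above (at least min_samples recoveries, and a historical recovery rate above 30%) can be exercised in isolation. The helper below is an illustrative extraction of that rule, not part of the module:

```python
# Illustrative extraction of the transient-auth decision rule:
# enough samples AND a >30% historical recovery rate => transient.
def classify_auth_failure(
    successes: int, total: int, min_samples: int = 3
) -> tuple[bool, str]:
    if total >= min_samples:
        success_rate = successes / total
        if success_rate > 0.3:
            return True, f"Historical recovery rate: {success_rate:.0%}"
    return False, "Insufficient recovery history or low success rate"

print(classify_auth_failure(2, 4))  # (True, 'Historical recovery rate: 50%')
print(classify_auth_failure(1, 2))  # too few samples -> (False, ...)
```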
get_error_stats
get_error_stats(error_code)

Get statistics for a specific error code.

Parameters:

- error_code (str, required): The error code to query (e.g., 'E103').

Returns:

- dict[str, str | int | float]: Dictionary with error statistics.

Source code in src/marianne/learning/error_hooks.py
def get_error_stats(self, error_code: str) -> dict[str, str | int | float]:
    """Get statistics for a specific error code.

    Args:
        error_code: The error code to query (e.g., 'E103').

    Returns:
        Dictionary with error statistics.
    """
    if not self.enabled or self._store is None:
        return {"error": "Learning not enabled"}

    with self._store._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT
                COUNT(*) as total_occurrences,
                SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) as recoveries,
                AVG(actual_wait) as avg_wait,
                MIN(actual_wait) as min_wait,
                MAX(actual_wait) as max_wait
            FROM error_recoveries
            WHERE error_code = ?
            """,
            (error_code,),
        )
        row = cursor.fetchone()

        if row:
            return {
                "error_code": error_code,
                "total_occurrences": row["total_occurrences"],
                "successful_recoveries": row["recoveries"] or 0,
                "recovery_rate": (
                    (row["recoveries"] / row["total_occurrences"] * 100)
                    if row["total_occurrences"] > 0
                    else 0
                ),
                "avg_wait_seconds": round(row["avg_wait"] or 0, 1),
                "min_wait_seconds": round(row["min_wait"] or 0, 1),
                "max_wait_seconds": round(row["max_wait"] or 0, 1),
            }

    return {"error_code": error_code, "total_occurrences": 0}

ErrorRecoveryRecord dataclass

ErrorRecoveryRecord(id, error_code, suggested_wait, actual_wait, recovery_success, recorded_at, model, time_of_day)

A record of error recovery timing for learning adaptive waits.

ExecutionRecord dataclass

ExecutionRecord(id, workspace_hash, job_hash, sheet_num, started_at, completed_at, duration_seconds, status, retry_count, success_without_retry, validation_pass_rate, confidence_score, model, error_codes=list())

A record of a sheet execution stored in the global database.

Functions
__post_init__
__post_init__()

Clamp fields to valid ranges.

Source code in src/marianne/learning/store/models.py
def __post_init__(self) -> None:
    """Clamp fields to valid ranges."""
    self.duration_seconds = max(0.0, self.duration_seconds)
    self.retry_count = max(0, self.retry_count)
    self.validation_pass_rate = max(0.0, min(1.0, self.validation_pass_rate))
    self.confidence_score = max(0.0, min(1.0, self.confidence_score))

GlobalLearningStore

GlobalLearningStore(db_path=None)

Bases: PatternMixin, ExecutionMixin, RateLimitMixin, DriftMixin, EscalationMixin, BudgetMixin, PatternLifecycleMixin, GlobalLearningStoreBase

Global learning store combining all mixins.

This is the primary interface for Marianne's cross-workspace learning system. It provides persistent storage for execution outcomes, detected patterns, error recovery data, and learning metrics across all Marianne workspaces.

The class is composed from multiple mixins, each providing domain-specific functionality. The base class (listed last for proper MRO) provides:

- SQLite connection management with WAL mode for concurrent access
- Schema creation and version migration
- Hashing utilities for workspace and job identification

Mixin Capabilities

PatternMixin:

- record_pattern(), get_patterns(), get_pattern_by_id()
- record_pattern_application(), update_pattern_effectiveness()
- Quarantine lifecycle: quarantine_pattern(), validate_pattern()
- Trust scoring and success factor analysis
- Pattern discovery broadcasting

ExecutionMixin:

- record_outcome() for sheet execution outcomes
- get_execution_stats(), get_recent_executions()
- get_similar_executions() for learning activation
- Workspace clustering for cross-workspace correlation

RateLimitMixin:

- record_rate_limit_event() for cross-workspace coordination
- get_recent_rate_limits() to check before API calls
- Enables parallel jobs to avoid hitting the same limits

DriftMixin:

- calculate_drift_metrics() for effectiveness drift
- detect_epistemic_drift() for belief-level monitoring
- auto_retire_drifting_patterns() for lifecycle management
- get_pattern_evolution_trajectory() for historical analysis

EscalationMixin:

- record_escalation_decision() when handlers respond
- get_similar_escalation() for pattern-based suggestions
- Closes the learning loop for escalation handling

BudgetMixin:

- get_exploration_budget(), update_exploration_budget()
- record_entropy_response() for diversity injection
- Dynamic budget with floor/ceiling to prevent over-convergence

Example

    from marianne.learning.store import GlobalLearningStore

    store = GlobalLearningStore()

    # Record a pattern
    store.record_pattern(
        pattern_type="rate_limit_recovery",
        pattern_content={"action": "exponential_backoff"},
        context={"error_code": "E101"},
        source_job="job-123",
    )

    # Query execution statistics
    stats = store.get_execution_stats()
    print(f"Total executions: {stats['total_executions']}")

Attributes:

- db_path: Path to the SQLite database file.
- _logger (MarianneLogger): Module logger instance for consistent logging.

Note

The database uses WAL mode for safe concurrent access from multiple Marianne jobs. Schema migrations are applied automatically when the store is initialized.
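WAL mode can be seen in a standalone sketch (illustrative only; the store's actual setup code is not shown here). With WAL, readers proceed while a writer holds the database, which is what makes concurrent Marianne jobs safe against each other:

```python
import os
import sqlite3
import tempfile

# Standalone sketch of enabling WAL mode on a fresh database file.
path = os.path.join(tempfile.mkdtemp(), "learning.db")
conn = sqlite3.connect(path)
# The pragma returns the journal mode actually in effect.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(mode)  # "wal"
conn.close()
```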

Source code in src/marianne/learning/store/base.py
def __init__(self, db_path: Path | None = None) -> None:
    """Initialize the global learning store.

    Creates the database directory if needed, establishes the connection,
    and runs any necessary migrations.

    Args:
        db_path: Path to the SQLite database file.
                Defaults to ~/.marianne/global-learning.db
    """
    self.db_path = db_path or DEFAULT_GLOBAL_STORE_PATH
    self._logger = _logger
    # ContextVar scopes the batch connection per-asyncio-task, preventing
    # a race where Task A's batch_connection() leaks into Task B's
    # _get_connection() calls.  Each task sees its own (or no) batch conn.
    self._batch_conn: contextvars.ContextVar[sqlite3.Connection | None] = (
        contextvars.ContextVar("_batch_conn", default=None)
    )
    self._ensure_db_exists()
    self._migrate_if_needed()

PatternRecord dataclass

PatternRecord(id, pattern_type, pattern_name, description, occurrence_count, first_seen, last_seen, last_confirmed, led_to_success_count, led_to_failure_count, effectiveness_score, variance, suggested_action, context_tags, priority_score, quarantine_status=PENDING, provenance_job_hash=None, provenance_sheet_num=None, quarantined_at=None, validated_at=None, quarantine_reason=None, trust_score=0.5, trust_calculation_date=None, success_factors=None, success_factors_updated_at=None, active=True, content_hash=None, instrument_name=None)

A pattern record stored in the global database.

v19 Evolution: Extended with quarantine_status, provenance, and trust_score fields to support the Pattern Quarantine & Provenance and Pattern Trust Scoring evolutions.

Attributes
quarantine_status class-attribute instance-attribute
quarantine_status = PENDING

Current status in the quarantine lifecycle.

provenance_job_hash class-attribute instance-attribute
provenance_job_hash = None

Hash of the job that first created this pattern.

provenance_sheet_num class-attribute instance-attribute
provenance_sheet_num = None

Sheet number where this pattern was first observed.

quarantined_at class-attribute instance-attribute
quarantined_at = None

When the pattern was moved to QUARANTINED status.

validated_at class-attribute instance-attribute
validated_at = None

When the pattern was moved to VALIDATED status.

quarantine_reason class-attribute instance-attribute
quarantine_reason = None

Reason for quarantine (if quarantined).

trust_score class-attribute instance-attribute
trust_score = 0.5

Trust score (0.0-1.0). 0.5 is neutral, >0.7 is high trust.

trust_calculation_date class-attribute instance-attribute
trust_calculation_date = None

When trust_score was last calculated.

success_factors class-attribute instance-attribute
success_factors = None

WHY this pattern succeeds - captured context conditions and factors.

success_factors_updated_at class-attribute instance-attribute
success_factors_updated_at = None

When success_factors were last updated.

active class-attribute instance-attribute
active = True

Whether this pattern is active (False = soft-deleted).

content_hash class-attribute instance-attribute
content_hash = None

SHA-256 hash of pattern content for cross-name deduplication.

instrument_name class-attribute instance-attribute
instrument_name = None

Backend instrument that produced this pattern (e.g., 'claude_cli').

Functions
__post_init__
__post_init__()

Clamp scored fields to valid ranges.

Source code in src/marianne/learning/store/models.py
def __post_init__(self) -> None:
    """Clamp scored fields to valid ranges."""
    self.trust_score = max(0.0, min(1.0, self.trust_score))
    self.effectiveness_score = max(0.0, min(1.0, self.effectiveness_score))
    self.variance = max(0.0, self.variance)
    self.priority_score = max(0.0, self.priority_score)

JudgmentClient

JudgmentClient(endpoint, timeout=30.0)

HTTP client for Recursive Light judgment API.

Connects to Recursive Light's /api/marianne/judgment endpoint to get TDF-aligned execution decisions with accumulated wisdom.

Falls back to default "retry" action on connection errors for graceful degradation.

Attributes:

- endpoint: Base URL for the Recursive Light API.
- timeout: Request timeout in seconds.

Initialize the judgment client.

Parameters:

- endpoint (str, required): Base URL for the Recursive Light API server.
- timeout (float, default 30.0): Request timeout in seconds.
Source code in src/marianne/learning/judgment.py
def __init__(
    self,
    endpoint: str,
    timeout: float = 30.0,
) -> None:
    """Initialize the judgment client.

    Args:
        endpoint: Base URL for the Recursive Light API server.
        timeout: Request timeout in seconds. Defaults to 30.0.
    """
    self.endpoint = endpoint.rstrip("/")
    self.timeout = timeout
    self._client: httpx.AsyncClient | None = None
Functions
get_judgment async
get_judgment(query)

Get execution judgment from Recursive Light.

Posts the query to RL's /api/marianne/judgment endpoint and parses the response. On errors, returns a default "retry" action for graceful degradation.

Parameters:

- query (JudgmentQuery, required): Full context about the sheet execution state.

Returns:

- JudgmentResponse: Response with the recommended action. On connection errors, returns a default retry action with low confidence.

Source code in src/marianne/learning/judgment.py
async def get_judgment(self, query: JudgmentQuery) -> JudgmentResponse:
    """Get execution judgment from Recursive Light.

    Posts the query to RL's /api/marianne/judgment endpoint and
    parses the response. On errors, returns a default "retry"
    action for graceful degradation.

    Args:
        query: Full context about the sheet execution state.

    Returns:
        JudgmentResponse with recommended action. On connection
        errors, returns default retry action with low confidence.
    """
    try:
        client = await self._get_client()

        # Build request payload from JudgmentQuery
        payload = {
            "job_id": query.job_id,
            SHEET_NUM_KEY: query.sheet_num,
            "validation_results": query.validation_results,
            "execution_history": query.execution_history,
            "error_patterns": query.error_patterns,
            "retry_count": query.retry_count,
            "confidence": query.confidence,
        }

        # POST to RL judgment endpoint
        response = await client.post("/api/marianne/judgment", json=payload)

        if response.status_code != 200:
            # API error - fall back to retry
            return self._default_retry_response(
                f"RL API error: {response.status_code}"
            )

        # Parse response JSON
        data = response.json()
        return self._parse_judgment_response(data)

    except httpx.ConnectError as e:
        return self._default_retry_response(f"Connection error: {e}")

    except httpx.TimeoutException as e:
        return self._default_retry_response(f"Timeout: {e}")

    except httpx.HTTPStatusError as e:
        if e.response.status_code < 500:
            # 4xx client errors are permanent — retrying won't help.
            _logger.warning(
                "judgment_client_error: HTTP %s — not retrying",
                e.response.status_code,
            )
            return JudgmentResponse(
                recommended_action="retry",
                confidence=0.1,
                reasoning=f"RL client error (HTTP {e.response.status_code}), unavailable",
            )
        return self._default_retry_response(f"HTTP server error: {e}")

    except (ValueError, KeyError, TypeError) as e:
        return self._default_retry_response(f"Response parsing error: {e}")

    except (httpx.HTTPError, OSError, OverflowError) as e:
        _logger.warning("judgment_unexpected_error: %s", e, exc_info=True)
        return self._default_retry_response(f"Unexpected error: {e}")
    except ExceptionGroup as eg:
        _logger.warning("judgment_exception_group: %s", eg, exc_info=True)
        return self._default_retry_response(f"Connection error group: {eg}")
close async
close()

Close the HTTP client connection.

Should be called when done using the client to clean up resources.

Source code in src/marianne/learning/judgment.py
async def close(self) -> None:
    """Close the HTTP client connection.

    Should be called when done using the client to clean up resources.
    """
    if self._client is not None and not self._client.is_closed:
        await self._client.aclose()
        self._client = None
__aenter__ async
__aenter__()

Async context manager entry.

Source code in src/marianne/learning/judgment.py
async def __aenter__(self) -> "JudgmentClient":
    """Async context manager entry."""
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit - closes client.

Source code in src/marianne/learning/judgment.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: types.TracebackType | None,
) -> None:
    """Async context manager exit - closes client."""
    await self.close()
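The `__aenter__`/`__aexit__` pair above lets the client be used with `async with`, guaranteeing `close()` runs even on errors. A minimal self-contained sketch of the same close-on-exit contract (`SketchClient` is a stand-in, not part of marianne):

```python
import asyncio


class SketchClient:
    """Illustrates the close-on-exit contract used by JudgmentClient."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # Idempotent cleanup, mirroring the is_closed guard above.
        if not self.closed:
            self.closed = True

    async def __aenter__(self) -> "SketchClient":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()


async def demo() -> bool:
    async with SketchClient() as client:
        assert not client.closed  # still open inside the block
    return client.closed  # closed on exit


was_closed = asyncio.run(demo())
```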

JudgmentQuery dataclass

JudgmentQuery(job_id, sheet_num, validation_results, execution_history, error_patterns, retry_count, confidence)

Query sent to Recursive Light for execution judgment.

Contains all relevant context about the current sheet execution state needed for RL to provide informed judgment.

Attributes
job_id instance-attribute
job_id

Unique identifier for the job.

sheet_num instance-attribute
sheet_num

Current sheet number being executed.

validation_results instance-attribute
validation_results

Serialized validation results from the sheet.

execution_history instance-attribute
execution_history

History of previous execution attempts for this sheet.

error_patterns instance-attribute
error_patterns

Detected error patterns from previous attempts.

retry_count instance-attribute
retry_count

Number of retry attempts already made.

confidence instance-attribute
confidence

Current aggregate confidence score (0.0-1.0).
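The fields above can be assembled into a plain dict for the judgment request. A hedged sketch using a stand-in dataclass with the documented field names; the actual wire format of the `/api/marianne/judgment` payload is an assumption, shown here via `dataclasses.asdict` for illustration only:

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class QuerySketch:
    """Stand-in mirroring JudgmentQuery's documented fields."""

    job_id: str
    sheet_num: int
    validation_results: list[dict[str, Any]] = field(default_factory=list)
    execution_history: list[dict[str, Any]] = field(default_factory=list)
    error_patterns: list[str] = field(default_factory=list)
    retry_count: int = 0
    confidence: float = 0.0


query = QuerySketch(job_id="job-42", sheet_num=3, retry_count=1, confidence=0.55)
payload = asdict(query)  # JSON-ready dict; exact payload shape is an assumption
```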

JudgmentResponse dataclass

JudgmentResponse(recommended_action, confidence, reasoning, prompt_modifications=None, escalation_urgency=None, human_question=None, patterns_learned=list())

Response from Recursive Light with execution judgment.

Provides recommended action, reasoning, and optional modifications for how to proceed with sheet execution.

Attributes
recommended_action instance-attribute
recommended_action

Recommended action:

- proceed: Continue to next sheet (current succeeded)
- retry: Retry the current sheet
- completion: Run completion prompt for partial success
- escalate: Escalate to human for decision
- abort: Stop the entire job

confidence instance-attribute
confidence

RL's confidence in this recommendation (0.0-1.0).

reasoning instance-attribute
reasoning

Explanation of why this action is recommended.

prompt_modifications class-attribute instance-attribute
prompt_modifications = None

Optional modifications to apply to prompt on retry.

escalation_urgency class-attribute instance-attribute
escalation_urgency = None

Urgency level if action is 'escalate'.

human_question class-attribute instance-attribute
human_question = None

Specific question to ask human if escalating.

patterns_learned class-attribute instance-attribute
patterns_learned = field(default_factory=list)

New patterns identified by RL from this execution.

LocalJudgmentClient

LocalJudgmentClient(proceed_threshold=0.7, retry_threshold=0.4, max_retries=3)

Local fallback judgment client using simple heuristics.

Provides judgment without network calls for offline operation or when Recursive Light is unavailable. Uses confidence thresholds and pass rate heuristics to determine actions.

This is a simpler, always-available alternative that makes reasonable decisions based on local metrics alone.

Attributes:

Name Type Description
proceed_threshold

Confidence above which to proceed (default 0.7).

retry_threshold

Confidence above which to retry (default 0.4).

max_retries

Maximum retries before escalating (default 3).

Initialize the local judgment client.

Parameters:

Name Type Description Default
proceed_threshold float

Confidence >= this proceeds to next sheet.

0.7
retry_threshold float

Confidence >= this (but < proceed) retries. Below this, escalates or aborts.

0.4
max_retries int

Maximum retry count before escalating.

3
Source code in src/marianne/learning/judgment.py
def __init__(
    self,
    proceed_threshold: float = 0.7,
    retry_threshold: float = 0.4,
    max_retries: int = 3,
) -> None:
    """Initialize the local judgment client.

    Args:
        proceed_threshold: Confidence >= this proceeds to next sheet.
        retry_threshold: Confidence >= this (but < proceed) retries.
            Below this, escalates or aborts.
        max_retries: Maximum retry count before escalating.
    """
    self.proceed_threshold = proceed_threshold
    self.retry_threshold = retry_threshold
    self.max_retries = max_retries
Functions
get_judgment async
get_judgment(query)

Get execution judgment using local heuristics.

Decision logic:

1. High confidence (>= proceed_threshold) -> proceed
2. Medium confidence (>= retry_threshold) -> retry (unless max retries)
3. Low confidence (< retry_threshold) -> escalate
4. Max retries exceeded -> completion or escalate based on pass rate

Parameters:

Name Type Description Default
query JudgmentQuery

Full context about the sheet execution state.

required

Returns:

Type Description
JudgmentResponse

JudgmentResponse with recommended action based on heuristics.

Source code in src/marianne/learning/judgment.py
async def get_judgment(self, query: JudgmentQuery) -> JudgmentResponse:
    """Get execution judgment using local heuristics.

    Decision logic:
    1. High confidence (>= proceed_threshold) -> proceed
    2. Medium confidence (>= retry_threshold) -> retry (unless max retries)
    3. Low confidence (< retry_threshold) -> escalate
    4. Max retries exceeded -> completion or escalate based on pass rate

    Args:
        query: Full context about the sheet execution state.

    Returns:
        JudgmentResponse with recommended action based on heuristics.
    """
    confidence = query.confidence
    retry_count = query.retry_count

    # Calculate validation pass rate
    pass_rate = self._calculate_pass_rate(query.validation_results)

    # High confidence: proceed to next sheet
    if confidence >= self.proceed_threshold:
        return JudgmentResponse(
            recommended_action="proceed",
            confidence=0.8,
            reasoning=f"Confidence {confidence:.1%} >= {self.proceed_threshold:.1%} threshold",
            patterns_learned=[],
        )

    # Max retries exceeded
    if retry_count >= self.max_retries:
        # If pass rate is decent, try completion
        if pass_rate >= 0.5:
            return JudgmentResponse(
                recommended_action="completion",
                confidence=0.6,
                reasoning=(
                    f"Max retries ({self.max_retries}) exceeded but pass rate "
                    f"{pass_rate:.1%} suggests partial success - trying completion"
                ),
                patterns_learned=["max_retries_with_partial_success"],
            )
        # Otherwise escalate
        return JudgmentResponse(
            recommended_action="escalate",
            confidence=0.7,
            reasoning=(
                f"Max retries ({self.max_retries}) exceeded with low pass rate "
                f"{pass_rate:.1%} - human decision needed"
            ),
            escalation_urgency="medium",
            human_question=(
                f"Sheet {query.sheet_num} failed {retry_count} times with "
                f"{pass_rate:.0%} validation pass rate. Continue, skip, or abort?"
            ),
            patterns_learned=["max_retries_exceeded"],
        )

    # Medium confidence: retry
    if confidence >= self.retry_threshold:
        return JudgmentResponse(
            recommended_action="retry",
            confidence=0.6,
            reasoning=(
                f"Confidence {confidence:.1%} in retry range "
                f"[{self.retry_threshold:.1%}, {self.proceed_threshold:.1%})"
            ),
            patterns_learned=[],
        )

    # Low confidence: escalate
    urgency: Literal["low", "medium", "high"] = "low"
    if confidence < 0.2:
        urgency = "high"
    elif confidence < 0.3:
        urgency = "medium"

    return JudgmentResponse(
        recommended_action="escalate",
        confidence=0.5,
        reasoning=(
            f"Confidence {confidence:.1%} < {self.retry_threshold:.1%} "
            f"threshold - escalating for human decision"
        ),
        escalation_urgency=urgency,
        human_question=(
            f"Sheet {query.sheet_num} has low confidence ({confidence:.0%}) "
            f"after {retry_count} attempts. How should we proceed?"
        ),
        patterns_learned=["low_confidence_escalation"],
    )
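The branching above condenses to a small pure function. A sketch using the documented default thresholds; note that, as in the source, a high-confidence result proceeds even when retries are exhausted, because the confidence check comes first:

```python
def decide(
    confidence: float,
    retry_count: int,
    pass_rate: float,
    proceed_threshold: float = 0.7,
    retry_threshold: float = 0.4,
    max_retries: int = 3,
) -> str:
    """Condensed version of LocalJudgmentClient.get_judgment's branching."""
    if confidence >= proceed_threshold:
        return "proceed"
    if retry_count >= max_retries:
        # Partial success -> try completion, otherwise escalate to a human.
        return "completion" if pass_rate >= 0.5 else "escalate"
    if confidence >= retry_threshold:
        return "retry"
    return "escalate"
```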

MigrationResult dataclass

MigrationResult(errors=list(), skipped_workspaces=list(), imported_workspaces=list(), workspaces_found=0, outcomes_imported=0, patterns_detected=0)

Result of a migration operation.

Attributes:

Name Type Description
workspaces_found int

Number of workspaces with outcomes.

outcomes_imported int

Total outcomes imported.

patterns_detected int

Patterns detected from imported outcomes.

errors list[str]

Any errors encountered during migration.

skipped_workspaces list[str]

Workspaces skipped (already imported, etc.).

imported_workspaces list[str]

Workspaces successfully imported.

OutcomeMigrator

OutcomeMigrator(global_store, aggregator=None)

Migrates workspace-local outcomes to the global store.

This migrator scans for existing .marianne-outcomes.json files and imports their contents into the global SQLite database, enabling cross-workspace learning from historical data.

Migration is:

- Non-destructive: Original files are preserved
- Idempotent: Already-imported outcomes are skipped
- Pattern-aware: Runs pattern detection after import

Usage

migrator = OutcomeMigrator(global_store)
result = migrator.migrate_all()
print(f"Imported {result.outcomes_imported} outcomes")

Initialize the outcome migrator.

Parameters:

Name Type Description Default
global_store GlobalLearningStore

Global learning store to import into.

required
aggregator PatternAggregator | None

Optional pattern aggregator for pattern detection.

None
Source code in src/marianne/learning/migration.py
def __init__(
    self,
    global_store: "GlobalLearningStore",
    aggregator: "PatternAggregator | None" = None,
) -> None:
    """Initialize the outcome migrator.

    Args:
        global_store: Global learning store to import into.
        aggregator: Optional pattern aggregator for pattern detection.
    """
    self._store = global_store
    self._aggregator = aggregator

    # Track imported workspaces to avoid duplicates
    self._imported_workspace_hashes: set[str] = set()
Functions
migrate_all
migrate_all(scan_patterns=None, additional_paths=None)

Migrate all discoverable workspace-local outcomes.

Scans standard locations plus any additional paths for .marianne-outcomes.json files and imports them.

Parameters:

Name Type Description Default
scan_patterns list[str] | None

Glob patterns to scan (defaults to standard locations).

None
additional_paths list[Path] | None

Additional specific paths to scan.

None

Returns:

Type Description
MigrationResult

MigrationResult with import statistics.

Source code in src/marianne/learning/migration.py
def migrate_all(
    self,
    scan_patterns: list[str] | None = None,
    additional_paths: list[Path] | None = None,
) -> MigrationResult:
    """Migrate all discoverable workspace-local outcomes.

    Scans standard locations plus any additional paths for
    .marianne-outcomes.json files and imports them.

    Args:
        scan_patterns: Glob patterns to scan (defaults to standard locations).
        additional_paths: Additional specific paths to scan.

    Returns:
        MigrationResult with import statistics.
    """
    result = MigrationResult()
    patterns = scan_patterns or DEFAULT_SCAN_PATTERNS

    # Collect all outcome files to migrate
    outcome_files: list[Path] = []

    # Expand glob patterns
    for pattern in patterns:
        expanded = Path(pattern).expanduser()
        if "*" in str(expanded):
            # It's a glob pattern - find the first non-glob parent
            # e.g., "./*-workspace/.marianne-outcomes.json" -> glob from "."
            pattern_str = str(expanded)
            parts = pattern_str.split("/")
            base_parts: list[str] = []
            glob_parts: list[str] = []
            in_glob = False
            for part in parts:
                if "*" in part or in_glob:
                    in_glob = True
                    glob_parts.append(part)
                else:
                    base_parts.append(part)

            base_path = Path("/".join(base_parts)) if base_parts else Path(".")
            glob_pattern = "/".join(glob_parts)

            if base_path.exists():
                outcome_files.extend(base_path.glob(glob_pattern))
        elif expanded.exists() and expanded.is_file():
            outcome_files.append(expanded)

    # Add additional paths
    if additional_paths:
        for path in additional_paths:
            if path.exists() and path.is_file():
                outcome_files.append(path)

    # Deduplicate by resolved path
    unique_files = list({f.resolve() for f in outcome_files})
    result.workspaces_found = len(unique_files)

    _logger.info("migration_files_found", count=len(unique_files))

    # Import each file
    for outcome_file in unique_files:
        try:
            imported = self._migrate_file(outcome_file)
            if imported > 0:
                result.outcomes_imported += imported
                result.imported_workspaces.append(str(outcome_file.parent))
            else:
                result.skipped_workspaces.append(str(outcome_file.parent))
        except Exception as e:
            error_msg = f"Error migrating {outcome_file}: {e}"
            _logger.warning(error_msg)
            result.errors.append(error_msg)

    # Run pattern detection on imported data if aggregator available
    if self._aggregator and result.outcomes_imported > 0:
        try:
            detected_count: int = self._detect_patterns_from_store()
            result.patterns_detected = detected_count
        except Exception as e:
            _logger.warning("pattern_detection_error", error=str(e), exc_info=True)
            result.errors.append(f"Pattern detection error: {e}")

    _logger.info(
        "migration_complete",
        outcomes=result.outcomes_imported,
        patterns=result.patterns_detected,
    )

    return result
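The base/glob split inside `migrate_all` above can be isolated as a pure helper. A sketch mirroring that loop: everything before the first wildcard component becomes the glob base, the rest becomes the glob pattern:

```python
from pathlib import Path


def split_glob(pattern: str) -> tuple[Path, str]:
    """Split a path pattern at the first component containing a wildcard,
    mirroring the base/glob split in migrate_all."""
    base_parts: list[str] = []
    glob_parts: list[str] = []
    in_glob = False
    for part in pattern.split("/"):
        if "*" in part or in_glob:
            # Once a wildcard appears, all later parts belong to the glob.
            in_glob = True
            glob_parts.append(part)
        else:
            base_parts.append(part)
    base = Path("/".join(base_parts)) if base_parts else Path(".")
    return base, "/".join(glob_parts)
```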
migrate_workspace
migrate_workspace(workspace_path)

Migrate a single workspace's outcomes.

Parameters:

Name Type Description Default
workspace_path Path

Path to the workspace directory.

required

Returns:

Type Description
MigrationResult

MigrationResult for this workspace.

Source code in src/marianne/learning/migration.py
def migrate_workspace(self, workspace_path: Path) -> MigrationResult:
    """Migrate a single workspace's outcomes.

    Args:
        workspace_path: Path to the workspace directory.

    Returns:
        MigrationResult for this workspace.
    """
    result = MigrationResult()

    # Look for outcome file in workspace
    outcome_file = workspace_path / ".marianne-outcomes.json"
    if not outcome_file.exists():
        result.errors.append(f"No outcomes file found in {workspace_path}")
        return result

    result.workspaces_found = 1

    try:
        imported = self._migrate_file(outcome_file)
        result.outcomes_imported = imported
        if imported > 0:
            result.imported_workspaces.append(str(workspace_path))
        else:
            result.skipped_workspaces.append(str(workspace_path))
    except Exception as e:
        _logger.warning(
            "workspace_migration_error",
            workspace=str(workspace_path),
            error=str(e),
            exc_info=True,
        )
        result.errors.append(f"Error: {e}")

    return result

JsonOutcomeStore

JsonOutcomeStore(store_path)

JSON-file based outcome store implementation.

Stores outcomes in a JSON file with atomic saves, following the same pattern as JsonStateBackend.

Initialize the JSON outcome store.

Parameters:

Name Type Description Default
store_path Path

Path to the JSON file for storing outcomes.

required
Source code in src/marianne/learning/outcomes.py
def __init__(self, store_path: Path) -> None:
    """Initialize the JSON outcome store.

    Args:
        store_path: Path to the JSON file for storing outcomes.
    """
    self.store_path = store_path
    self._outcomes: list[SheetOutcome] = []
    self._loaded: bool = False
    self._cached_patterns: list[DetectedPattern] | None = None
    self._patterns_outcome_count: int = 0
Functions
record async
record(outcome)

Record a sheet outcome to the store.

After recording, if there are enough outcomes (>= 5), pattern detection is run and patterns_detected is populated on the outcome.

Parameters:

Name Type Description Default
outcome SheetOutcome

The sheet outcome to record.

required
Source code in src/marianne/learning/outcomes.py
async def record(self, outcome: SheetOutcome) -> None:
    """Record a sheet outcome to the store.

    After recording, if there are enough outcomes (>= 5), pattern
    detection is run and patterns_detected is populated on the outcome.

    Args:
        outcome: The sheet outcome to record.
    """
    # Load existing outcomes first to avoid overwriting on fresh store instance
    await self._load()

    # Check if this sheet already has an outcome (update rather than duplicate)
    existing_idx = next(
        (
            i for i, existing in enumerate(self._outcomes)
            if existing.sheet_id == outcome.sheet_id
        ),
        None,
    )
    if existing_idx is not None:
        self._outcomes[existing_idx] = outcome
    else:
        self._outcomes.append(outcome)

    # Invalidate cached patterns when outcomes change
    self._cached_patterns = None

    # Detect patterns after accumulating enough data
    if len(self._outcomes) >= 5:
        patterns = await self.detect_patterns()
        # Populate patterns_detected with top pattern descriptions
        outcome.patterns_detected = [
            p.to_prompt_guidance() for p in patterns[:3]
        ]

    await self._save()
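Note that `record()` above upserts by `sheet_id` rather than appending duplicates. A minimal sketch of that upsert-by-key logic, using plain dicts as stand-ins for `SheetOutcome`:

```python
from typing import Any


def upsert_outcome(outcomes: list[dict[str, Any]], new: dict[str, Any]) -> None:
    """Replace the entry with the same sheet_id, or append (as in record())."""
    idx = next(
        (i for i, o in enumerate(outcomes) if o["sheet_id"] == new["sheet_id"]),
        None,
    )
    if idx is not None:
        outcomes[idx] = new  # re-running a sheet updates its outcome in place
    else:
        outcomes.append(new)
```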
query_similar async
query_similar(context, limit=10)

Query for similar past outcomes.

Currently returns recent outcomes for the same job_id. Future: implement semantic similarity matching.

Parameters:

Name Type Description Default
context dict[str, Any]

Context dict containing job_id and other metadata.

required
limit int

Maximum number of outcomes to return.

10

Returns:

Type Description
list[SheetOutcome]

List of similar sheet outcomes.

Source code in src/marianne/learning/outcomes.py
async def query_similar(
    self, context: dict[str, Any], limit: int = 10
) -> list[SheetOutcome]:
    """Query for similar past outcomes.

    Currently returns recent outcomes for the same job_id.
    Future: implement semantic similarity matching.

    Args:
        context: Context dict containing job_id and other metadata.
        limit: Maximum number of outcomes to return.

    Returns:
        List of similar sheet outcomes.
    """
    await self._load()
    job_id = context.get("job_id")
    if not job_id:
        return self._outcomes[-limit:]

    matching = [outcome for outcome in self._outcomes if outcome.job_id == job_id]
    return matching[-limit:]
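The filtering above reduces to "match on job_id when given, keep the most recent N". A sketch of that selection over dict stand-ins:

```python
from typing import Any


def similar(
    outcomes: list[dict[str, Any]],
    context: dict[str, Any],
    limit: int = 10,
) -> list[dict[str, Any]]:
    """Mirror query_similar: filter by job_id when present, keep the last N."""
    job_id = context.get("job_id")
    if not job_id:
        return outcomes[-limit:]
    return [o for o in outcomes if o["job_id"] == job_id][-limit:]
```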
get_patterns async
get_patterns(job_name)

Get detected patterns for a job.

Parameters:

Name Type Description Default
job_name str

The job name to get patterns for.

required

Returns:

Type Description
list[str]

List of pattern strings detected across outcomes.

Source code in src/marianne/learning/outcomes.py
async def get_patterns(self, job_name: str) -> list[str]:
    """Get detected patterns for a job.

    Args:
        job_name: The job name to get patterns for.

    Returns:
        List of pattern strings detected across outcomes.
    """
    await self._load()
    patterns: set[str] = set()
    for outcome in self._outcomes:
        if outcome.job_id == job_name:
            patterns.update(outcome.patterns_detected)
    return list(patterns)
detect_patterns async
detect_patterns()

Detect patterns from all recorded outcomes.

Uses PatternDetector to analyze historical outcomes and identify recurring patterns that can inform future executions. Results are cached and invalidated when outcomes change (record/load).

Returns:

Type Description
list[DetectedPattern]

List of DetectedPattern objects sorted by confidence.

Source code in src/marianne/learning/outcomes.py
async def detect_patterns(self) -> list["DetectedPattern"]:
    """Detect patterns from all recorded outcomes.

    Uses PatternDetector to analyze historical outcomes and
    identify recurring patterns that can inform future executions.
    Results are cached and invalidated when outcomes change (record/load).

    Returns:
        List of DetectedPattern objects sorted by confidence.
    """
    from marianne.learning.patterns import PatternDetector

    await self._load()
    if not self._outcomes:
        return []

    # Return cached patterns if outcome list hasn't changed
    if (
        self._cached_patterns is not None
        and self._patterns_outcome_count == len(self._outcomes)
    ):
        return self._cached_patterns

    detector = PatternDetector(self._outcomes)
    self._cached_patterns = detector.detect_all()
    self._patterns_outcome_count = len(self._outcomes)
    return self._cached_patterns
get_relevant_patterns async
get_relevant_patterns(context, limit=5)

Get pattern descriptions relevant to the given context.

This method detects patterns, matches them to the context, and returns human-readable descriptions suitable for prompt injection.

Parameters:

Name Type Description Default
context dict[str, Any]

Context dict containing job_id, sheet_num, validation_types, etc.

required
limit int

Maximum number of patterns to return.

5

Returns:

Type Description
list[str]

List of pattern description strings for prompt injection.

Source code in src/marianne/learning/outcomes.py
async def get_relevant_patterns(
    self,
    context: dict[str, Any],
    limit: int = 5,
) -> list[str]:
    """Get pattern descriptions relevant to the given context.

    This method detects patterns, matches them to the context,
    and returns human-readable descriptions suitable for prompt injection.

    Args:
        context: Context dict containing job_id, sheet_num, validation_types, etc.
        limit: Maximum number of patterns to return.

    Returns:
        List of pattern description strings for prompt injection.
    """
    from marianne.learning.patterns import PatternApplicator, PatternMatcher

    # Detect all patterns
    all_patterns = await self.detect_patterns()
    if not all_patterns:
        return []

    # Match patterns to context
    matcher = PatternMatcher(all_patterns)
    matched = matcher.match(context, limit=limit)

    # Convert to prompt-ready descriptions
    applicator = PatternApplicator(matched)
    return applicator.get_pattern_descriptions()

OutcomeStore

Bases: Protocol

Protocol for outcome storage backends.

Provides async methods for recording outcomes and querying for similar past outcomes to inform execution decisions.

Functions
record async
record(outcome)

Record a sheet outcome for future learning.

Source code in src/marianne/learning/outcomes.py
async def record(self, outcome: SheetOutcome) -> None:
    """Record a sheet outcome for future learning."""
    ...
query_similar async
query_similar(context, limit=10)

Query for similar past outcomes based on context.

Source code in src/marianne/learning/outcomes.py
async def query_similar(
    self, context: dict[str, Any], limit: int = 10
) -> list[SheetOutcome]:
    """Query for similar past outcomes based on context."""
    ...
get_patterns async
get_patterns(job_name)

Get detected patterns for a specific job.

Source code in src/marianne/learning/outcomes.py
async def get_patterns(self, job_name: str) -> list[str]:
    """Get detected patterns for a specific job."""
    ...
get_relevant_patterns async
get_relevant_patterns(context, limit=5)

Get pattern descriptions relevant to the given context.

Source code in src/marianne/learning/outcomes.py
async def get_relevant_patterns(
    self, context: dict[str, Any], limit: int = 5
) -> list[str]:
    """Get pattern descriptions relevant to the given context."""
    ...
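Because `OutcomeStore` is a `typing.Protocol`, any class providing these async methods satisfies it structurally, with no inheritance required. A self-contained sketch with a simplified two-method protocol and dicts standing in for `SheetOutcome`:

```python
import asyncio
from typing import Any, Protocol


class StoreLike(Protocol):
    """Structural stand-in for the OutcomeStore protocol above."""

    async def record(self, outcome: dict[str, Any]) -> None: ...
    async def query_similar(
        self, context: dict[str, Any], limit: int = 10
    ) -> list[dict[str, Any]]: ...


class MemoryStore:
    """In-memory implementation; satisfies StoreLike without subclassing it."""

    def __init__(self) -> None:
        self._outcomes: list[dict[str, Any]] = []

    async def record(self, outcome: dict[str, Any]) -> None:
        self._outcomes.append(outcome)

    async def query_similar(
        self, context: dict[str, Any], limit: int = 10
    ) -> list[dict[str, Any]]:
        job_id = context.get("job_id")
        return [o for o in self._outcomes if o.get("job_id") == job_id][-limit:]


async def demo() -> int:
    store: StoreLike = MemoryStore()  # type-checks via structural subtyping
    await store.record({"job_id": "j1", "sheet_id": "s1"})
    return len(await store.query_similar({"job_id": "j1"}))


count = asyncio.run(demo())
```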

SheetOutcome dataclass

SheetOutcome(sheet_id, job_id, validation_results, execution_duration, retry_count, completion_mode_used, final_status, validation_pass_rate, success_without_retry, patterns_detected=list(), timestamp=(lambda: now(tz=UTC))(), failure_category_counts=dict(), semantic_patterns=list(), fix_suggestions=list(), patterns_applied=list(), stdout_tail='', stderr_tail='', error_history=list(), grounding_passed=None, grounding_confidence=None, grounding_guidance=None)

Structured outcome data from a completed sheet execution.

This dataclass captures all relevant information about a sheet execution for learning and pattern detection purposes.

Attributes
failure_category_counts class-attribute instance-attribute
failure_category_counts = field(default_factory=dict)

Counts of each failure category from validation results.

Example: {'missing': 2, 'stale': 1, 'malformed': 0}
Categories: missing, malformed, incomplete, stale, error

semantic_patterns class-attribute instance-attribute
semantic_patterns = field(default_factory=list)

Extracted semantic patterns from failure_reason fields.

Example: ['file not created', 'pattern not found', 'content empty'] These are normalized phrases that can be aggregated across outcomes.

fix_suggestions class-attribute instance-attribute
fix_suggestions = field(default_factory=list)

Collected suggested_fix values from failed validations.

Example: ['Ensure file is created in workspace/', 'Add missing import']

patterns_applied class-attribute instance-attribute
patterns_applied = field(default_factory=list)

Pattern descriptions that were applied/injected for this sheet execution.

These are the patterns from get_relevant_patterns() that were included in the prompt. Used for effectiveness tracking: correlate patterns_applied with success_without_retry to measure which patterns actually help.

Example: ['⚠️ Common issue: file_exists validation tends to fail (seen 3x)']

stdout_tail class-attribute instance-attribute
stdout_tail = ''

Captured tail of stdout from execution.

Stores the last N characters of stdout for pattern extraction. Used by OutputPatternExtractor to detect error patterns.

stderr_tail class-attribute instance-attribute
stderr_tail = ''

Captured tail of stderr from execution.

Stores the last N characters of stderr for pattern extraction. Used by OutputPatternExtractor to detect error patterns.

error_history class-attribute instance-attribute
error_history = field(default_factory=list)

History of errors encountered during execution.

Each entry is a dict with error_code, category, message, etc. Used by _detect_error_code_patterns() for recurring error analysis.

Example: [{'error_code': 'E009', 'category': 'transient', 'message': 'Rate limited'}]

grounding_passed class-attribute instance-attribute
grounding_passed = None

Whether all grounding hooks passed (None if grounding not enabled).

Used to correlate grounding outcomes with validation results and pattern effectiveness over time.

grounding_confidence class-attribute instance-attribute
grounding_confidence = None

Average confidence across grounding hooks (0.0-1.0, None if not enabled).

Higher confidence means external validators had high certainty in their results. Can be correlated with success_without_retry to identify reliable grounding sources.

grounding_guidance class-attribute instance-attribute
grounding_guidance = None

Recovery guidance from failed grounding hooks (None if passed or not enabled).

Captures actionable suggestions from external validators that failed, useful for pattern detection and learning what recovery strategies work.

DetectedPattern dataclass

DetectedPattern(pattern_type, description, frequency=1, success_rate=0.0, last_seen=(lambda: now(tz=UTC))(), context_tags=list(), evidence=list(), confidence=0.5, applications=0, successes_after_application=0, quarantine_status=None, trust_score=None)

A pattern detected from historical outcomes.

Patterns are learned behaviors that can inform future executions. They include both positive patterns (what works) and negative patterns (what to avoid).

v19 Evolution: Extended with optional quarantine_status and trust_score fields for integration with Pattern Quarantine & Trust Scoring features.

Attributes
description instance-attribute
description

Human-readable description of what this pattern represents.

frequency class-attribute instance-attribute
frequency = 1

How often this pattern has been observed.

success_rate class-attribute instance-attribute
success_rate = 0.0

Rate at which this pattern leads to success (0.0-1.0).

last_seen class-attribute instance-attribute
last_seen = field(default_factory=lambda: now(tz=UTC))

When this pattern was last observed (UTC).

context_tags class-attribute instance-attribute
context_tags = field(default_factory=list)

Tags for matching: job types, validation types, error categories.

evidence class-attribute instance-attribute
evidence = field(default_factory=list)

Sheet IDs that contributed to detecting this pattern.

confidence class-attribute instance-attribute
confidence = 0.5

Confidence in this pattern (0.0-1.0). Higher = more reliable.

applications class-attribute instance-attribute
applications = 0

Number of times this pattern was applied (included in prompts).

successes_after_application class-attribute instance-attribute
successes_after_application = 0

Number of success_without_retry outcomes when this pattern was applied.

quarantine_status class-attribute instance-attribute
quarantine_status = None

Quarantine status from global store.

trust_score class-attribute instance-attribute
trust_score = None

Trust score (0.0-1.0) from global store. None if not from global store.

effectiveness_rate property
effectiveness_rate

Compute effectiveness rate from applications and successes.

Returns:

Type Description
float

Effectiveness rate (0.0-1.0). Returns 0.4 (slightly below neutral) when applications < 3, to prefer proven patterns over unproven ones. This prevents unproven patterns from being treated equally with patterns that have demonstrated moderate (50%) success.

effectiveness_weight property
effectiveness_weight

Compute weight for blending effectiveness into relevance scoring.

Uses gradual ramp-up: full weight only after 5 applications. This prevents new patterns from being over-weighted.

Returns:

Type Description
float

Weight (0.0-1.0) based on sample size.
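The two properties above can be read as pure functions of the application counters. A sketch whose exact formulas are assumptions inferred from the documented behavior (0.4 floor below 3 applications; a linear ramp reaching full weight at 5):

```python
def effectiveness_rate(applications: int, successes: int) -> float:
    """0.4 below the 3-application minimum, else the observed success ratio.
    Formula inferred from the documented behaviour, not confirmed source."""
    if applications < 3:
        return 0.4
    return successes / applications


def effectiveness_weight(applications: int) -> float:
    """Ramp to full weight at 5 applications (assumed linear shape)."""
    return min(1.0, applications / 5)
```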

is_quarantined property
is_quarantined

Check if pattern is in quarantine status.

v19 Evolution: Used for quarantine-aware scoring.

is_validated property
is_validated

Check if pattern is in validated status.

v19 Evolution: Used for trust-aware scoring.

Functions
to_prompt_guidance
to_prompt_guidance()

Format this pattern as guidance for prompts.

v19 Evolution: Now includes quarantine/trust context when available.

Returns:

Type Description
str

A concise string suitable for injection into prompts.

Source code in src/marianne/learning/patterns.py
def to_prompt_guidance(self) -> str:
    """Format this pattern as guidance for prompts.

    v19 Evolution: Now includes quarantine/trust context when available.

    Returns:
        A concise string suitable for injection into prompts.
    """
    # v19: Add trust indicator if available
    trust_indicator = ""
    if self.trust_score is not None:
        if self.trust_score >= HIGH_TRUST_THRESHOLD:
            trust_indicator = " [High trust]"
        elif self.trust_score <= LOW_TRUST_THRESHOLD:
            trust_indicator = " [Low trust]"
        else:
            trust_indicator = f" [Trust: {self.trust_score:.0%}]"

    # v19: Add quarantine warning if applicable
    if self.is_quarantined:
        return f"⚠️ [QUARANTINED] {self.description}{trust_indicator}"

    template = _GUIDANCE_TEMPLATES.get(self.pattern_type)
    if template is None:
        return f"{self.description}{trust_indicator}"

    return template.format(
        desc=self.description,
        freq=self.frequency,
        rate=f"{self.success_rate:.0%}",
        trust=trust_indicator,
    )

PatternApplicator

PatternApplicator(patterns)

Applies patterns to modify prompts for better execution.

Takes matched patterns and generates prompt modifications that incorporate learned insights.

Initialize the pattern applicator.

Parameters:

Name Type Description Default
patterns list[DetectedPattern]

List of patterns to apply.

required
Source code in src/marianne/learning/patterns.py
def __init__(self, patterns: list[DetectedPattern]) -> None:
    """Initialize the pattern applicator.

    Args:
        patterns: List of patterns to apply.
    """
    self.patterns = patterns
Functions
generate_prompt_section
generate_prompt_section()

Generate a prompt section from patterns.

Returns:

Type Description
str

Formatted markdown section for prompt injection.

Source code in src/marianne/learning/patterns.py
def generate_prompt_section(self) -> str:
    """Generate a prompt section from patterns.

    Returns:
        Formatted markdown section for prompt injection.
    """
    if not self.patterns:
        return ""

    lines = ["## Learned Patterns", ""]
    lines.append(
        "Based on previous executions, here are relevant insights:"
    )
    lines.append("")

    for i, pattern in enumerate(self.patterns[:MAX_PROMPT_PATTERNS], 1):
        guidance = pattern.to_prompt_guidance()
        lines.append(f"{i}. {guidance}")

    lines.append("")
    lines.append("Consider these patterns when executing this sheet.")
    lines.append("")

    return "\n".join(lines)
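
A quick sketch of the markdown this method emits, using plain guidance strings in place of `DetectedPattern` objects (the cap of 5 mirrors `MAX_PROMPT_PATTERNS`, whose actual value is an assumption here):

```python
# Sketch of the section format produced by generate_prompt_section,
# with guidance strings standing in for DetectedPattern objects.
MAX_PROMPT_PATTERNS = 5  # assumed cap, mirroring the constant in patterns.py

def generate_prompt_section(guidances: list[str]) -> str:
    if not guidances:
        return ""
    lines = ["## Learned Patterns", "",
             "Based on previous executions, here are relevant insights:", ""]
    for i, g in enumerate(guidances[:MAX_PROMPT_PATTERNS], 1):
        lines.append(f"{i}. {g}")
    lines.extend(["", "Consider these patterns when executing this sheet.", ""])
    return "\n".join(lines)

print(generate_prompt_section(["Retries after 'file not created' usually succeed"]))
```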
get_pattern_descriptions
get_pattern_descriptions()

Get pattern descriptions as a list of strings.

Returns:

Type Description
list[str]

List of pattern guidance strings.

Source code in src/marianne/learning/patterns.py
def get_pattern_descriptions(self) -> list[str]:
    """Get pattern descriptions as a list of strings.

    Returns:
        List of pattern guidance strings.
    """
    return [p.to_prompt_guidance() for p in self.patterns[:MAX_PROMPT_PATTERNS]]

PatternDetector

PatternDetector(outcomes)

Detects patterns from historical sheet outcomes.

Analyzes a collection of SheetOutcome objects to identify recurring patterns that can inform future executions.

Initialize the pattern detector.

Parameters:

Name Type Description Default
outcomes list[SheetOutcome]

List of historical sheet outcomes to analyze.

required
Source code in src/marianne/learning/patterns.py
def __init__(self, outcomes: list["SheetOutcome"]) -> None:
    """Initialize the pattern detector.

    Args:
        outcomes: List of historical sheet outcomes to analyze.
    """
    self.outcomes = outcomes
Functions
detect_all
detect_all()

Detect all pattern types from outcomes.

Returns:

Type Description
list[DetectedPattern]

List of detected patterns sorted by confidence.

Source code in src/marianne/learning/patterns.py
def detect_all(self) -> list[DetectedPattern]:
    """Detect all pattern types from outcomes.

    Returns:
        List of detected patterns sorted by confidence.
    """
    patterns: list[DetectedPattern] = []

    # Detect various pattern types
    patterns.extend(self._detect_validation_patterns())
    patterns.extend(self._detect_retry_patterns())
    patterns.extend(self._detect_completion_patterns())
    patterns.extend(self._detect_success_patterns())
    patterns.extend(self._detect_confidence_patterns())
    patterns.extend(self._detect_semantic_patterns())
    patterns.extend(self._detect_error_code_patterns())

    # Calculate effectiveness for each pattern from outcomes
    self._calculate_effectiveness(patterns)

    # Sort by confidence (highest first)
    patterns.sort(key=lambda p: p.confidence, reverse=True)

    return patterns
calculate_success_rate staticmethod
calculate_success_rate(outcomes)

Calculate overall success rate from outcomes.

Success is defined as validation_pass_rate == 1.0 (all validations passed). Partial passes (e.g., 0.5) are counted as failures.

Parameters:

Name Type Description Default
outcomes list[SheetOutcome]

List of sheet outcomes.

required

Returns:

Type Description
float

Success rate as a float (0.0-1.0).

Source code in src/marianne/learning/patterns.py
@staticmethod
def calculate_success_rate(outcomes: list["SheetOutcome"]) -> float:
    """Calculate overall success rate from outcomes.

    Success is defined as validation_pass_rate == 1.0 (all validations
    passed). Partial passes (e.g., 0.5) are counted as failures.

    Args:
        outcomes: List of sheet outcomes.

    Returns:
        Success rate as a float (0.0-1.0).
    """
    if not outcomes:
        return 0.0

    successful = sum(1 for outcome in outcomes if outcome.validation_pass_rate == 1.0)
    return successful / len(outcomes)
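
A worked example of the strict success definition, using a minimal stand-in for `SheetOutcome` (only the `validation_pass_rate` field matters here):

```python
from dataclasses import dataclass

# Minimal stand-in for SheetOutcome; only validation_pass_rate is needed.
@dataclass
class Outcome:
    validation_pass_rate: float

def calculate_success_rate(outcomes: list[Outcome]) -> float:
    if not outcomes:
        return 0.0
    successful = sum(1 for o in outcomes if o.validation_pass_rate == 1.0)
    return successful / len(outcomes)

# A partial pass (0.5) counts as failure, so only 2 of 4 outcomes succeed.
outcomes = [Outcome(1.0), Outcome(1.0), Outcome(0.5), Outcome(0.0)]
print(calculate_success_rate(outcomes))  # → 0.5
```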

PatternMatcher

PatternMatcher(patterns)

Matches detected patterns to execution context.

Given a set of detected patterns and a current execution context, finds patterns that are relevant to the current situation.

Initialize the pattern matcher.

Parameters:

Name Type Description Default
patterns list[DetectedPattern]

List of detected patterns to match against.

required
Source code in src/marianne/learning/patterns.py
def __init__(self, patterns: list[DetectedPattern]) -> None:
    """Initialize the pattern matcher.

    Args:
        patterns: List of detected patterns to match against.
    """
    self.patterns = patterns
Functions
match
match(context, limit=5)

Find patterns relevant to the given context.

Parameters:

Name Type Description Default
context dict[str, Any]

Context dict with job_id, sheet_num, validation_types, etc.

required
limit int

Maximum number of patterns to return.

5

Returns:

Type Description
list[DetectedPattern]

List of matching patterns sorted by relevance.

Source code in src/marianne/learning/patterns.py
def match(
    self,
    context: dict[str, Any],
    limit: int = 5,
) -> list[DetectedPattern]:
    """Find patterns relevant to the given context.

    Args:
        context: Context dict with job_id, sheet_num, validation_types, etc.
        limit: Maximum number of patterns to return.

    Returns:
        List of matching patterns sorted by relevance.
    """
    scored_patterns: list[tuple[float, DetectedPattern]] = []

    for pattern in self.patterns:
        score = self._score_relevance(pattern, context)
        if score > 0:
            scored_patterns.append((score, pattern))

    # Sort by score (highest first) and return top N
    scored_patterns.sort(key=lambda x: x[0], reverse=True)
    return [p for _, p in scored_patterns[:limit]]

PatternType

Bases: Enum

Types of patterns that can be detected from outcomes.

Attributes
VALIDATION_FAILURE class-attribute instance-attribute
VALIDATION_FAILURE = 'validation_failure'

Recurring validation failure pattern (e.g., file not created).

RETRY_SUCCESS class-attribute instance-attribute
RETRY_SUCCESS = 'retry_success'

Pattern where retry succeeds after specific failure.

COMPLETION_MODE class-attribute instance-attribute
COMPLETION_MODE = 'completion_mode'

Pattern where completion mode is effective.

SUCCESS_WITHOUT_RETRY class-attribute instance-attribute
SUCCESS_WITHOUT_RETRY = 'first_attempt_success'

Pattern of success without retry (positive pattern).

HIGH_CONFIDENCE class-attribute instance-attribute
HIGH_CONFIDENCE = 'high_confidence'

Pattern with high validation confidence.

LOW_CONFIDENCE class-attribute instance-attribute
LOW_CONFIDENCE = 'low_confidence'

Pattern with low validation confidence (needs attention).

SEMANTIC_FAILURE class-attribute instance-attribute
SEMANTIC_FAILURE = 'semantic_failure'

Pattern detected from semantic failure_reason/failure_category analysis.

These patterns are extracted from the failure_reason and failure_category fields in ValidationResult, providing deeper insight into WHY failures occur. Examples:
- 'stale' category appearing frequently (files not modified)
- 'file not created' reason appearing across multiple sheets

OUTPUT_PATTERN class-attribute instance-attribute
OUTPUT_PATTERN = 'output_pattern'

Pattern extracted from stdout/stderr output during execution.

These patterns are detected by analyzing the raw output text for common error signatures, stack traces, and failure indicators. Useful for learning from execution-level failures that may not be captured by validation.

SEMANTIC_INSIGHT class-attribute instance-attribute
SEMANTIC_INSIGHT = 'semantic_insight'

Pattern generated by LLM-based semantic analysis of sheet completions.

These patterns are produced by the conductor's SemanticAnalyzer, which examines sheet output, validation results, and error history to generate deeper insights about why executions succeed or fail. Unlike statistically detected patterns, semantic insights capture nuanced reasoning about prompt effectiveness, agent behavior, and anti-patterns.

RESOURCE_ANOMALY class-attribute instance-attribute
RESOURCE_ANOMALY = 'resource_anomaly'

Resource anomaly detected during execution (memory spike, zombie, OOM).

These patterns are produced by the profiler's AnomalyDetector, which runs heuristic checks on each system snapshot. No LLM calls — pure threshold-based detection of memory spikes, runaway processes, zombies, FD exhaustion, and memory pressure.

RESOURCE_CORRELATION class-attribute instance-attribute
RESOURCE_CORRELATION = 'resource_correlation'

Learned correlation between resource usage and outcomes.

These patterns are produced by the profiler's CorrelationAnalyzer, which periodically cross-references resource profiles of completed jobs with their outcomes (success/failure, validation results) to identify statistical patterns like high-RSS-predicts-failure.

PatternWeighter

PatternWeighter(*, decay_rate_per_month=DEFAULT_DECAY_RATE_PER_MONTH, effectiveness_threshold=DEFAULT_EFFECTIVENESS_THRESHOLD, epistemic_threshold=DEFAULT_EPISTEMIC_THRESHOLD, min_applications_for_effectiveness=DEFAULT_MIN_APPLICATIONS, frequency_normalization_base=DEFAULT_FREQUENCY_BASE)

Calculates priority scores for patterns.

Implements the combined recency + effectiveness weighting algorithm as specified in the Movement III design document.

The priority formula is:

    priority = effectiveness_score × recency_factor × frequency_factor × (1 - variance)

Where
  • effectiveness_score = (successes + 0.5) / (successes + failures + 1)
  • recency_factor = (1 - decay_rate) ^ months_since_last_confirmed
  • frequency_factor = max(floor, min(1.0, log(occurrence_count + 1) / log(100)))
  • variance = std_dev(pattern_application_outcomes)
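
The formula can be checked with a small worked example using the documented defaults (10% monthly decay, frequency base 100). The frequency floor is omitted here to keep the arithmetic transparent, so this is a sketch of the formula rather than the full `PatternWeighter` behavior:

```python
import math

# Worked example of the priority formula with default parameters.
# The frequency floor is intentionally omitted for clarity.
def priority(successes, failures, occurrences, months_old, variance=0.0,
             decay_rate=0.10, freq_base=100):
    effectiveness = (successes + 0.5) / (successes + failures + 1)
    recency = (1 - decay_rate) ** months_old
    frequency = min(1.0, math.log(occurrences + 1) / math.log(freq_base))
    return effectiveness * recency * frequency * (1 - variance)

# 7 successes, 2 failures, 50 occurrences, confirmed 2 months ago:
# 0.75 × 0.81 × ~0.854 × 1.0 ≈ 0.519
print(round(priority(7, 2, 50, 2), 3))  # → 0.519
```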

Initialize the pattern weighter.

Parameters:

Name Type Description Default
decay_rate_per_month float

Fraction of priority lost per month.

DEFAULT_DECAY_RATE_PER_MONTH
effectiveness_threshold float

Below this, patterns are deprecated.

DEFAULT_EFFECTIVENESS_THRESHOLD
epistemic_threshold float

Variance threshold for learnable patterns.

DEFAULT_EPISTEMIC_THRESHOLD
min_applications_for_effectiveness int

Min applications before using actual rate.

DEFAULT_MIN_APPLICATIONS
frequency_normalization_base int

Log base for frequency factor.

DEFAULT_FREQUENCY_BASE
Source code in src/marianne/learning/weighter.py
def __init__(
    self,
    *,
    decay_rate_per_month: float = DEFAULT_DECAY_RATE_PER_MONTH,
    effectiveness_threshold: float = DEFAULT_EFFECTIVENESS_THRESHOLD,
    epistemic_threshold: float = DEFAULT_EPISTEMIC_THRESHOLD,
    min_applications_for_effectiveness: int = DEFAULT_MIN_APPLICATIONS,
    frequency_normalization_base: int = DEFAULT_FREQUENCY_BASE,
) -> None:
    """Initialize the pattern weighter.

    Args:
        decay_rate_per_month: Fraction of priority lost per month.
        effectiveness_threshold: Below this, patterns are deprecated.
        epistemic_threshold: Variance threshold for learnable patterns.
        min_applications_for_effectiveness: Min applications before using actual rate.
        frequency_normalization_base: Log base for frequency factor.
    """
    self.decay_rate_per_month = decay_rate_per_month
    self.effectiveness_threshold = effectiveness_threshold
    self.epistemic_threshold = epistemic_threshold
    self.min_applications_for_effectiveness = min_applications_for_effectiveness
    self.frequency_normalization_base = frequency_normalization_base
    self.frequency_floor = DEFAULT_FREQUENCY_FLOOR
Functions
calculate_priority
calculate_priority(occurrence_count, led_to_success_count, led_to_failure_count, last_confirmed, variance=0.0, now=None)

Calculate the priority score for a pattern.

Parameters:

Name Type Description Default
occurrence_count int

How many times this pattern was observed.

required
led_to_success_count int

Times pattern led to success when applied.

required
led_to_failure_count int

Times pattern led to failure when applied.

required
last_confirmed datetime

When this pattern was last confirmed effective.

required
variance float

Standard deviation of pattern application outcomes.

0.0
now datetime | None

Current time for recency calculation (defaults to now).

None

Returns:

Type Description
float

Priority score from 0.0 to 1.0.

Source code in src/marianne/learning/weighter.py
def calculate_priority(
    self,
    occurrence_count: int,
    led_to_success_count: int,
    led_to_failure_count: int,
    last_confirmed: datetime,
    variance: float = 0.0,
    now: datetime | None = None,
) -> float:
    """Calculate the priority score for a pattern.

    Args:
        occurrence_count: How many times this pattern was observed.
        led_to_success_count: Times pattern led to success when applied.
        led_to_failure_count: Times pattern led to failure when applied.
        last_confirmed: When this pattern was last confirmed effective.
        variance: Standard deviation of pattern application outcomes.
        now: Current time for recency calculation (defaults to now).

    Returns:
        Priority score from 0.0 to 1.0.
    """
    now = now or datetime.now()

    # Calculate effectiveness score
    effectiveness = self.calculate_effectiveness(
        led_to_success_count,
        led_to_failure_count,
    )

    # Calculate recency factor with exponential decay
    recency = self.calculate_recency_factor(last_confirmed, now)

    # Calculate frequency factor
    frequency = self.calculate_frequency_factor(occurrence_count)

    # Calculate variance penalty
    variance_factor = 1.0 - min(1.0, variance)

    # Combined priority score
    priority = effectiveness * recency * frequency * variance_factor

    return max(0.0, min(1.0, priority))
calculate_effectiveness
calculate_effectiveness(success_count, failure_count)

Calculate effectiveness score from success/failure counts.

Uses Laplace-style smoothing (add-one-half) to handle cold start: effectiveness = (successes + 0.5) / (successes + failures + 1)

This yields a neutral prior (0.5) for patterns with no application history.

Parameters:

Name Type Description Default
success_count int

Number of successful outcomes after application.

required
failure_count int

Number of failed outcomes after application.

required

Returns:

Type Description
float

Effectiveness score from 0.0 to 1.0.

Source code in src/marianne/learning/weighter.py
def calculate_effectiveness(
    self,
    success_count: int,
    failure_count: int,
) -> float:
    """Calculate effectiveness score from success/failure counts.

    Uses Laplace-style smoothing (add-one-half) to handle cold start:
    effectiveness = (successes + 0.5) / (successes + failures + 1)

    This gives a neutral prior (0.5) for patterns with
    no application history.

    Args:
        success_count: Number of successful outcomes after application.
        failure_count: Number of failed outcomes after application.

    Returns:
        Effectiveness score from 0.0 to 1.0.
    """
    total = success_count + failure_count
    if total < self.min_applications_for_effectiveness:
        # Not enough data, return neutral prior
        return 0.5

    # Laplace smoothing
    return (success_count + 0.5) / (total + 1)
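
A worked example of the smoothed score. The `min_applications` default of 3 is an assumption for this sketch (the real `DEFAULT_MIN_APPLICATIONS` is defined in `weighter.py`):

```python
# Worked example of the smoothed effectiveness score. With fewer than
# min_applications (ASSUMED to be 3 here), the method returns the
# neutral prior 0.5.
def effectiveness(successes: int, failures: int, min_applications: int = 3) -> float:
    total = successes + failures
    if total < min_applications:
        return 0.5
    return (successes + 0.5) / (total + 1)

print(effectiveness(7, 2))  # → 0.75  (7.5 / 10)
print(effectiveness(1, 0))  # → 0.5   (too few applications: neutral prior)
print(effectiveness(0, 5))  # failures pull the score well below neutral
```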
calculate_recency_factor
calculate_recency_factor(last_confirmed, now=None)

Calculate recency factor with exponential decay.

Formula: recency = (1 - decay_rate) ^ months_since_last_confirmed

With default 10% decay, a pattern loses:
- 10% after 1 month
- 19% after 2 months
- 27% after 3 months
- 35% after 4 months
- etc.

Parameters:

Name Type Description Default
last_confirmed datetime

When the pattern was last confirmed effective.

required
now datetime | None

Current time (defaults to now).

None

Returns:

Type Description
float

Recency factor from 0.0 to 1.0.

Source code in src/marianne/learning/weighter.py
def calculate_recency_factor(
    self,
    last_confirmed: datetime,
    now: datetime | None = None,
) -> float:
    """Calculate recency factor with exponential decay.

    Formula: recency = (1 - decay_rate) ^ months_since_last_confirmed

    With default 10% decay, a pattern loses:
    - 10% after 1 month
    - 19% after 2 months
    - 27% after 3 months
    - 35% after 4 months
    - etc.

    Args:
        last_confirmed: When the pattern was last confirmed effective.
        now: Current time (defaults to now).

    Returns:
        Recency factor from 0.0 to 1.0.
    """
    now = now or datetime.now()
    delta = now - last_confirmed
    months = delta.days / 30.0

    # Exponential decay
    decay_base = 1.0 - self.decay_rate_per_month
    recency: float = decay_base ** months

    return float(max(0.0, min(1.0, recency)))
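
The decay schedule above follows directly from the formula. For example, a pattern last confirmed 90 days ago is 3 months old and retains 0.9³ = 0.729 of its weight, matching the "27% after 3 months" figure:

```python
from datetime import datetime, timedelta

# Worked example of the exponential recency decay with the default
# 10% monthly decay rate.
def recency_factor(last_confirmed: datetime, now: datetime,
                   decay_rate: float = 0.10) -> float:
    months = (now - last_confirmed).days / 30.0
    return max(0.0, min(1.0, (1.0 - decay_rate) ** months))

now = datetime(2025, 6, 1)
print(round(recency_factor(now - timedelta(days=90), now), 3))  # → 0.729
```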
calculate_frequency_factor
calculate_frequency_factor(occurrence_count)

Calculate frequency factor from occurrence count.

Formula: frequency = max(floor, min(1.0, log(count + 1) / log(100)))

This gives diminishing returns for high counts:
- 1 occurrence: max(0.6, ~0.15) = 0.6 (floored)
- 10 occurrences: max(0.6, ~0.52) = 0.6 (floored)
- 50 occurrences: max(0.6, ~0.85) = 0.85
- 100+ occurrences: 1.0

The floor prevents single-occurrence patterns from being crushed below the exploitation query threshold. See issue #101 and the matching FREQUENCY_FACTOR_FLOOR in patterns_crud.py.

Parameters:

Name Type Description Default
occurrence_count int

How many times the pattern was observed.

required

Returns:

Type Description
float

Frequency factor from frequency_floor to 1.0.

Source code in src/marianne/learning/weighter.py
def calculate_frequency_factor(self, occurrence_count: int) -> float:
    """Calculate frequency factor from occurrence count.

    Formula: frequency = max(floor, min(1.0, log(count + 1) / log(100)))

    This gives diminishing returns for high counts:
    - 1 occurrence: max(0.6, ~0.15) = 0.6 (floored)
    - 10 occurrences: max(0.6, ~0.52) = 0.6 (floored)
    - 50 occurrences: max(0.6, ~0.85) = 0.85
    - 100+ occurrences: 1.0

    The floor prevents single-occurrence patterns from being crushed
    below the exploitation query threshold.  See issue #101 and the
    matching FREQUENCY_FACTOR_FLOOR in patterns_crud.py.

    Args:
        occurrence_count: How many times the pattern was observed.

    Returns:
        Frequency factor from ``frequency_floor`` to 1.0.
    """
    if occurrence_count <= 0:
        return self.frequency_floor

    base = self.frequency_normalization_base
    log_count = math.log(occurrence_count + 1)
    log_base = math.log(base)

    raw = min(1.0, log_count / log_base)
    return max(self.frequency_floor, raw)
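
The floored, log-normalized factor can be reproduced standalone with the documented defaults (base 100, floor 0.6):

```python
import math

# Worked example of the frequency factor with the documented defaults.
def frequency_factor(count: int, base: int = 100, floor: float = 0.6) -> float:
    if count <= 0:
        return floor
    return max(floor, min(1.0, math.log(count + 1) / math.log(base)))

print(frequency_factor(1))    # floored: raw ~0.15 → 0.6
print(frequency_factor(50))   # ~0.85, above the floor
print(frequency_factor(100))  # capped at 1.0
```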
is_deprecated
is_deprecated(led_to_success_count, led_to_failure_count)

Check if a pattern should be deprecated due to low effectiveness.

A pattern is deprecated if:
1. It has enough application data (>= min_applications)
2. Its effectiveness is below the threshold

Parameters:

Name Type Description Default
led_to_success_count int

Times pattern led to success.

required
led_to_failure_count int

Times pattern led to failure.

required

Returns:

Type Description
bool

True if pattern should be deprecated.

Source code in src/marianne/learning/weighter.py
def is_deprecated(
    self,
    led_to_success_count: int,
    led_to_failure_count: int,
) -> bool:
    """Check if a pattern should be deprecated due to low effectiveness.

    A pattern is deprecated if:
    1. It has enough application data (>= min_applications)
    2. Its effectiveness is below the threshold

    Args:
        led_to_success_count: Times pattern led to success.
        led_to_failure_count: Times pattern led to failure.

    Returns:
        True if pattern should be deprecated.
    """
    total = led_to_success_count + led_to_failure_count
    if total < self.min_applications_for_effectiveness:
        # Not enough data to deprecate
        return False

    effectiveness = self.calculate_effectiveness(
        led_to_success_count,
        led_to_failure_count,
    )

    return effectiveness < self.effectiveness_threshold
classify_uncertainty
classify_uncertainty(variance)

Classify the uncertainty type of a pattern.

Epistemic uncertainty (variance < threshold): The pattern behavior can be learned with more data. Keep tracking and applying.

Aleatoric uncertainty (variance >= threshold): The pattern has inherently unpredictable outcomes. Deprioritize but don't remove.

Parameters:

Name Type Description Default
variance float

The variance of pattern application outcomes.

required

Returns:

Type Description
str

'epistemic' or 'aleatoric'

Source code in src/marianne/learning/weighter.py
def classify_uncertainty(self, variance: float) -> str:
    """Classify the uncertainty type of a pattern.

    Epistemic uncertainty (variance < threshold): The pattern behavior
    can be learned with more data. Keep tracking and applying.

    Aleatoric uncertainty (variance >= threshold): The pattern has
    inherently unpredictable outcomes. Deprioritize but don't remove.

    Args:
        variance: The variance of pattern application outcomes.

    Returns:
        'epistemic' or 'aleatoric'
    """
    if variance < self.epistemic_threshold:
        return "epistemic"
    return "aleatoric"
calculate_variance
calculate_variance(outcomes)

Calculate variance from a list of boolean outcomes.

Used to track consistency of pattern applications.

Parameters:

Name Type Description Default
outcomes list[bool]

List of True (success) / False (failure) outcomes.

required

Returns:

Type Description
float

Variance from 0.0 (all same) to 0.25 (50/50 split).

Source code in src/marianne/learning/weighter.py
def calculate_variance(self, outcomes: list[bool]) -> float:
    """Calculate variance from a list of boolean outcomes.

    Used to track consistency of pattern applications.

    Args:
        outcomes: List of True (success) / False (failure) outcomes.

    Returns:
        Variance from 0.0 (all same) to 0.25 (50/50 split).
    """
    if len(outcomes) < 2:
        return 0.0

    mean = sum(1 if o else 0 for o in outcomes) / len(outcomes)
    # Need parentheses around the subtraction to get correct variance
    variance = sum(((1 if o else 0) - mean) ** 2 for o in outcomes) / len(outcomes)

    return variance
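
A quick check of the stated bounds: uniform outcomes give 0.0, and a 50/50 split gives the maximum variance of 0.25 for boolean outcomes:

```python
# Worked example of the outcome variance computed by calculate_variance.
def variance(outcomes: list[bool]) -> float:
    if len(outcomes) < 2:
        return 0.0
    values = [1 if o else 0 for o in outcomes]
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values) / len(values)

print(variance([True, True, False, False]))  # → 0.25 (50/50 split)
print(variance([True, True, True]))          # → 0.0 (all same)
```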

Functions

aggregate_job_outcomes

aggregate_job_outcomes(outcomes, workspace_path, global_store=None, model=None)

Convenience function to aggregate outcomes after job completion.

This is the main entry point for the aggregation system.

Parameters:

Name Type Description Default
outcomes list[SheetOutcome]

List of sheet outcomes from the completed job.

required
workspace_path Path

Path to the workspace for hashing.

required
global_store GlobalLearningStore | None

Optional global store (uses default if None).

None
model str | None

Optional model name used for execution.

None

Returns:

Type Description
AggregationResult

AggregationResult with statistics.

Source code in src/marianne/learning/aggregator.py
def aggregate_job_outcomes(
    outcomes: list[SheetOutcome],
    workspace_path: Path,
    global_store: GlobalLearningStore | None = None,
    model: str | None = None,
) -> AggregationResult:
    """Convenience function to aggregate outcomes after job completion.

    This is the main entry point for the aggregation system.

    Args:
        outcomes: List of sheet outcomes from the completed job.
        workspace_path: Path to the workspace for hashing.
        global_store: Optional global store (uses default if None).
        model: Optional model name used for execution.

    Returns:
        AggregationResult with statistics.
    """
    from marianne.learning.global_store import get_global_store

    store = global_store or get_global_store()
    aggregator = PatternAggregator(store)

    return aggregator.aggregate_outcomes(
        outcomes=outcomes,
        workspace_path=workspace_path,
        model=model,
    )

record_error_recovery

record_error_recovery(global_store, error, actual_wait, success, model=None)

Record an error recovery to the global store.

Convenience function for use in the runner when a recovery is attempted.

Parameters:

Name Type Description Default
global_store GlobalLearningStore | None

Global learning store (no-op if None).

required
error ClassifiedError | ClassificationResult

The error that was recovered from.

required
actual_wait float

Actual time waited in seconds.

required
success bool

Whether recovery succeeded.

required
model str | None

Optional model name.

None
Source code in src/marianne/learning/error_hooks.py
def record_error_recovery(
    global_store: "GlobalLearningStore | None",
    error: ClassifiedError | ClassificationResult,
    actual_wait: float,
    success: bool,
    model: str | None = None,
) -> None:
    """Record an error recovery to the global store.

    Convenience function for use in the runner when a recovery is attempted.

    Args:
        global_store: Global learning store (no-op if None).
        error: The error that was recovered from.
        actual_wait: Actual time waited in seconds.
        success: Whether recovery succeeded.
        model: Optional model name.
    """
    if global_store is None:
        return

    error_code = (
        error.primary.error_code.value
        if isinstance(error, ClassificationResult)
        else error.error_code.value
    )

    suggested_wait = (
        error.primary.suggested_wait_seconds
        if isinstance(error, ClassificationResult)
        else error.suggested_wait_seconds
    ) or 0

    global_store.record_error_recovery(
        error_code=error_code,
        suggested_wait=suggested_wait,
        actual_wait=actual_wait,
        recovery_success=success,
        model=model,
    )

get_global_store

get_global_store(db_path=None)

Get or create the global learning store singleton.

This function provides a convenient singleton accessor for the GlobalLearningStore. It ensures only one store instance exists per database path, avoiding the overhead of creating multiple connections to the same SQLite database.

Parameters:

Name Type Description Default
db_path Path | None

Optional custom database path. If None, uses the default path at ~/.marianne/global-learning.db.

None

Returns:

Type Description
GlobalLearningStore

The GlobalLearningStore singleton instance.

Example

store = get_global_store()  # Uses default path
store = get_global_store(Path("/custom/path.db"))  # Custom path

Source code in src/marianne/learning/store/__init__.py
def get_global_store(
    db_path: Path | None = None,
) -> GlobalLearningStore:
    """Get or create the global learning store singleton.

    This function provides a convenient singleton accessor for the GlobalLearningStore.
    It ensures only one store instance exists per database path, avoiding the overhead
    of creating multiple connections to the same SQLite database.

    Args:
        db_path: Optional custom database path. If None, uses the default
            path at ~/.marianne/global-learning.db.

    Returns:
        The GlobalLearningStore singleton instance.

    Example:
        >>> store = get_global_store()  # Uses default path
        >>> store = get_global_store(Path("/custom/path.db"))  # Custom path
    """
    global _global_store

    with _global_store_lock:
        if _global_store is None or (
            db_path is not None and _global_store.db_path != db_path
        ):
            _global_store = GlobalLearningStore(db_path)

    return _global_store

check_migration_status

check_migration_status(global_store)

Check the current migration status.

Parameters:

Name Type Description Default
global_store GlobalLearningStore

Global learning store to check.

required

Returns:

Type Description
dict[str, Any]

Dictionary with migration status information.

Source code in src/marianne/learning/migration.py
def check_migration_status(global_store: "GlobalLearningStore") -> dict[str, Any]:
    """Check the current migration status.

    Args:
        global_store: Global learning store to check.

    Returns:
        Dictionary with migration status information.
    """
    stats = global_store.get_execution_stats()

    return {
        "total_executions": stats.get("total_executions", 0),
        "total_patterns": stats.get("total_patterns", 0),
        "unique_workspaces": stats.get("unique_workspaces", 0),
        "needs_migration": stats.get("total_executions", 0) == 0,
        "avg_pattern_effectiveness": stats.get("avg_pattern_effectiveness", 0.0),
    }

migrate_existing_outcomes

migrate_existing_outcomes(global_store, scan_patterns=None, additional_paths=None)

Convenience function to migrate all existing outcomes.

Parameters:

Name Type Description Default
global_store GlobalLearningStore

Global learning store to import into.

required
scan_patterns list[str] | None

Optional custom scan patterns.

None
additional_paths list[Path] | None

Optional additional paths to scan.

None

Returns:

Type Description
MigrationResult

MigrationResult with import statistics.

Source code in src/marianne/learning/migration.py
def migrate_existing_outcomes(
    global_store: "GlobalLearningStore",
    scan_patterns: list[str] | None = None,
    additional_paths: list[Path] | None = None,
) -> MigrationResult:
    """Convenience function to migrate all existing outcomes.

    Args:
        global_store: Global learning store to import into.
        scan_patterns: Optional custom scan patterns.
        additional_paths: Optional additional paths to scan.

    Returns:
        MigrationResult with import statistics.
    """
    migrator = OutcomeMigrator(global_store)
    return migrator.migrate_all(
        scan_patterns=scan_patterns,
        additional_paths=additional_paths,
    )

calculate_priority

calculate_priority(occurrence_count, led_to_success_count, led_to_failure_count, last_confirmed, variance=0.0)

Convenience function to calculate priority without creating a weighter.

Parameters:

Name Type Description Default
occurrence_count int

How many times this pattern was observed.

required
led_to_success_count int

Times pattern led to success when applied.

required
led_to_failure_count int

Times pattern led to failure when applied.

required
last_confirmed datetime

When this pattern was last confirmed effective.

required
variance float

Standard deviation of pattern application outcomes.

0.0

Returns:

Type Description
float

Priority score from 0.0 to 1.0.

Source code in src/marianne/learning/weighter.py
def calculate_priority(
    occurrence_count: int,
    led_to_success_count: int,
    led_to_failure_count: int,
    last_confirmed: datetime,
    variance: float = 0.0,
) -> float:
    """Convenience function to calculate priority without creating a weighter.

    Args:
        occurrence_count: How many times this pattern was observed.
        led_to_success_count: Times pattern led to success when applied.
        led_to_failure_count: Times pattern led to failure when applied.
        last_confirmed: When this pattern was last confirmed effective.
        variance: Standard deviation of pattern application outcomes.

    Returns:
        Priority score from 0.0 to 1.0.
    """
    weighter = PatternWeighter()
    return weighter.calculate_priority(
        occurrence_count=occurrence_count,
        led_to_success_count=led_to_success_count,
        led_to_failure_count=led_to_failure_count,
        last_confirmed=last_confirmed,
        variance=variance,
    )