error_hooks

Error learning hooks for integration with ErrorClassifier.

This module implements error learning as designed in Movement III:

- Extend ErrorClassifier with learning hooks (CV 0.82)
- Track error patterns globally
- Learn adaptive wait times based on actual recovery success
- Integrate with existing ErrorClassifier without major refactoring

Error Learning Hook Integration Points:

1. on_error_classified: Called when an error is classified
   - Records error occurrence with context
   - Queries similar past errors for suggested_wait adjustment
2. on_error_recovered: Called when recovery after waiting succeeds
   - Records actual_wait and recovery_success to error_recoveries
   - Updates learned wait times
3. on_auth_failure: Distinguishes transient vs permanent auth failures

Classes

ErrorLearningConfig dataclass

ErrorLearningConfig(enabled=True, min_samples=3, learning_rate=0.3, max_wait_time=7200.0, min_wait_time=10.0, decay_factor=0.9)

Configuration for error learning.

Attributes:

Name Type Description
enabled bool

Master switch for error learning.

min_samples int

Minimum recovery samples before using learned delay.

learning_rate float

How much to weight new observations vs existing.

max_wait_time float

Maximum wait time to suggest (cap on learning).

min_wait_time float

Minimum wait time to suggest (floor on learning).

decay_factor float

How much to decay old samples over time.
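The source shown on this page does not include the update rule that consumes these settings, so the following is only a minimal sketch of how they could combine: an exponential moving average gated by `min_samples` and clamped to the configured floor and cap. The `blend_wait` function and the local `ErrorLearningConfig` stand-in are hypothetical, and `decay_factor` is left unused because its exact role is not documented here.

```python
from dataclasses import dataclass


@dataclass
class ErrorLearningConfig:
    """Local stand-in mirroring the documented fields and defaults."""

    enabled: bool = True
    min_samples: int = 3
    learning_rate: float = 0.3
    max_wait_time: float = 7200.0
    min_wait_time: float = 10.0
    decay_factor: float = 0.9  # not used in this sketch


def blend_wait(
    current: float,
    observed: float,
    samples: int,
    config: ErrorLearningConfig,
) -> float:
    """Hypothetical update rule: moving average, then clamp to [min, max]."""
    if not config.enabled or samples < config.min_samples:
        # Not enough evidence yet: keep the existing suggestion.
        return current
    blended = (1 - config.learning_rate) * current + config.learning_rate * observed
    return max(config.min_wait_time, min(config.max_wait_time, blended))


config = ErrorLearningConfig()
few = blend_wait(60.0, 120.0, samples=2, config=config)         # below min_samples
enough = blend_wait(60.0, 120.0, samples=5, config=config)      # blended value
capped = blend_wait(7000.0, 20000.0, samples=5, config=config)  # hits the cap
```

With the defaults, a higher `learning_rate` would move the suggestion toward recent observations faster, while `min_wait_time` and `max_wait_time` bound whatever the averaging produces.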

ErrorLearningContext dataclass

ErrorLearningContext(error, job_id, sheet_num, workspace_path, model=None, timestamp=now(), suggested_wait=None, actual_wait=None, recovery_success=None)

Context for an error learning event.

Tracks the full context of an error for learning purposes.

Attributes
error_code property
error_code

Get the error code from either ClassifiedError or ClassificationResult.

category property
category

Get the error category.

ErrorLearningHooks

ErrorLearningHooks(global_store=None, config=None)

Learning hooks for ErrorClassifier integration.

Provides hooks that can be called at various points in error handling to record error patterns and learn from recovery attempts.

The hooks follow the design pattern of non-invasive integration:

- They can be optionally called by the runner
- If global_store is None, hooks are no-ops
- All operations are logged for debugging

Usage

    hooks = ErrorLearningHooks(global_store)

    # When an error is classified
    adjusted = hooks.on_error_classified(context)
    if adjusted.suggested_wait_seconds:
        await asyncio.sleep(adjusted.suggested_wait_seconds)

    # After recovery attempt
    hooks.on_error_recovered(context, success=True)

Initialize error learning hooks.

Parameters:

Name Type Description Default
global_store GlobalLearningStore | None

Global learning store for persistence. If None, hooks are no-ops.

None
config ErrorLearningConfig | None

Error learning configuration.

None
Source code in src/marianne/learning/error_hooks.py
def __init__(
    self,
    global_store: "GlobalLearningStore | None" = None,
    config: ErrorLearningConfig | None = None,
) -> None:
    """Initialize error learning hooks.

    Args:
        global_store: Global learning store for persistence.
                     If None, hooks are no-ops.
        config: Error learning configuration.
    """
    self._store = global_store
    self._config = config or ErrorLearningConfig()
    self._pending_contexts: dict[str, ErrorLearningContext] = {}
Attributes
enabled property
enabled

Check if error learning is enabled and store is available.

Functions
on_error_classified
on_error_classified(context)

Hook called when an error is classified.

Records the error occurrence and potentially adjusts the suggested wait time based on learned patterns.

Parameters:

Name Type Description Default
context ErrorLearningContext

Full error context including job/sheet info.

required

Returns:

Type Description
ClassifiedError

The error with potentially adjusted suggested_wait_seconds.

Source code in src/marianne/learning/error_hooks.py
def on_error_classified(
    self,
    context: ErrorLearningContext,
) -> ClassifiedError:
    """Hook called when an error is classified.

    Records the error occurrence and potentially adjusts the suggested
    wait time based on learned patterns.

    Args:
        context: Full error context including job/sheet info.

    Returns:
        The error with potentially adjusted suggested_wait_seconds.
    """
    if not self.enabled:
        return self._get_classified_error(context)

    error = self._get_classified_error(context)

    # Record pattern for this error
    self._record_error_pattern(context)

    # Check if this is a rate limit error and we have learned data
    if error.category == ErrorCategory.RATE_LIMIT:
        adjusted_wait = self._get_learned_wait(context)
        if adjusted_wait is not None:
            _logger.info(
                f"Adjusted wait for {error.error_code.value}: "
                f"{error.suggested_wait_seconds}s -> {adjusted_wait}s (learned)"
            )
            # Create new error with adjusted wait
            return ClassifiedError(
                category=error.category,
                message=error.message,
                error_code=error.error_code,
                original_error=error.original_error,
                exit_code=error.exit_code,
                exit_signal=error.exit_signal,
                exit_reason=error.exit_reason,
                retriable=error.retriable,
                suggested_wait_seconds=adjusted_wait,
                error_info=error.error_info,
            )

    # Track this context for later recovery reporting
    context_key = self._get_context_key(context)
    self._pending_contexts[context_key] = context

    return error
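
The field-by-field copy in the source above is one way to produce an adjusted error. If `ClassifiedError` is a dataclass (an assumption; this page does not confirm it), `dataclasses.replace` could express the same intent more compactly. `DemoError` below is a hypothetical stand-in with a subset of fields, not the library's type.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class DemoError:
    """Hypothetical stand-in for ClassifiedError (subset of fields)."""

    category: str
    message: str
    retriable: bool = True
    suggested_wait_seconds: float | None = None


original = DemoError(
    category="rate_limit",
    message="Too many requests",
    suggested_wait_seconds=60.0,
)

# Copy every field unchanged except the wait time.
adjusted = replace(original, suggested_wait_seconds=42.0)
```

The original object is left untouched, which matches the source's approach of returning a new error rather than mutating the classified one.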
on_error_recovered
on_error_recovered(context, success)

Hook called after a recovery attempt.

Records the actual wait time and whether recovery succeeded, updating the learned wait times for this error code.

Parameters:

Name Type Description Default
context ErrorLearningContext

Error context with actual_wait filled in.

required
success bool

Whether the recovery attempt succeeded.

required
Source code in src/marianne/learning/error_hooks.py
def on_error_recovered(
    self,
    context: ErrorLearningContext,
    success: bool,
) -> None:
    """Hook called after a recovery attempt.

    Records the actual wait time and whether recovery succeeded,
    updating the learned wait times for this error code.

    Args:
        context: Error context with actual_wait filled in.
        success: Whether the recovery attempt succeeded.
    """
    if not self.enabled or self._store is None:
        return

    error = self._get_classified_error(context)

    # Record the recovery to the global store
    if context.actual_wait is not None:
        suggested_wait = context.suggested_wait or error.suggested_wait_seconds or 0
        self._store.record_error_recovery(
            error_code=error.error_code.value,
            suggested_wait=suggested_wait,
            actual_wait=context.actual_wait,
            recovery_success=success,
            model=context.model,
        )

        _logger.debug(
            f"Recorded error recovery: {error.error_code.value} "
            f"actual_wait={context.actual_wait}s success={success}"
        )

    # Clean up pending context
    context_key = self._get_context_key(context)
    self._pending_contexts.pop(context_key, None)
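
Because `on_error_recovered` only records a sample when `context.actual_wait` is set, the caller has to measure the wait itself. The sketch below shows one way a runner might do that. `StubContext`, `StubHooks`, and `wait_and_report` are hypothetical names used for illustration; the stubs only mimic the parts of the documented interface that the example touches.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class StubContext:
    """Hypothetical stand-in for ErrorLearningContext."""

    suggested_wait: float | None = None
    actual_wait: float | None = None


@dataclass
class StubHooks:
    """Hypothetical stand-in that records calls instead of using a store."""

    calls: list[tuple[float, bool]] = field(default_factory=list)

    def on_error_recovered(self, context: StubContext, success: bool) -> None:
        # Mirrors the documented guard: nothing is recorded without actual_wait.
        if context.actual_wait is not None:
            self.calls.append((context.actual_wait, success))


def wait_and_report(
    hooks: StubHooks,
    context: StubContext,
    retry: Callable[[], bool],
) -> bool:
    """Sleep for the suggested time, retry once, and report the outcome."""
    started = time.monotonic()
    time.sleep(context.suggested_wait or 0)
    # Measure the time actually spent waiting, before the retry runs.
    context.actual_wait = time.monotonic() - started
    success = retry()
    hooks.on_error_recovered(context, success=success)
    return success


hooks = StubHooks()
context = StubContext(suggested_wait=0.01)
recovered = wait_and_report(hooks, context, retry=lambda: True)
```

Using `time.monotonic()` rather than wall-clock time keeps the measured wait unaffected by system clock adjustments.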
on_auth_failure
on_auth_failure(context)

Hook to analyze auth failures.

Uses historical data to determine if this auth failure is likely transient (worth retrying) or permanent (should fail immediately).

Parameters:

Name Type Description Default
context ErrorLearningContext

Error context for the auth failure.

required

Returns:

Type Description
tuple[bool, str]

Tuple of (is_transient, reason). If is_transient is True, the error might recover after a delay.

Source code in src/marianne/learning/error_hooks.py
def on_auth_failure(
    self,
    context: ErrorLearningContext,
) -> tuple[bool, str]:
    """Hook to analyze auth failures.

    Uses historical data to determine if this auth failure is likely
    transient (worth retrying) or permanent (should fail immediately).

    Args:
        context: Error context for the auth failure.

    Returns:
        Tuple of (is_transient, reason).
        If is_transient is True, the error might recover after a delay.
    """
    if not self.enabled or self._store is None:
        return False, "No learning data available"

    error = self._get_classified_error(context)

    # Query past auth failures for this model/context
    # If we've seen successful recoveries, mark as transient
    with self._store._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT
                SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) as successes,
                COUNT(*) as total
            FROM error_recoveries
            WHERE error_code = ? AND model = ?
            """,
            (error.error_code.value, context.model),
        )
        row = cursor.fetchone()

        if row and row["total"] >= self._config.min_samples:
            success_rate = row["successes"] / row["total"]
            if success_rate > 0.3:  # >30% recovery rate suggests transient
                return True, f"Historical recovery rate: {success_rate:.0%}"

    return False, "Insufficient recovery history or low success rate"
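
The decision rule above can be tried in isolation against an in-memory SQLite database. This is a sketch under assumptions: the `error_recoveries` schema below contains only the columns the query references (the real table may have more), and the error code `E_AUTH_DEMO`, the model names, and the `looks_transient` helper are made up for illustration.

```python
from __future__ import annotations

import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allows row["total"] style access
conn.execute(
    "CREATE TABLE error_recoveries ("
    " error_code TEXT, model TEXT, recovery_success INTEGER)"
)
conn.executemany(
    "INSERT INTO error_recoveries VALUES (?, ?, ?)",
    [
        ("E_AUTH_DEMO", "model-a", 1),
        ("E_AUTH_DEMO", "model-a", 0),
        ("E_AUTH_DEMO", "model-a", 1),
        ("E_AUTH_DEMO", "model-a", 0),
        ("E_AUTH_DEMO", "model-b", 0),
        ("E_AUTH_DEMO", "model-b", 0),
        ("E_AUTH_DEMO", "model-b", 0),
    ],
)


def looks_transient(
    error_code: str,
    model: str | None,
    min_samples: int = 3,
    threshold: float = 0.3,
) -> tuple[bool, str]:
    """Same rule as the source: enough samples and a recovery rate above 30%."""
    row = conn.execute(
        """
        SELECT
            SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) AS successes,
            COUNT(*) AS total
        FROM error_recoveries
        WHERE error_code = ? AND model = ?
        """,
        (error_code, model),
    ).fetchone()
    if row["total"] >= min_samples:
        rate = row["successes"] / row["total"]
        if rate > threshold:
            return True, f"Historical recovery rate: {rate:.0%}"
    return False, "Insufficient recovery history or low success rate"


result_a = looks_transient("E_AUTH_DEMO", "model-a")  # 2 of 4 recovered
result_b = looks_transient("E_AUTH_DEMO", "model-b")  # 0 of 3 recovered
result_none = looks_transient("E_AUTH_DEMO", None)    # NULL model matches no rows
```

One consequence worth noting: in SQL, `model = ?` never matches when the bound value is NULL, so a context with `model=None` finds no history with this query and is treated as not transient.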
get_error_stats
get_error_stats(error_code)

Get statistics for a specific error code.

Parameters:

Name Type Description Default
error_code str

The error code to query (e.g., 'E103').

required

Returns:

Type Description
dict[str, str | int | float]

Dictionary with error statistics.

Source code in src/marianne/learning/error_hooks.py
def get_error_stats(self, error_code: str) -> dict[str, str | int | float]:
    """Get statistics for a specific error code.

    Args:
        error_code: The error code to query (e.g., 'E103').

    Returns:
        Dictionary with error statistics.
    """
    if not self.enabled or self._store is None:
        return {"error": "Learning not enabled"}

    with self._store._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT
                COUNT(*) as total_occurrences,
                SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) as recoveries,
                AVG(actual_wait) as avg_wait,
                MIN(actual_wait) as min_wait,
                MAX(actual_wait) as max_wait
            FROM error_recoveries
            WHERE error_code = ?
            """,
            (error_code,),
        )
        row = cursor.fetchone()

        if row:
            return {
                "error_code": error_code,
                "total_occurrences": row["total_occurrences"],
                "successful_recoveries": row["recoveries"] or 0,
                "recovery_rate": (
                    (row["recoveries"] / row["total_occurrences"] * 100)
                    if row["total_occurrences"] > 0
                    else 0
                ),
                "avg_wait_seconds": round(row["avg_wait"] or 0, 1),
                "min_wait_seconds": round(row["min_wait"] or 0, 1),
                "max_wait_seconds": round(row["max_wait"] or 0, 1),
            }

    return {"error_code": error_code, "total_occurrences": 0}
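
The `or 0` guards in the source matter because an aggregate query without `GROUP BY` still returns one row when nothing matches: `COUNT(*)` is 0 while `SUM`, `AVG`, `MIN`, and `MAX` are NULL. The sketch below demonstrates this with an in-memory SQLite table. The schema is an assumption limited to the columns the query uses, and `summarize` is a hypothetical helper that reproduces only part of the returned dictionary.

```python
from __future__ import annotations

import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE error_recoveries ("
    " error_code TEXT, actual_wait REAL, recovery_success INTEGER)"
)


def summarize(error_code: str) -> dict[str, str | int | float]:
    """Aggregate recoveries for one code, guarding against NULL aggregates."""
    row = conn.execute(
        """
        SELECT
            COUNT(*) AS total_occurrences,
            SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) AS recoveries,
            AVG(actual_wait) AS avg_wait
        FROM error_recoveries
        WHERE error_code = ?
        """,
        (error_code,),
    ).fetchone()
    total = row["total_occurrences"]
    recoveries = row["recoveries"] or 0  # SUM is NULL when no rows match
    return {
        "error_code": error_code,
        "total_occurrences": total,
        "successful_recoveries": recoveries,
        "recovery_rate": (recoveries / total * 100) if total > 0 else 0,
        "avg_wait_seconds": round(row["avg_wait"] or 0, 1),
    }


empty_stats = summarize("E103")  # no rows yet: zeros rather than an error

conn.executemany(
    "INSERT INTO error_recoveries VALUES (?, ?, ?)",
    [("E103", 10.0, 1), ("E103", 20.0, 0), ("E103", 30.0, 1)],
)
filled_stats = summarize("E103")
```

For the query as shown, an unknown error code therefore yields the full dictionary with zero values.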

Functions

wrap_classifier_with_learning

wrap_classifier_with_learning(classifier, global_store=None)

Wrap an ErrorClassifier with learning hooks.

This is a convenience function that creates learning hooks and returns them alongside the classifier for easy integration.

Parameters:

Name Type Description Default
classifier ErrorClassifier

The ErrorClassifier to wrap.

required
global_store GlobalLearningStore | None

Global learning store for persistence.

None

Returns:

Type Description
tuple[ErrorClassifier, ErrorLearningHooks]

Tuple of (classifier, hooks) for use in runner.

Source code in src/marianne/learning/error_hooks.py
def wrap_classifier_with_learning(
    classifier: "ErrorClassifier",
    global_store: "GlobalLearningStore | None" = None,
) -> tuple["ErrorClassifier", ErrorLearningHooks]:
    """Wrap an ErrorClassifier with learning hooks.

    This is a convenience function that creates learning hooks and
    returns them alongside the classifier for easy integration.

    Args:
        classifier: The ErrorClassifier to wrap.
        global_store: Global learning store for persistence.

    Returns:
        Tuple of (classifier, hooks) for use in runner.
    """
    hooks = ErrorLearningHooks(global_store)
    return classifier, hooks

record_error_recovery

record_error_recovery(global_store, error, actual_wait, success, model=None)

Record an error recovery to the global store.

Convenience function for use in the runner when a recovery is attempted.

Parameters:

Name Type Description Default
global_store GlobalLearningStore | None

Global learning store (no-op if None).

required
error ClassifiedError | ClassificationResult

The error that was recovered from.

required
actual_wait float

Actual time waited in seconds.

required
success bool

Whether recovery succeeded.

required
model str | None

Optional model name.

None
Source code in src/marianne/learning/error_hooks.py
def record_error_recovery(
    global_store: "GlobalLearningStore | None",
    error: ClassifiedError | ClassificationResult,
    actual_wait: float,
    success: bool,
    model: str | None = None,
) -> None:
    """Record an error recovery to the global store.

    Convenience function for use in the runner when a recovery is attempted.

    Args:
        global_store: Global learning store (no-op if None).
        error: The error that was recovered from.
        actual_wait: Actual time waited in seconds.
        success: Whether recovery succeeded.
        model: Optional model name.
    """
    if global_store is None:
        return

    error_code = (
        error.primary.error_code.value
        if isinstance(error, ClassificationResult)
        else error.error_code.value
    )

    suggested_wait = (
        error.primary.suggested_wait_seconds
        if isinstance(error, ClassificationResult)
        else error.suggested_wait_seconds
    ) or 0

    global_store.record_error_recovery(
        error_code=error_code,
        suggested_wait=suggested_wait,
        actual_wait=actual_wait,
        recovery_success=success,
        model=model,
    )