Skip to content

Index

errors

Error classification and handling.

Re-exports all public symbols for backward compatibility.

Classes

ErrorClassifier

ErrorClassifier(rate_limit_patterns=None, auth_patterns=None, network_patterns=None)

Classifies errors based on patterns and exit codes.

Pattern matching follows the approach from run-sheet-review.sh which checks output for rate limit indicators.

Initialize classifier with detection patterns.

Parameters:

Name Type Description Default
rate_limit_patterns list[str] | None

Regex patterns indicating rate limiting

None
auth_patterns list[str] | None

Regex patterns indicating auth failures

None
network_patterns list[str] | None

Regex patterns indicating network issues

None
Source code in src/marianne/core/errors/classifier.py
def __init__(
    self,
    rate_limit_patterns: list[str] | None = None,
    auth_patterns: list[str] | None = None,
    network_patterns: list[str] | None = None,
):
    """Compile the regex pattern lists used to classify errors.

    Args:
        rate_limit_patterns: Regex strings that indicate rate limiting.
            A falsy value (``None`` or an empty list) selects the defaults.
        auth_patterns: Regex strings that indicate auth failures.
            A falsy value selects the defaults.
        network_patterns: Regex strings that indicate network issues.
            A falsy value selects the defaults.
    """
    # Caller-overridable categories: fall back to the module defaults
    # whenever the argument is falsy.
    self.rate_limit_patterns = _compile_patterns(
        rate_limit_patterns if rate_limit_patterns else _DEFAULT_RATE_LIMIT_PATTERNS
    )
    self.auth_patterns = _compile_patterns(
        auth_patterns if auth_patterns else _DEFAULT_AUTH_PATTERNS
    )
    self.network_patterns = _compile_patterns(
        network_patterns if network_patterns else _DEFAULT_NETWORK_PATTERNS
    )

    # Categories that always use the module defaults.
    self.dns_patterns = _compile_patterns(_DEFAULT_DNS_PATTERNS)
    self.ssl_patterns = _compile_patterns(_DEFAULT_SSL_PATTERNS)
    self.capacity_patterns = _compile_patterns(_DEFAULT_CAPACITY_PATTERNS)
    self.quota_exhaustion_patterns = _compile_patterns(_DEFAULT_QUOTA_EXHAUSTION_PATTERNS)
    self.reset_time_patterns = _compile_patterns(_DEFAULT_RESET_TIME_PATTERNS)
    self.mcp_patterns = _compile_patterns(_DEFAULT_MCP_PATTERNS)
    self.cli_mode_patterns = _compile_patterns(_DEFAULT_CLI_MODE_PATTERNS)
    self.enoent_patterns = _compile_patterns(_DEFAULT_ENOENT_PATTERNS)
    self.stale_patterns = _compile_patterns(_DEFAULT_STALE_PATTERNS)

    # One alternation regex per non-empty category, keyed by the id() of
    # the category's pattern list, so a category check is a single
    # .search() call. reset_time_patterns is deliberately left out: its
    # patterns are searched one at a time for their capture groups.
    combinable = (
        self.rate_limit_patterns,
        self.auth_patterns,
        self.network_patterns,
        self.dns_patterns,
        self.ssl_patterns,
        self.capacity_patterns,
        self.quota_exhaustion_patterns,
        self.mcp_patterns,
        self.cli_mode_patterns,
        self.enoent_patterns,
        self.stale_patterns,
    )
    self._combined_cache: dict[int, re.Pattern[str]] = {
        id(compiled): re.compile(
            "|".join(f"(?:{rx.pattern})" for rx in compiled),
            re.IGNORECASE,
        )
        for compiled in combinable
        if compiled
    }
Functions
parse_reset_time
parse_reset_time(text)

Parse reset time from message and return seconds until reset.

Supports patterns like:

- "resets at 9pm" -> seconds until 9pm (or next day if past)
- "resets at 21:00" -> seconds until 21:00
- "resets in 3 hours" -> 3 * 3600 seconds
- "resets in 30 minutes" -> 30 * 60 seconds

Parameters:

Name Type Description Default
text str

Error message that may contain reset time info.

required

Returns:

Type Description
float | None

Seconds until reset, or None if no reset time found.

float | None

Returns minimum of RESET_TIME_MINIMUM_WAIT_SECONDS to avoid immediate retries.

Source code in src/marianne/core/errors/classifier.py
def parse_reset_time(self, text: str) -> float | None:
    """Parse a reset time from a message and return seconds until reset.

    Each compiled pattern in ``self.reset_time_patterns`` is tried in order
    and interpreted from its two capture groups:

    - ``(N, "hour"|"hr"|"minute"|"min")``: relative wait,
      e.g. "resets in 3 hours" -> 3 * 3600 seconds
    - ``(H, MM)`` with a numeric second group: 24-hour wall-clock time,
      e.g. "resets at 21:00" -> seconds until 21:00
    - ``(H, "am"|"pm")``: 12-hour wall-clock time,
      e.g. "resets at 9pm" -> seconds until 9pm

    Wall-clock times are resolved against the naive local ``datetime.now()``;
    a time that is not in the future is taken to mean the next day.

    A match that does not describe a real time of day (e.g. "25:00",
    "21:75", "13pm") is skipped rather than raising ``ValueError``, so
    malformed error text can never crash classification.

    Args:
        text: Error message that may contain reset time info.

    Returns:
        Seconds until reset after passing through ``self._clamp_wait``
        (documented as enforcing RESET_TIME_MINIMUM_WAIT_SECONDS to avoid
        immediate retries), or None if no usable reset time was found.
    """
    for pattern in self.reset_time_patterns:
        match = pattern.search(text)
        if not match:
            continue

        groups = match.groups()
        # Every supported pattern captures exactly (value, qualifier).
        if len(groups) != 2 or not groups[1]:
            continue
        qualifier = groups[1].lower()

        # Pattern: "resets in X hours/minutes"
        if qualifier in ("hour", "hr", "minute", "min"):
            amount = int(groups[0])
            seconds: float = amount * 3600 if qualifier in ("hour", "hr") else amount * 60
            return self._clamp_wait(seconds)

        if qualifier.isdigit():
            # Pattern: "resets at X:XX" (24-hour time)
            hour = int(groups[0])
            minute = int(qualifier)
        elif qualifier in ("am", "pm"):
            # Pattern: "resets at Xpm/Xam"
            hour = int(groups[0])
            minute = 0
            if qualifier == "pm" and hour != 12:
                hour += 12
            elif qualifier == "am" and hour == 12:
                hour = 0
        else:
            continue

        # datetime.replace() raises ValueError for out-of-range fields;
        # treat such a match as "no reset time here" and keep looking.
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            continue

        now = datetime.now()
        reset_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if reset_time <= now:
            reset_time += timedelta(days=1)  # Already passed today -> next day
        return self._clamp_wait((reset_time - now).total_seconds())

    # No pattern yielded a usable reset time.
    return None
extract_rate_limit_wait
extract_rate_limit_wait(text)

Extract wait duration from rate limit error text.

Supports common patterns from Anthropic, Claude Code, and generic APIs:

- "retry after N seconds/minutes/hours"
- "try again in N seconds/minutes/hours"
- "wait N seconds/minutes/hours"
- "Retry-After: N" (header value)
- "resets in N hours/minutes" (delegates to parse_reset_time)

Parameters:

Name Type Description Default
text str

Error message or combined stdout/stderr.

required

Returns:

Type Description
float | None

Seconds to wait, clamped to [MIN, MAX], or None if no pattern matches.

Source code in src/marianne/core/errors/classifier.py
def extract_rate_limit_wait(self, text: str) -> float | None:
    """Extract wait duration from rate limit error text.

    Supports common patterns from Anthropic, Claude Code, and generic APIs:
    - "retry after N seconds/minutes/hours"
    - "try again in N seconds/minutes/hours"
    - "wait N seconds/minutes/hours"
    - "Retry-After: N" (header value, always seconds)
    - "resets in/at ..." (delegates to parse_reset_time, tried first)

    Matching is case-insensitive. Phrases are tried in the order listed
    above and, within a phrase, units are tried seconds -> minutes -> hours;
    the first match anywhere in the text wins.

    Args:
        text: Error message or combined stdout/stderr.

    Returns:
        Seconds to wait after passing through ``self._clamp_wait``, or
        None if the text is empty or no pattern matches.
    """
    if not text:
        return None

    # Defer to existing parse_reset_time for "resets in/at" patterns
    reset_wait = self.parse_reset_time(text)
    if reset_wait is not None:
        return reset_wait

    import re as _re

    # (unit regex, seconds per unit). Abbreviations are accepted, so
    # "30s", "5 min" and "2 hours" all match.
    unit_suffixes = (
        (r"s(?:econds?)?", 1.0),
        (r"m(?:in(?:utes?)?)?", 60.0),
        (r"h(?:ours?)?", 3600.0),
    )
    phrase_prefixes = (r"retry\s+after", r"try\s+again\s+in", r"wait")

    # Every phrase supports every unit, including "wait N hours".
    for prefix in phrase_prefixes:
        for suffix, multiplier in unit_suffixes:
            match = _re.search(
                rf"{prefix}\s+(\d+)\s*{suffix}", text, _re.IGNORECASE
            )
            if match:
                return self._clamp_wait(int(match.group(1)) * multiplier)

    # "Retry-After: N" header value is always expressed in seconds.
    header = _re.search(r"retry-?after\s*:\s*(\d+)", text, _re.IGNORECASE)
    if header:
        return self._clamp_wait(int(header.group(1)) * 1.0)

    return None
classify
classify(stdout='', stderr='', exit_code=None, exit_signal=None, exit_reason=None, exception=None, output_format=None)

Classify an error based on output, exit code, and signal.

Delegates to sub-classifiers in priority order:

1. Signal-based exits (_classify_signal)
2. Timeout exit reason
3. Pattern-matching on output (_classify_by_pattern)
4. Exit code analysis (_classify_by_exit_code)
5. Unknown fallback

Parameters:

Name Type Description Default
stdout str

Standard output from the command

''
stderr str

Standard error from the command

''
exit_code int | None

Process exit code (0 = success), None if killed by signal

None
exit_signal int | None

Signal number if killed by signal

None
exit_reason ExitReason | None

Why execution ended (completed, timeout, killed, error)

None
exception Exception | None

Optional exception that was raised

None
output_format str | None

Backend output format ("text", "json", "stream-json"). When "text", exit code 1 is classified as E209 (validation) instead of E009 (unknown).

None

Returns:

Type Description
ClassifiedError

ClassifiedError with category, error_code, and metadata

Source code in src/marianne/core/errors/classifier.py
def classify(
    self,
    stdout: str = "",
    stderr: str = "",
    exit_code: int | None = None,
    exit_signal: int | None = None,
    exit_reason: ExitReason | None = None,
    exception: Exception | None = None,
    output_format: str | None = None,
) -> ClassifiedError:
    """Map a failed execution to a single ClassifiedError.

    Checks run in a fixed order; the first one that yields a result wins:

    1. Signal-based exits (_classify_signal); a negative exit code is
       first converted to its signal number
    2. Timeout exit reason (stale execution vs. plain timeout)
    3. Pattern-matching on output (_classify_by_pattern)
    4. Exit code analysis (_classify_by_exit_code)
    5. Missing exit code: retriable transient error
    6. Unknown, non-retriable fallback

    Args:
        stdout: Standard output from the command
        stderr: Standard error from the command
        exit_code: Process exit code (0 = success), None if killed by signal
        exit_signal: Signal number if killed by signal
        exit_reason: Why execution ended (completed, timeout, killed, error)
        exception: Optional exception that was raised
        output_format: Backend output format ("text", "json", "stream-json").
            When "text", exit code 1 is classified as E209 (validation)
            instead of E009 (unknown).

    Returns:
        ClassifiedError with category, error_code, and metadata
    """
    # Text inspected by the timeout/pattern/exit-code stages: both streams
    # plus the exception message when one is supplied.
    searchable = f"{stdout}\n{stderr}"
    if exception:
        searchable += f"\n{str(exception)}"

    # Stage 0: Python's subprocess reports killed-by-signal as a negative
    # exit code (e.g., -9 = SIGKILL); recover the signal number from it.
    negative_exit = exit_code is not None and exit_code < 0
    if negative_exit:
        exit_signal = abs(exit_code)

    # Stage 1: signal-based exits
    if exit_signal is not None:
        signal_error = self._classify_signal(
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            stdout=stdout,
            stderr=stderr,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=signal_error.category.value,
            error_code=signal_error.error_code.value,
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            retriable=signal_error.retriable,
            suggested_wait=signal_error.suggested_wait_seconds,
            message=signal_error.message,
        )
        return signal_error

    # Stage 2: timeout exit reason (even without signal).
    # Stale detection (E006) is told apart from backend timeout (E001) by
    # the "Stale execution:" marker that stale detection writes — F-097.
    if exit_reason == "timeout":
        if "stale execution" in searchable.lower():
            timeout_code = ErrorCode.EXECUTION_STALE
            timeout_message = "Stale execution detected — no output activity"
            timeout_wait = 120.0
        else:
            timeout_code = ErrorCode.EXECUTION_TIMEOUT
            timeout_message = "Command timed out"
            timeout_wait = 60.0
        timeout_error = ClassifiedError(
            category=ErrorCategory.TIMEOUT,
            message=timeout_message,
            error_code=timeout_code,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=timeout_wait,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=timeout_error.category.value,
            error_code=timeout_error.error_code.value,
            exit_code=exit_code,
            exit_reason=exit_reason,
            retriable=timeout_error.retriable,
            message=timeout_error.message,
        )
        return timeout_error

    # Stage 3: pattern-matching on output text
    by_pattern = self._classify_by_pattern(
        searchable, exit_code, exit_reason, exception
    )
    if by_pattern is not None:
        return by_pattern

    # Stage 4: exit code analysis (with output for non-transient detection)
    by_exit_code = self._classify_by_exit_code(
        exit_code, exit_reason, exception, searchable, output_format
    )
    if by_exit_code is not None:
        return by_exit_code

    # Stage 5: no exit code at all — the process was killed or disappeared.
    # Always retriable: this is never a deterministic user error.
    if exit_code is None:
        lowered_stderr = stderr.lower() if stderr else ""
        # OOM/kill markers in stderr call for a longer wait so memory can free.
        is_oom = False
        for marker in ("killed", "out of memory", "oom", "cannot allocate"):
            if marker in lowered_stderr:
                is_oom = True
                break
        if is_oom:
            retry_delay = 60.0
            killed_message = "Process killed (possible OOM — retrying with longer wait)"
        else:
            retry_delay = 10.0
            killed_message = (
                "Process exited without exit code (possible signal race — retrying)"
            )

        transient_error = ClassifiedError(
            category=ErrorCategory.TRANSIENT,
            message=killed_message,
            error_code=ErrorCode.UNKNOWN,
            original_error=exception,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=retry_delay,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=transient_error.category.value,
            error_code=transient_error.error_code.value,
            exit_code=exit_code,
            retriable=transient_error.retriable,
            is_oom=is_oom,
            message=transient_error.message,
        )
        return transient_error

    # Stage 6: nothing recognised — fatal, non-retriable unknown error.
    unknown_error = ClassifiedError(
        category=ErrorCategory.FATAL,
        message=f"Unknown error (exit_code={exit_code})",
        error_code=ErrorCode.UNKNOWN,
        original_error=exception,
        exit_code=exit_code,
        exit_signal=None,
        exit_reason=exit_reason,
        retriable=False,
    )
    _logger.warning(
        _EVT_ERROR_CLASSIFIED,
        category=unknown_error.category.value,
        error_code=unknown_error.error_code.value,
        exit_code=exit_code,
        retriable=unknown_error.retriable,
        message=unknown_error.message,
    )
    return unknown_error
classify_execution
classify_execution(stdout='', stderr='', exit_code=None, exit_signal=None, exit_reason=None, exception=None, output_format=None, *, input=None)

Classify execution errors using structured JSON parsing with fallback.

This is the new multi-error classification method that:

1. Parses structured JSON errors[] from CLI output (if present)
2. Classifies each error independently (no short-circuiting)
3. Analyzes exit code and signal for additional context
4. Selects root cause using priority-based scoring
5. Returns all errors with primary/secondary designation

This method returns ClassificationResult which provides access to all detected errors while maintaining backward compatibility through the primary attribute.

Supports two calling conventions
  1. Keyword args (legacy): classify_execution(stdout=..., stderr=..., ...)
  2. Bundled (preferred): classify_execution(input=ClassificationInput(...))

When input is supplied, its fields take precedence over individual keyword arguments.

Parameters:

Name Type Description Default
stdout str

Standard output from the command (may contain JSON).

''
stderr str

Standard error from the command.

''
exit_code int | None

Process exit code (0 = success), None if killed by signal.

None
exit_signal int | None

Signal number if killed by signal.

None
exit_reason ExitReason | None

Why execution ended (completed, timeout, killed, error).

None
exception Exception | None

Optional exception that was raised.

None
output_format str | None

Expected output format (e.g. "json").

None
input ClassificationInput | None

Bundled classification input (preferred over individual kwargs).

None

Returns:

Type Description
ClassificationResult

ClassificationResult with primary error, secondary errors, and metadata.

Example
result = classifier.classify_execution(stdout, stderr, exit_code)

# Access primary (root cause) error
if result.primary.category == ErrorCategory.RATE_LIMIT:
    wait_time = result.primary.suggested_wait_seconds

# Access all errors for debugging
for error in result.all_errors:
    logger.info(f"{error.error_code.value}: {error.message}")
Source code in src/marianne/core/errors/classifier.py
def classify_execution(
    self,
    stdout: str = "",
    stderr: str = "",
    exit_code: int | None = None,
    exit_signal: int | None = None,
    exit_reason: ExitReason | None = None,
    exception: Exception | None = None,
    output_format: str | None = None,
    *,
    input: ClassificationInput | None = None,
) -> ClassificationResult:
    """Classify execution errors using structured JSON parsing with fallback.

    This is the multi-error classification method that:
    1. Parses structured JSON errors[] from CLI output (if present)
    2. Classifies each error independently (no short-circuiting)
    3. Analyzes exit code and signal for additional context
    4. Selects root cause using priority-based scoring
    5. Returns all errors with primary/secondary designation

    This method returns ClassificationResult which provides access to
    all detected errors while maintaining backward compatibility through
    the `primary` attribute.

    Supports two calling conventions:
        1. **Keyword args** (legacy): ``classify_execution(stdout=..., stderr=..., ...)``
        2. **Bundled** (preferred): ``classify_execution(input=ClassificationInput(...))``

    When *input* is supplied, its fields take precedence over
    individual keyword arguments (all seven are overwritten).

    The reported ``classification_method`` is one of:
        - ``"structured"``: the initial value, kept unless changed below
        - ``"exit_code"``: a signal or timeout error was added and no JSON
          errors were parsed
        - ``"regex_fallback"``: no error was found by Phases 1-3, so
          ``classify()`` was used

    Args:
        stdout: Standard output from the command (may contain JSON).
        stderr: Standard error from the command.
        exit_code: Process exit code (0 = success), None if killed by signal.
        exit_signal: Signal number if killed by signal.
        exit_reason: Why execution ended (completed, timeout, killed, error).
        exception: Optional exception that was raised.
        output_format: Expected output format (e.g. "json"). Only forwarded
            to ``classify()`` in the Phase 4 fallback.
        input: Bundled classification input (preferred over individual kwargs).

    Returns:
        ClassificationResult with primary error, secondary errors, and metadata.

    Example:
        ```python
        result = classifier.classify_execution(stdout, stderr, exit_code)

        # Access primary (root cause) error
        if result.primary.category == ErrorCategory.RATE_LIMIT:
            wait_time = result.primary.suggested_wait_seconds

        # Access all errors for debugging
        for error in result.all_errors:
            logger.info(f"{error.error_code.value}: {error.message}")
        ```
    """
    # Bundled input wins: every individual keyword argument is replaced.
    if input is not None:
        stdout = input.stdout
        stderr = input.stderr
        exit_code = input.exit_code
        exit_signal = input.exit_signal
        exit_reason = input.exit_reason
        exception = input.exception
        output_format = input.output_format
    all_errors: list[ClassifiedError] = []
    # Placeholder only; rebound to the parsed JSON errors in Phase 1.
    raw_errors: list[ParsedCliError] = []
    # Default label; Phase 2 may change it to "exit_code" and Phase 4 to
    # "regex_fallback".
    classification_method = "structured"

    # === PHASE 1: Parse Structured JSON ===
    # Pass both stdout and stderr - errors can appear in either stream
    json_errors = try_parse_json_errors(stdout, stderr)
    raw_errors = json_errors

    if json_errors:
        for parsed_error in json_errors:
            classified = classify_single_json_error(
                parsed_error,
                exit_code=exit_code,
                exit_reason=exit_reason,
            )
            all_errors.append(classified)

    # === PHASE 2: Exit Code / Signal Analysis ===
    # Exactly one of the three branches below can run: signal, timeout, or
    # missing exit code alongside JSON errors.
    # NOTE(review): unlike classify(), a negative exit_code is not converted
    # to a signal number here, so that context only surfaces when Phase 4
    # falls back to classify() — confirm this is intended.
    if exit_signal is not None:
        signal_error = self._classify_signal(
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            stdout=stdout,
            stderr=stderr,
        )
        # Only add if not duplicating an existing error code
        if not any(e.error_code == signal_error.error_code for e in all_errors):
            all_errors.append(signal_error)
            if not json_errors:
                classification_method = "exit_code"

    elif exit_reason == "timeout":
        # Differentiate stale detection (E006) from backend timeout (E001).
        # Stale detection writes "Stale execution:" to stderr — F-097.
        # Only stdout/stderr are searched here; the exception text is not.
        combined_for_stale = f"{stdout}\n{stderr}".lower()
        is_stale = "stale execution" in combined_for_stale
        timeout_code = (
            ErrorCode.EXECUTION_STALE if is_stale
            else ErrorCode.EXECUTION_TIMEOUT
        )
        timeout_message = (
            "Stale execution detected — no output activity"
            if is_stale
            else "Command timed out"
        )
        # Stale executions get a longer suggested wait than plain timeouts.
        timeout_wait = 120.0 if is_stale else 60.0
        timeout_error = ClassifiedError(
            category=ErrorCategory.TIMEOUT,
            message=timeout_message,
            error_code=timeout_code,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=timeout_wait,
        )
        # Skip if Phase 1 already produced an error with the same code.
        if not any(e.error_code == timeout_code for e in all_errors):
            all_errors.append(timeout_error)
            if not json_errors:
                classification_method = "exit_code"

    elif exit_code is None and json_errors:
        # Process killed or disappeared without exit code, AND Phase 1
        # found JSON errors from partial output. Add a process-killed
        # error so select_root_cause can weigh it against JSON errors.
        # Without this, exit_code=None context is lost when Phase 1
        # finds errors (Phase 4 regex fallback is skipped).
        # When no JSON errors exist, Phase 4 calls classify() which
        # already handles exit_code=None correctly.
        stderr_lower = stderr.lower() if stderr else ""
        oom_indicators = ("killed", "out of memory", "oom", "cannot allocate")
        is_oom = any(indicator in stderr_lower for indicator in oom_indicators)
        # OOM suspicion -> longer wait than a bare missing exit code.
        wait_seconds = 60.0 if is_oom else 10.0
        message = (
            "Process killed (possible OOM — retrying with longer wait)"
            if is_oom
            else "Process exited without exit code "
            "(possible signal race — retrying)"
        )
        process_killed_error = ClassifiedError(
            category=ErrorCategory.TRANSIENT,
            message=message,
            error_code=ErrorCode.UNKNOWN,
            original_error=exception,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=wait_seconds,
        )
        # Appended unconditionally (no duplicate-code check in this branch).
        all_errors.append(process_killed_error)

    # === PHASE 3: Exception Analysis ===
    # Any supplied exception yields a retriable error, chosen by keywords in
    # its lowercased message: "timeout" -> TIMEOUT, "connection"/"network"
    # -> NETWORK, anything else -> TRANSIENT/UNKNOWN.
    # NOTE(review): when this is the only error found, classification_method
    # stays "structured" even though no JSON was parsed — confirm intended.
    if exception is not None:
        exc_str = str(exception).lower()
        # Try to classify based on exception message
        if "timeout" in exc_str:
            exc_error = ClassifiedError(
                category=ErrorCategory.TIMEOUT,
                message=str(exception),
                error_code=ErrorCode.EXECUTION_TIMEOUT,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=60.0,
            )
        elif "connection" in exc_str or "network" in exc_str:
            exc_error = ClassifiedError(
                category=ErrorCategory.NETWORK,
                message=str(exception),
                error_code=ErrorCode.NETWORK_CONNECTION_FAILED,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=30.0,
            )
        else:
            exc_error = ClassifiedError(
                category=ErrorCategory.TRANSIENT,
                message=str(exception),
                error_code=ErrorCode.UNKNOWN,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=30.0,
            )
        # Only add if we don't have the same error code already
        if not any(e.error_code == exc_error.error_code for e in all_errors):
            all_errors.append(exc_error)

    # === PHASE 4: Regex Fallback (only if no structured errors) ===
    # Reached only when Phases 1-3 added nothing, which implies that no
    # exception was supplied (Phase 3 otherwise leaves at least one error).
    if not all_errors:
        classification_method = "regex_fallback"
        fallback_error = self.classify(
            stdout=stdout,
            stderr=stderr,
            exit_code=exit_code,
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            output_format=output_format,
        )
        all_errors.append(fallback_error)

    # === PHASE 4.5: Rate Limit Override (always runs) ===
    # Rate limits in stdout/stderr must never be missed, even when Phase 1
    # found structured JSON errors that masked them. F-098: Claude CLI rate
    # limit messages appear in stdout but Phase 1 may produce generic errors
    # that prevent Phase 4 from firing.
    has_rate_limit_error = any(
        e.category == ErrorCategory.RATE_LIMIT for e in all_errors
    )
    if not has_rate_limit_error:
        combined_for_rate_limit = f"{stdout}\n{stderr}"
        if self._matches_any(combined_for_rate_limit, self.rate_limit_patterns):
            # Check for quota exhaustion (more specific) first
            if self._matches_any(
                combined_for_rate_limit, self.quota_exhaustion_patterns
            ):
                # Prefer a reset time parsed from the output; fall back to
                # the default quota wait when none is found.
                wait_seconds = (
                    self.parse_reset_time(combined_for_rate_limit)
                    or DEFAULT_QUOTA_WAIT_SECONDS
                )
                rate_limit_error = ClassifiedError(
                    category=ErrorCategory.RATE_LIMIT,
                    message="Token quota exhausted — detected in output",
                    error_code=ErrorCode.QUOTA_EXHAUSTED,
                    original_error=exception,
                    exit_code=exit_code,
                    exit_signal=None,
                    exit_reason=exit_reason,
                    retriable=True,
                    suggested_wait_seconds=wait_seconds,
                )
            else:
                # Capacity wording selects CAPACITY_EXCEEDED; otherwise this
                # is a generic API rate limit. Both use the default wait.
                error_code = (
                    ErrorCode.CAPACITY_EXCEEDED
                    if self._matches_any(
                        combined_for_rate_limit, self.capacity_patterns
                    )
                    else ErrorCode.RATE_LIMIT_API
                )
                rate_limit_error = ClassifiedError(
                    category=ErrorCategory.RATE_LIMIT,
                    message="Rate limit detected in output",
                    error_code=error_code,
                    original_error=exception,
                    exit_code=exit_code,
                    exit_signal=None,
                    exit_reason=exit_reason,
                    retriable=True,
                    suggested_wait_seconds=DEFAULT_RATE_LIMIT_WAIT_SECONDS,
                )
            all_errors.append(rate_limit_error)
            _logger.warning(
                "rate_limit_override",
                component="errors",
                error_code=rate_limit_error.error_code.value,
                message="Rate limit detected via Phase 4.5 override",
            )

    # === PHASE 5: Root Cause Selection ===
    # all_errors is never empty here: Phase 4 guarantees at least one entry.
    root_cause, symptoms, confidence = select_root_cause(all_errors)

    # Log the classification result
    _logger.info(
        "execution_classified",
        method=classification_method,
        primary_code=root_cause.error_code.value,
        error_count=len(all_errors),
        confidence=confidence,
        all_codes=[e.error_code.value for e in all_errors],
    )

    return ClassificationResult(
        primary=root_cause,
        secondary=symptoms,
        raw_errors=raw_errors,
        confidence=confidence,
        classification_method=classification_method,
    )
from_config classmethod
from_config(rate_limit_patterns)

Create classifier from config patterns.

Source code in src/marianne/core/errors/classifier.py
@classmethod
def from_config(cls, rate_limit_patterns: list[str]) -> ErrorClassifier:
    """Build a classifier whose rate-limit patterns come from configuration.

    Args:
        rate_limit_patterns: Regex strings forwarded as the
            ``rate_limit_patterns`` constructor argument.

    Returns:
        A new instance created via ``cls(...)``; all other constructor
        arguments keep their defaults.
    """
    overrides = {"rate_limit_patterns": rate_limit_patterns}
    return cls(**overrides)

ErrorCategory

Bases: str, Enum

Categories of errors with different retry behaviors.

Attributes
RATE_LIMIT class-attribute instance-attribute
RATE_LIMIT = 'rate_limit'

Retriable with long wait - API/service is rate limiting.

TRANSIENT class-attribute instance-attribute
TRANSIENT = 'transient'

Retriable with backoff - temporary network/service issues.

VALIDATION class-attribute instance-attribute
VALIDATION = 'validation'

Retriable - Claude ran but didn't produce expected output.

AUTH class-attribute instance-attribute
AUTH = 'auth'

Fatal - authentication/authorization failure, needs user intervention.

NETWORK class-attribute instance-attribute
NETWORK = 'network'

Retriable with backoff - network connectivity issues.

TIMEOUT class-attribute instance-attribute
TIMEOUT = 'timeout'

Retriable - operation timed out.

SIGNAL class-attribute instance-attribute
SIGNAL = 'signal'

Process killed by signal - may be retriable depending on signal.

FATAL class-attribute instance-attribute
FATAL = 'fatal'

Non-retriable - stop job immediately.

CONFIGURATION class-attribute instance-attribute
CONFIGURATION = 'configuration'

Non-retriable - configuration error needs user intervention (e.g., MCP setup).

PREFLIGHT class-attribute instance-attribute
PREFLIGHT = 'preflight'

Pre-execution check failure — config or environment not ready.

ESCALATION class-attribute instance-attribute
ESCALATION = 'escalation'

Escalation-based abort — grounding or escalation policy triggered.

ErrorCode

Bases: str, Enum

Structured error codes for comprehensive error classification.

Error codes are organized by category using numeric prefixes: - E0xx: Execution errors (timeouts, crashes, kills) - E1xx: Rate limit / capacity errors - E2xx: Validation errors - E3xx: Configuration errors - E4xx: State errors - E5xx: Backend errors - E6xx: Preflight errors - E9xx: Network errors (E999 is reserved for unclassified errors)

Error codes are stable identifiers that can be used for: - Programmatic error handling and routing - Log aggregation and alerting - Documentation and troubleshooting guides - Metrics and observability dashboards

Attributes
EXECUTION_TIMEOUT class-attribute instance-attribute
EXECUTION_TIMEOUT = 'E001'

Command execution exceeded timeout limit.

EXECUTION_KILLED class-attribute instance-attribute
EXECUTION_KILLED = 'E002'

Process was killed by a signal (external termination).

EXECUTION_CRASHED class-attribute instance-attribute
EXECUTION_CRASHED = 'E003'

Process crashed (segfault, bus error, abort, etc.).

EXECUTION_INTERRUPTED class-attribute instance-attribute
EXECUTION_INTERRUPTED = 'E004'

Process was interrupted by user (SIGINT/Ctrl+C).

EXECUTION_OOM class-attribute instance-attribute
EXECUTION_OOM = 'E005'

Process was killed due to out of memory condition.

EXECUTION_STALE class-attribute instance-attribute
EXECUTION_STALE = 'E006'

Execution killed by stale detection — no output for too long.

EXECUTION_UNKNOWN class-attribute instance-attribute
EXECUTION_UNKNOWN = 'E009'

Unknown execution error with non-zero exit code.

RATE_LIMIT_API class-attribute instance-attribute
RATE_LIMIT_API = 'E101'

API rate limit exceeded (429, quota, throttling).

RATE_LIMIT_CLI class-attribute instance-attribute
RATE_LIMIT_CLI = 'E102'

CLI-level rate limiting detected.

CAPACITY_EXCEEDED class-attribute instance-attribute
CAPACITY_EXCEEDED = 'E103'

Service capacity exceeded (overloaded, try again later).

QUOTA_EXHAUSTED class-attribute instance-attribute
QUOTA_EXHAUSTED = 'E104'

Token/usage quota exhausted - wait until reset time.

VALIDATION_FILE_MISSING class-attribute instance-attribute
VALIDATION_FILE_MISSING = 'E201'

Expected output file does not exist.

VALIDATION_CONTENT_MISMATCH class-attribute instance-attribute
VALIDATION_CONTENT_MISMATCH = 'E202'

Output content does not match expected pattern.

VALIDATION_COMMAND_FAILED class-attribute instance-attribute
VALIDATION_COMMAND_FAILED = 'E203'

Validation command returned non-zero exit code.

VALIDATION_TIMEOUT class-attribute instance-attribute
VALIDATION_TIMEOUT = 'E204'

Validation check timed out.

VALIDATION_GENERIC class-attribute instance-attribute
VALIDATION_GENERIC = 'E209'

Generic validation failure (output validation needed).

CONFIG_INVALID class-attribute instance-attribute
CONFIG_INVALID = 'E301'

Configuration file is malformed or invalid.

CONFIG_MISSING_FIELD class-attribute instance-attribute
CONFIG_MISSING_FIELD = 'E302'

Required configuration field is missing.

CONFIG_PATH_NOT_FOUND class-attribute instance-attribute
CONFIG_PATH_NOT_FOUND = 'E303'

Configuration file path does not exist.

CONFIG_PARSE_ERROR class-attribute instance-attribute
CONFIG_PARSE_ERROR = 'E304'

Failed to parse configuration file (YAML/JSON syntax error).

CONFIG_MCP_ERROR class-attribute instance-attribute
CONFIG_MCP_ERROR = 'E305'

MCP server/plugin configuration error (missing env vars, invalid config).

CONFIG_CLI_MODE_ERROR class-attribute instance-attribute
CONFIG_CLI_MODE_ERROR = 'E306'

Claude CLI mode mismatch (e.g., streaming mode incompatible with operation).

STATE_CORRUPTION class-attribute instance-attribute
STATE_CORRUPTION = 'E401'

Checkpoint state file is corrupted or inconsistent.

STATE_LOAD_FAILED class-attribute instance-attribute
STATE_LOAD_FAILED = 'E402'

Failed to load checkpoint state from storage.

STATE_SAVE_FAILED class-attribute instance-attribute
STATE_SAVE_FAILED = 'E403'

Failed to save checkpoint state to storage.

STATE_VERSION_MISMATCH class-attribute instance-attribute
STATE_VERSION_MISMATCH = 'E404'

Checkpoint state version is incompatible.

BACKEND_CONNECTION class-attribute instance-attribute
BACKEND_CONNECTION = 'E501'

Failed to connect to backend service.

BACKEND_AUTH class-attribute instance-attribute
BACKEND_AUTH = 'E502'

Backend authentication or authorization failed.

BACKEND_RESPONSE class-attribute instance-attribute
BACKEND_RESPONSE = 'E503'

Invalid or unexpected response from backend.

BACKEND_TIMEOUT class-attribute instance-attribute
BACKEND_TIMEOUT = 'E504'

Backend request timed out.

BACKEND_NOT_FOUND class-attribute instance-attribute
BACKEND_NOT_FOUND = 'E505'

Backend executable or service not found.

PREFLIGHT_PATH_MISSING class-attribute instance-attribute
PREFLIGHT_PATH_MISSING = 'E601'

Required path does not exist (working_dir, referenced file).

PREFLIGHT_PROMPT_TOO_LARGE class-attribute instance-attribute
PREFLIGHT_PROMPT_TOO_LARGE = 'E602'

Prompt exceeds recommended token limit.

PREFLIGHT_WORKING_DIR_INVALID class-attribute instance-attribute
PREFLIGHT_WORKING_DIR_INVALID = 'E603'

Working directory is not accessible or not a directory.

PREFLIGHT_VALIDATION_SETUP class-attribute instance-attribute
PREFLIGHT_VALIDATION_SETUP = 'E604'

Validation target path or pattern is invalid.

NETWORK_CONNECTION_FAILED class-attribute instance-attribute
NETWORK_CONNECTION_FAILED = 'E901'

Network connection failed (refused, reset, unreachable).

NETWORK_DNS_ERROR class-attribute instance-attribute
NETWORK_DNS_ERROR = 'E902'

DNS resolution failed.

NETWORK_SSL_ERROR class-attribute instance-attribute
NETWORK_SSL_ERROR = 'E903'

SSL/TLS handshake or certificate error.

NETWORK_TIMEOUT class-attribute instance-attribute
NETWORK_TIMEOUT = 'E904'

Network operation timed out.

UNKNOWN class-attribute instance-attribute
UNKNOWN = 'E999'

Unclassified error - requires investigation.

category property
category

Get the category prefix (first digit) of this error code.

Returns:

Type Description
str

Category string like "execution", "rate_limit", "validation", etc.

is_retriable property
is_retriable

Check if this error code is generally retriable.

Returns:

Type Description
bool

True if errors with this code are typically retriable.

Functions
get_retry_behavior
get_retry_behavior()

Get precise retry behavior for this error code.

Returns error-code-specific delay and retry recommendations. Uses module-level _RETRY_BEHAVIORS constant to avoid rebuilding the lookup table on every call.

Returns:

Type Description
RetryBehavior

RetryBehavior with delay, retriability, and reason.

Source code in src/marianne/core/errors/codes.py
def get_retry_behavior(self) -> RetryBehavior:
    """Look up the retry recommendation for this error code.

    The module-level ``_RETRY_BEHAVIORS`` table is consulted so the
    lookup table is not rebuilt on every call. Codes without an entry
    receive a generic recommendation instead.

    Returns:
        RetryBehavior with delay, retriability, and reason.
    """
    # Generic recommendation for codes that have no dedicated table entry:
    # a 30 second delay, with retriability taken from ``is_retriable``.
    fallback = RetryBehavior(
        delay_seconds=30.0,
        is_retriable=self.is_retriable,
        reason=f"Default behavior for {self.value}",
    )
    return _RETRY_BEHAVIORS.get(self, fallback)
get_severity
get_severity()

Get the severity level for this error code.

Severity assignments: - CRITICAL: Fatal errors requiring immediate attention - ERROR: Most error codes (default) - WARNING: Degraded but potentially temporary conditions - INFO: Reserved for future diagnostic codes

Returns:

Type Description
Severity

Severity level for this error code.

Source code in src/marianne/core/errors/codes.py
def get_severity(self) -> Severity:
    """Return the severity level assigned to this error code.

    Severity tiers:
    - CRITICAL: Fatal errors requiring immediate attention
    - ERROR: Most error codes (default)
    - WARNING: Degraded but potentially temporary conditions
    - INFO: Reserved for future diagnostic codes

    Returns:
        Severity level for this error code.
    """
    # Tiers are checked in order; the first tier containing this code wins.
    severity_tiers = (
        # Critical errors - job cannot continue
        (
            Severity.CRITICAL,
            {
                ErrorCode.EXECUTION_CRASHED,
                ErrorCode.EXECUTION_OOM,
                ErrorCode.STATE_CORRUPTION,
                ErrorCode.BACKEND_AUTH,
                ErrorCode.BACKEND_NOT_FOUND,
            },
        ),
        # Warning level - degraded but potentially temporary
        (
            Severity.WARNING,
            {
                ErrorCode.CAPACITY_EXCEEDED,
                ErrorCode.VALIDATION_TIMEOUT,
                ErrorCode.EXECUTION_STALE,
            },
        ),
    )
    for level, member_codes in severity_tiers:
        if self in member_codes:
            return level

    # Every code not listed above is treated as a plain ERROR.
    return Severity.ERROR

RetryBehavior

Bases: NamedTuple

Precise retry behavior recommendation for a specific error code.

Unlike ErrorCategory which provides broad retry guidelines, RetryBehavior encodes error-code-specific knowledge about optimal retry strategies.

Attributes:

Name Type Description
delay_seconds float

Recommended delay before retrying (0 = no delay).

is_retriable bool

Whether this error is generally retriable.

reason str

Human-readable explanation for the retry behavior.

RetryDelays

Constants for retry delay durations.

Centralizes magic numbers for retry timing to ensure consistency across the codebase and make timing decisions discoverable.

These values represent standard delays for different error scenarios. Actual delays may be adjusted dynamically based on error context, parsed reset times, or learning from previous attempts.

Severity

Bases: IntEnum

Severity levels for error classification.

Lower numeric value = higher severity. This allows comparisons like if severity <= Severity.ERROR to check for serious issues.

Assignments: - CRITICAL: Job cannot continue, requires immediate attention (E003 crash, E005 OOM, E401 corruption, E502 auth, E505 binary not found) - ERROR: Operation failed, may be retriable (most error codes) - WARNING: Degraded operation, job may continue (E006 stale execution, E103 capacity, E204 validation timeout) - INFO: Informational, no action required (reserved for future diagnostic codes)

FatalError

Bases: Exception

Non-recoverable error that should stop the job.

GracefulShutdownError

Bases: Exception

Raised when Ctrl+C is pressed to trigger graceful shutdown.

This exception is caught by the runner to save state before exiting.

RateLimitExhaustedError

RateLimitExhaustedError(message, resume_after=None, backend_type='unknown', quota_exhaustion=False)

Bases: FatalError

Rate limit or quota exhaustion — job should PAUSE, not FAIL.

Subclasses FatalError for backward compatibility: existing except FatalError blocks still catch it, but more specific except RateLimitExhaustedError blocks intercept first when ordered before except FatalError.

Attributes:

Name Type Description
resume_after

When the rate limit resets (ISO datetime), or None.

backend_type

Which backend hit the limit (e.g., "claude-cli").

quota_exhaustion

True if daily/monthly quota is exhausted, False if it's a per-minute rate limit.

Source code in src/marianne/core/errors/exceptions.py
def __init__(
    self,
    message: str,
    resume_after: datetime | Any | None = None,
    backend_type: str = "unknown",
    quota_exhaustion: bool = False,
) -> None:
    """Create the error and record the rate-limit context on the instance.

    Args:
        message: Error text forwarded to the base exception initializer.
        resume_after: When the rate limit resets, or None.
        backend_type: Which backend hit the limit (e.g., "claude-cli").
        quota_exhaustion: True if a daily/monthly quota is exhausted,
            False if it is a per-minute rate limit.
    """
    super().__init__(message)
    # Stored verbatim so handlers catching this error can read the context.
    self.quota_exhaustion = quota_exhaustion
    self.backend_type = backend_type
    self.resume_after = resume_after

ClassificationInput dataclass

ClassificationInput(stdout='', stderr='', exit_code=None, exit_signal=None, exit_reason=None, exception=None, output_format=None)

Bundled inputs for ErrorClassifier.classify_execution().

Groups the execution result fields that the classifier needs, reducing the method's parameter count from 8 to 2 (self + input). Callers can still pass individual keyword arguments for backward compatibility.

ClassificationResult dataclass

ClassificationResult(primary, secondary=list(), raw_errors=list(), confidence=1.0, classification_method='structured')

Complete classification result with root cause and context.

This is the new result type from the classifier, providing access to all detected errors while maintaining backward compatibility through the primary attribute.

Example:

# New code - returns ClassificationResult
classification = classifier.classify(stdout, stderr, exit_code)
result = classification.primary  # Backward compatible

# Access all errors
for error in classification.all_errors:
    log.info(f"Error: {error.error_code.value} - {error.message}")

Attributes:

Name Type Description
primary ClassifiedError

The identified root cause error.

secondary list[ClassifiedError]

Secondary/symptom errors for debugging.

raw_errors list[ParsedCliError]

Original parsed errors from CLI JSON.

confidence float

0.0-1.0 confidence in root cause identification.

classification_method str

How classification was done.

Attributes
primary instance-attribute
primary

The identified root cause error.

secondary class-attribute instance-attribute
secondary = field(default_factory=list)

Secondary/symptom errors for debugging.

raw_errors class-attribute instance-attribute
raw_errors = field(default_factory=list)

Original parsed errors from CLI JSON.

confidence class-attribute instance-attribute
confidence = 1.0

0.0-1.0 confidence in root cause identification.

classification_method class-attribute instance-attribute
classification_method = 'structured'

How classification was done: "structured", "exit_code", "regex_fallback".

all_errors property
all_errors

All errors including primary and secondary.

error_codes property
error_codes

All error codes for logging/metrics.

category property
category

Category of the primary error (backward compatibility).

message property
message

Message of the primary error (backward compatibility).

error_code property
error_code

Error code of the primary error (backward compatibility).

retriable property
retriable

Whether the primary error is retriable (backward compatibility).

should_retry property
should_retry

Whether to retry based on primary error (backward compatibility).

Functions
to_error_chain
to_error_chain()

Convert to ErrorChain for detailed analysis.

Source code in src/marianne/core/errors/models.py
def to_error_chain(self) -> ErrorChain:
    """Package this result as an ErrorChain for detailed analysis.

    Returns:
        ErrorChain built from this result's full error list, primary
        error (as root cause), secondary errors (as symptoms), and
        confidence. The values are passed through without copying.
    """
    chain_fields = {
        "errors": self.all_errors,
        "root_cause": self.primary,
        "symptoms": self.secondary,
        "confidence": self.confidence,
    }
    return ErrorChain(**chain_fields)

ClassifiedError dataclass

ClassifiedError(category, message, error_code=UNKNOWN, original_error=None, exit_code=None, exit_signal=None, exit_reason=None, retriable=True, suggested_wait_seconds=None, error_info=None)

An error with its classification and metadata.

ClassifiedError combines high-level category (for retry logic) with specific error codes (for diagnostics and logging). The error_code provides stable identifiers for programmatic handling while category determines retry behavior.

Attributes
error_info class-attribute instance-attribute
error_info = None

Optional structured metadata for this error.

is_signal_kill property
is_signal_kill

True if process was killed by a signal.

signal_name property
signal_name

Human-readable signal name if killed by signal.

code property
code

Get the error code string value (e.g., 'E001').

severity property
severity

Get the severity level for this error.

ErrorChain dataclass

ErrorChain(errors, root_cause, symptoms=list(), confidence=1.0)

Represents a chain of errors from symptom to root cause.

When multiple errors occur, this class helps identify the actual root cause vs symptoms. For example, if ENOENT and rate limit both appear, ENOENT is likely the root cause (missing binary prevents any operation).

Attributes:

Name Type Description
errors list[ClassifiedError]

All errors in order of detection (first = earliest).

root_cause ClassifiedError

The error identified as the most fundamental cause.

symptoms list[ClassifiedError]

Errors that are likely consequences of the root cause.

confidence float

0.0-1.0 confidence in root cause identification.

Attributes
errors instance-attribute
errors

All errors in order of detection (first = earliest).

root_cause instance-attribute
root_cause

The error identified as the most fundamental cause.

symptoms class-attribute instance-attribute
symptoms = field(default_factory=list)

Errors that are likely consequences of the root cause.

confidence class-attribute instance-attribute
confidence = 1.0

0.0-1.0 confidence in root cause identification.

ErrorInfo dataclass

ErrorInfo(reason, domain, metadata=dict())

Machine-readable error identification (Google AIP-193 inspired).

Provides structured metadata for programmatic error handling.

Example:

error_info = ErrorInfo(
    reason="BINARY_NOT_FOUND",
    domain="marianne.backend.claude_cli",
    metadata={
        "expected_binary": "claude",
        "search_path": "/usr/bin:/usr/local/bin",
        "suggestion": "Ensure claude CLI is installed and in PATH",
    }
)

Attributes:

Name Type Description
reason str

UPPER_SNAKE_CASE identifier for the specific error reason.

domain str

Service/component identifier (e.g., "marianne.backend.claude_cli").

metadata dict[str, str]

Dynamic contextual information as key-value pairs.

Attributes
reason instance-attribute
reason

UPPER_SNAKE_CASE identifier for the specific error reason. Example: "RATE_LIMIT_EXCEEDED", "BINARY_NOT_FOUND"

domain instance-attribute
domain

Service/component identifier. Example: "marianne.backend.claude_cli", "marianne.execution"

metadata class-attribute instance-attribute
metadata = field(default_factory=dict)

Dynamic contextual information. Example: {"binary_path": "/usr/bin/claude", "exit_code": "127"}

ParsedCliError dataclass

ParsedCliError(error_type, message, tool_name=None, metadata=dict())

A single error extracted from CLI JSON output.

Claude CLI returns structured JSON with an errors[] array:

{
  "result": "...",
  "errors": [
    {"type": "system", "message": "Rate limit exceeded"},
    {"type": "user", "message": "spawn claude ENOENT"}
  ],
  "cost_usd": 0.05
}

This dataclass represents one item from that array.

Attributes:

Name Type Description
error_type Literal['system', 'user', 'tool']

Error type from CLI: "system", "user", "tool".

message str

Human-readable error message.

tool_name str | None

For tool errors, the name of the failed tool.

metadata dict[str, Any]

Additional structured metadata from the error.

Attributes
error_type instance-attribute
error_type

Error type from CLI: "system", "user", "tool".

message instance-attribute
message

Human-readable error message.

tool_name class-attribute instance-attribute
tool_name = None

For tool errors, the name of the failed tool.

metadata class-attribute instance-attribute
metadata = field(default_factory=dict)

Additional structured metadata.

Functions

classify_single_json_error

classify_single_json_error(parsed_error, exit_code=None, exit_reason=None)

Classify a single error from the JSON errors[] array.

This function uses type-based classification first, then falls back to message pattern matching. The error type from CLI ("system", "user", "tool") guides initial classification.

Parameters:

Name Type Description Default
parsed_error ParsedCliError

A ParsedCliError extracted from CLI JSON output.

required
exit_code int | None

Optional exit code for context.

None
exit_reason ExitReason | None

Optional exit reason for context.

None

Returns:

Type Description
ClassifiedError

ClassifiedError with appropriate category and error code.

Source code in src/marianne/core/errors/parsers.py
def classify_single_json_error(
    parsed_error: ParsedCliError,
    exit_code: int | None = None,
    exit_reason: ExitReason | None = None,
) -> ClassifiedError:
    """Map one entry of the CLI JSON errors[] array to a ClassifiedError.

    The CLI-reported error type ("system", "user", "tool") selects a set of
    type-specific substring checks against the lower-cased message. When none
    of those apply, generic message checks (network, DNS, SSL/TLS, timeout)
    run in that order, and anything still unmatched is reported as a
    retriable transient error with the UNKNOWN code.

    Args:
        parsed_error: A ParsedCliError extracted from CLI JSON output.
        exit_code: Optional exit code, copied onto the result for context.
        exit_reason: Optional exit reason, copied onto the result for context.

    Returns:
        ClassifiedError with appropriate category and error code. Its
        message is always the original (non-lowered) message.
    """
    lowered = parsed_error.message.lower()
    kind = parsed_error.error_type.lower()

    def mentions(*needles: str) -> bool:
        """Return True when any needle is a substring of the lowered message."""
        return any(needle in lowered for needle in needles)

    def build(category, code, *, retriable: bool, wait=None, info=None):
        """Assemble the result with the fields shared by every outcome."""
        return ClassifiedError(
            category=category,
            message=parsed_error.message,
            error_code=code,
            exit_code=exit_code,
            exit_reason=exit_reason,
            retriable=retriable,
            suggested_wait_seconds=wait,
            error_info=info,
        )

    # === Type-based classification ===

    if kind == "system":
        # System errors are usually API/service level.
        hit_rate_limit = mentions(
            "rate limit", "rate_limit", "quota", "too many requests",
            "429", "hit your limit", "limit exceeded", "daily limit",
        )
        if hit_rate_limit:
            # Capacity wording is only considered once a rate-limit
            # indicator matched; it selects a different code and a
            # shorter suggested wait.
            over_capacity = mentions(
                "capacity", "overloaded", "try again later", "unavailable"
            )
            code, wait = (
                (ErrorCode.CAPACITY_EXCEEDED, 300.0)
                if over_capacity
                else (ErrorCode.RATE_LIMIT_API, 3600.0)
            )
            return build(ErrorCategory.RATE_LIMIT, code, retriable=True, wait=wait)

        if mentions("unauthorized", "authentication", "invalid api key", "401", "403"):
            return build(ErrorCategory.AUTH, ErrorCode.BACKEND_AUTH, retriable=False)

    elif kind == "user":
        # User errors are usually environment/config issues.
        # ENOENT is critical - often the root cause.
        # Common patterns: "ENOENT", "spawn claude ENOENT", "command not found"
        if mentions("enoent", "command not found"):
            missing_binary = ErrorInfo(
                reason="BINARY_NOT_FOUND",
                domain="marianne.backend.claude_cli",
                metadata={"original_message": parsed_error.message},
            )
            return build(
                ErrorCategory.CONFIGURATION,
                ErrorCode.BACKEND_NOT_FOUND,
                retriable=True,  # Might recover after reinstall
                wait=30.0,
                info=missing_binary,
            )

        if mentions("permission denied", "access denied"):
            return build(ErrorCategory.AUTH, ErrorCode.BACKEND_AUTH, retriable=False)

        # Checked after the ENOENT case, so "command not found" never lands here.
        if mentions("no such file", "not found"):
            return build(
                ErrorCategory.CONFIGURATION,
                ErrorCode.CONFIG_PATH_NOT_FOUND,
                retriable=False,
            )

    elif kind == "tool":
        # Tool errors need message analysis.
        if mentions("mcp", "server"):
            return build(
                ErrorCategory.CONFIGURATION,
                ErrorCode.CONFIG_MCP_ERROR,
                retriable=False,
            )

        # Tool execution failures are often validation issues. A "tool"
        # error always returns here and never reaches the fallback below.
        return build(
            ErrorCategory.VALIDATION,
            ErrorCode.VALIDATION_COMMAND_FAILED,
            retriable=True,
            wait=10.0,
        )

    # === Message pattern fallback ===
    # Ordered rules: (substrings, category, error code, suggested wait).
    # The first rule with a matching substring wins; all are retriable.
    fallback_rules = (
        # Network errors
        (
            (
                "connection refused", "connection reset", "econnrefused",
                "etimedout", "network unreachable",
            ),
            ErrorCategory.NETWORK,
            ErrorCode.NETWORK_CONNECTION_FAILED,
            30.0,
        ),
        # DNS errors
        (
            ("dns", "getaddrinfo", "enotfound", "resolve"),
            ErrorCategory.NETWORK,
            ErrorCode.NETWORK_DNS_ERROR,
            30.0,
        ),
        # SSL/TLS errors
        (
            ("ssl", "tls", "certificate", "handshake"),
            ErrorCategory.NETWORK,
            ErrorCode.NETWORK_SSL_ERROR,
            30.0,
        ),
        # Timeout patterns
        (
            ("timeout", "timed out"),
            ErrorCategory.TIMEOUT,
            ErrorCode.EXECUTION_TIMEOUT,
            60.0,
        ),
    )
    for needles, category, code, wait in fallback_rules:
        if mentions(*needles):
            return build(category, code, retriable=True, wait=wait)

    # Default: unknown error with the original message
    return build(
        ErrorCategory.TRANSIENT,
        ErrorCode.UNKNOWN,
        retriable=True,
        wait=30.0,
    )

select_root_cause

select_root_cause(errors)

Select the most likely root cause from multiple errors.

Uses priority-based scoring where lower score = more fundamental cause. Applies context modifiers for specific error combinations that commonly mask root causes.

Known masking patterns: - ENOENT masks everything (missing binary causes cascading failures) - Auth errors mask rate limits (can't hit rate limit if auth fails) - Network errors mask service errors (can't reach service to get errors) - Config errors mask execution errors (bad config causes execution failure) - Timeout masks completion (timed out = never got to complete)

Parameters:

Name Type Description Default
errors list[ClassifiedError]

List of classified errors to analyze.

required

Returns:

Type Description
tuple[ClassifiedError, list[ClassifiedError], float]

Tuple of (root_cause, symptoms, confidence).

  • root_cause: The most fundamental error that likely caused others
  • symptoms: Other errors that are likely consequences
  • confidence: 0.0-1.0 confidence in root cause identification (higher when there's a clear priority gap)
Source code in src/marianne/core/errors/parsers.py
def select_root_cause(
    errors: list[ClassifiedError],
) -> tuple[ClassifiedError, list[ClassifiedError], float]:
    """Pick the error most likely to have caused all the others.

    Every error gets a score taken from ``ROOT_CAUSE_PRIORITY`` (99 when its
    code has no entry); the lowest score wins. Before comparing, the score is
    adjusted for combinations where one error is known to hide another:

    - ENOENT (missing backend binary) alongside any other error
    - Missing config path (always boosted)
    - Auth failure alongside an API or CLI rate limit
    - Network failure alongside a backend timeout or API rate limit
    - MCP config error alongside a failed validation command
    - CLI mode error alongside any error whose code is in the
      "execution" category
    - Execution timeout alongside an API rate limit (demoted: the timeout
      is treated as the symptom)

    Args:
        errors: Classified errors to analyze.

    Returns:
        A ``(root_cause, symptoms, confidence)`` tuple.
        - root_cause: the lowest-scoring error; on ties, the earliest in
          ``errors``. A synthetic FATAL/UNKNOWN error when ``errors`` is empty.
        - symptoms: every other error, in input order.
        - confidence: 0.0 for empty input, 1.0 for a single error, 0.4 when
          the two best scores tie, otherwise 0.5 plus 0.05 per point of
          score gap (plus 0.15 for ENOENT / auth / missing-config roots),
          capped at 1.0.
    """
    if not errors:
        # Nothing to rank: hand back a placeholder with zero confidence.
        placeholder = ClassifiedError(
            category=ErrorCategory.FATAL,
            message="No errors provided",
            error_code=ErrorCode.UNKNOWN,
            retriable=False,
        )
        return (placeholder, [], 0.0)

    if len(errors) == 1:
        return (errors[0], [], 1.0)

    # Membership facts that several adjustment rules share.
    codes_seen = {err.error_code for err in errors}
    api_rate_limited = ErrorCode.RATE_LIMIT_API in codes_seen
    any_rate_limited = api_rate_limited or ErrorCode.RATE_LIMIT_CLI in codes_seen
    backend_timed_out = ErrorCode.BACKEND_TIMEOUT in codes_seen
    validation_failed = ErrorCode.VALIDATION_COMMAND_FAILED in codes_seen
    network_codes = (
        ErrorCode.NETWORK_CONNECTION_FAILED,
        ErrorCode.NETWORK_DNS_ERROR,
        ErrorCode.NETWORK_SSL_ERROR,
    )

    # Scores are kept in a list parallel to ``errors`` and addressed by
    # position (ClassifiedError is a mutable dataclass and not hashable).
    scores: list[int] = []
    for candidate in errors:
        code = candidate.error_code
        score = ROOT_CAUSE_PRIORITY.get(code, 99)

        # Missing binary: strongest boost, but only if something else failed too.
        if code == ErrorCode.BACKEND_NOT_FOUND:
            if any(other != ErrorCode.BACKEND_NOT_FOUND for other in codes_seen):
                score -= 10

        # Missing config path: nothing can run without it.
        if code == ErrorCode.CONFIG_PATH_NOT_FOUND:
            score -= 5

        # Failed auth explains any rate-limit noise seen alongside it.
        if code == ErrorCode.BACKEND_AUTH and any_rate_limited:
            score -= 5

        # An unreachable network explains service-level failures.
        if code in network_codes and (backend_timed_out or api_rate_limited):
            score -= 3

        # Broken MCP config explains failed validation commands.
        if code == ErrorCode.CONFIG_MCP_ERROR and validation_failed:
            score -= 3

        # Wrong CLI mode (streaming vs JSON) explains execution errors.
        if code == ErrorCode.CONFIG_CLI_MODE_ERROR:
            if any(other.error_code.category == "execution" for other in errors):
                score -= 3

        # A timeout next to an API rate limit is the symptom, not the cause.
        if code == ErrorCode.EXECUTION_TIMEOUT and api_rate_limited:
            score += 5

        scores.append(score)

    # Lowest score wins; ``index`` keeps the earliest error on ties.
    root_score = min(scores)
    root_index = scores.index(root_score)
    root_cause = errors[root_index]

    symptoms = [err for pos, err in enumerate(errors) if pos != root_index]
    runner_up_score = min(
        score for pos, score in enumerate(scores) if pos != root_index
    )

    # At least two errors reach this point, so a runner-up always exists.
    gap = runner_up_score - root_score
    if gap == 0:
        # The best two candidates are indistinguishable: significant ambiguity.
        confidence = 0.4
    else:
        # Start at 0.5 and add 5% per point of separation.
        confidence = min(0.5 + (gap * 0.05), 1.0)
        high_signal_codes = (
            ErrorCode.BACKEND_NOT_FOUND,  # ENOENT is almost always correct
            ErrorCode.BACKEND_AUTH,  # Auth failures are clear
            ErrorCode.CONFIG_PATH_NOT_FOUND,  # Missing config is clear
        )
        if root_cause.error_code in high_signal_codes:
            confidence = min(confidence + 0.15, 1.0)

    return (root_cause, symptoms, confidence)

try_parse_json_errors

try_parse_json_errors(output, stderr='')

Extract errors[] array from JSON output.

Claude CLI returns structured JSON with an errors[] array:

{
  "result": "...",
  "errors": [
    {"type": "system", "message": "Rate limit exceeded"},
    {"type": "user", "message": "spawn claude ENOENT"}
  ],
  "cost_usd": 0.05
}

This function parses that structure, handling:

  • Non-JSON preamble (CLI startup messages)
  • Multiple JSON objects (takes first valid one with errors[])
  • JSON in stderr (some error modes write there)
  • Truncated JSON (tries to recover)

Parameters:

Name Type Description Default
output str

Raw stdout from Claude CLI execution.

required
stderr str

Optional stderr output (some errors appear here).

''

Returns:

Type Description
list[ParsedCliError]

List of ParsedCliError objects, or empty list if parsing fails.

Source code in src/marianne/core/errors/parsers.py
def try_parse_json_errors(output: str, stderr: str = "") -> list[ParsedCliError]:
    """Collect the entries of the CLI's JSON ``errors[]`` array.

    The Claude CLI emits structured JSON shaped like:
    ```json
    {
      "result": "...",
      "errors": [
        {"type": "system", "message": "Rate limit exceeded"},
        {"type": "user", "message": "spawn claude ENOENT"}
      ],
      "cost_usd": 0.05
    }
    ```

    Both streams are scanned (stdout first, then stderr) because some error
    modes write the JSON to stderr. Locating and decoding the JSON inside
    each stream is delegated to ``_extract_json_errors_from_text``.

    Args:
        output: Raw stdout from Claude CLI execution.
        stderr: Optional stderr output (some errors appear here).

    Returns:
        ParsedCliError objects with distinct messages, in first-seen order;
        an empty list when nothing could be extracted.
    """
    # An insertion-ordered dict keyed by message gives "first occurrence
    # wins" de-duplication, since the same error may show up in both streams.
    first_by_message: dict[str, ParsedCliError] = {}

    for stream in (output, stderr):
        if not stream:
            continue
        for parsed in _extract_json_errors_from_text(stream) or ():
            first_by_message.setdefault(parsed.message, parsed)

    return list(first_by_message.values())

get_signal_name

get_signal_name(sig_num)

Get human-readable signal name.

Parameters:

Name Type Description Default
sig_num int

The signal number (e.g., signal.SIGTERM)

required

Returns:

Type Description
str

Human-readable signal name (e.g., "SIGTERM") or "signal N" if unknown

Source code in src/marianne/core/errors/signals.py
def get_signal_name(sig_num: int) -> str:
    """Get human-readable signal name.

    The name is resolved through the standard library's ``signal.Signals``
    enumeration rather than a hand-written table. That names every signal
    the running platform defines, and avoids referencing constants such as
    ``signal.SIGKILL`` or ``signal.SIGHUP`` directly, which are not
    available on every platform.

    Args:
        sig_num: The signal number (e.g., signal.SIGTERM)

    Returns:
        Human-readable signal name (e.g., "SIGTERM") or "signal N" if unknown
    """
    try:
        return signal.Signals(sig_num).name
    except ValueError:
        # Not a signal number defined on this platform.
        return f"signal {sig_num}"