Skip to content

classifier

classifier

ErrorClassifier implementation for pattern-based error classification.

This is the main classifier class that analyzes stdout, stderr, exit codes, and signals to produce ClassifiedError instances with appropriate retry behavior.

Attributes

Classes

ErrorClassifier

ErrorClassifier(rate_limit_patterns=None, auth_patterns=None, network_patterns=None)

Classifies errors based on patterns and exit codes.

Pattern matching follows the approach from run-sheet-review.sh which checks output for rate limit indicators.

Initialize classifier with detection patterns.

Parameters:

Name Type Description Default
rate_limit_patterns list[str] | None

Regex patterns indicating rate limiting

None
auth_patterns list[str] | None

Regex patterns indicating auth failures

None
network_patterns list[str] | None

Regex patterns indicating network issues

None
Source code in src/marianne/core/errors/classifier.py
def __init__(
    self,
    rate_limit_patterns: list[str] | None = None,
    auth_patterns: list[str] | None = None,
    network_patterns: list[str] | None = None,
):
    """Initialize classifier with detection patterns.

    Falsy arguments (None or an empty list) fall back to the module-level
    default pattern lists.

    Args:
        rate_limit_patterns: Regex patterns indicating rate limiting
        auth_patterns: Regex patterns indicating auth failures
        network_patterns: Regex patterns indicating network issues
    """
    # Caller-configurable categories.
    self.rate_limit_patterns = _compile_patterns(
        rate_limit_patterns or _DEFAULT_RATE_LIMIT_PATTERNS
    )
    self.auth_patterns = _compile_patterns(auth_patterns or _DEFAULT_AUTH_PATTERNS)
    self.network_patterns = _compile_patterns(network_patterns or _DEFAULT_NETWORK_PATTERNS)

    # Fixed categories always use the module defaults.
    for attr, defaults in (
        ("dns_patterns", _DEFAULT_DNS_PATTERNS),
        ("ssl_patterns", _DEFAULT_SSL_PATTERNS),
        ("capacity_patterns", _DEFAULT_CAPACITY_PATTERNS),
        ("quota_exhaustion_patterns", _DEFAULT_QUOTA_EXHAUSTION_PATTERNS),
        ("reset_time_patterns", _DEFAULT_RESET_TIME_PATTERNS),
        ("mcp_patterns", _DEFAULT_MCP_PATTERNS),
        ("cli_mode_patterns", _DEFAULT_CLI_MODE_PATTERNS),
        ("enoent_patterns", _DEFAULT_ENOENT_PATTERNS),
        ("stale_patterns", _DEFAULT_STALE_PATTERNS),
    ):
        setattr(self, attr, _compile_patterns(defaults))

    # Pre-computed combined regexes for _matches_any(): each category's
    # patterns are merged into a single alternation so a membership test
    # is one .search() call. The cache is keyed by id() of the compiled
    # pattern list, which stays valid because every list is held as an
    # attribute on self for the object's lifetime. reset_time_patterns is
    # deliberately absent: parse_reset_time needs per-pattern capture
    # groups, not a boolean match.
    self._combined_cache: dict[int, re.Pattern[str]] = {}
    for category in (
        "rate_limit_patterns", "auth_patterns", "network_patterns",
        "dns_patterns", "ssl_patterns", "capacity_patterns",
        "quota_exhaustion_patterns", "mcp_patterns",
        "cli_mode_patterns", "enoent_patterns", "stale_patterns",
    ):
        compiled = getattr(self, category)
        if not compiled:
            continue
        merged = "|".join(f"(?:{p.pattern})" for p in compiled)
        self._combined_cache[id(compiled)] = re.compile(merged, re.IGNORECASE)
Functions
parse_reset_time
parse_reset_time(text)

Parse reset time from message and return seconds until reset.

Supports patterns like: - "resets at 9pm" -> seconds until 9pm (or next day if past) - "resets at 21:00" -> seconds until 21:00 - "resets in 3 hours" -> 3 * 3600 seconds - "resets in 30 minutes" -> 30 * 60 seconds

Parameters:

Name Type Description Default
text str

Error message that may contain reset time info.

required

Returns:

Type Description
float | None

Seconds until reset, or None if no reset time found.

float | None

Returns at least RESET_TIME_MINIMUM_WAIT_SECONDS to avoid immediate retries.

Source code in src/marianne/core/errors/classifier.py
def parse_reset_time(self, text: str) -> float | None:
    """Parse reset time from message and return seconds until reset.

    Supports patterns like:
    - "resets at 9pm" -> seconds until 9pm (or next day if past)
    - "resets at 21:00" -> seconds until 21:00
    - "resets in 3 hours" -> 3 * 3600 seconds
    - "resets in 30 minutes" -> 30 * 60 seconds

    Args:
        text: Error message that may contain reset time info.

    Returns:
        Seconds until reset, or None if no reset time found.
        Returns minimum of RESET_TIME_MINIMUM_WAIT_SECONDS to avoid immediate retries.
    """

    for pattern in self.reset_time_patterns:
        match = pattern.search(text)
        if not match:
            continue

        groups = match.groups()

        # Pattern: "resets in X hours/minutes"
        if (
            len(groups) == 2
            and groups[1]
            and groups[1].lower() in ("hour", "hr", "minute", "min")
        ):
            amount = int(groups[0])
            unit = groups[1].lower()
            seconds: float = amount * 3600 if unit in ("hour", "hr") else amount * 60
            return self._clamp_wait(seconds)

        # Pattern: "resets at X:XX" (24-hour time)
        if len(groups) == 2 and groups[1] and groups[1].isdigit():
            hour = int(groups[0])
            minute = int(groups[1])
            now = datetime.now()
            reset_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if reset_time <= now:
                reset_time += timedelta(days=1)  # Next day
            seconds = (reset_time - now).total_seconds()
            return self._clamp_wait(seconds)

        # Pattern: "resets at Xpm/Xam"
        if len(groups) == 2 and groups[1] and groups[1].lower() in ("am", "pm"):
            hour = int(groups[0])
            meridiem = groups[1].lower()
            if meridiem == "pm" and hour != 12:
                hour += 12
            elif meridiem == "am" and hour == 12:
                hour = 0
            now = datetime.now()
            reset_time = now.replace(hour=hour, minute=0, second=0, microsecond=0)
            if reset_time <= now:
                reset_time += timedelta(days=1)  # Next day
            seconds = (reset_time - now).total_seconds()
            return self._clamp_wait(seconds)

    # No pattern matched, return default wait
    return None
extract_rate_limit_wait
extract_rate_limit_wait(text)

Extract wait duration from rate limit error text.

Supports common patterns from Anthropic, Claude Code, and generic APIs: - "retry after N seconds/minutes/hours" - "try again in N seconds/minutes/hours" - "wait N seconds/minutes/hours" - "Retry-After: N" (header value) - "resets in N hours/minutes" (delegates to parse_reset_time)

Parameters:

Name Type Description Default
text str

Error message or combined stdout/stderr.

required

Returns:

Type Description
float | None

Seconds to wait, clamped to [MIN, MAX], or None if no pattern matches.

Source code in src/marianne/core/errors/classifier.py
def extract_rate_limit_wait(self, text: str) -> float | None:
    """Extract wait duration from rate limit error text.

    Supports common patterns from Anthropic, Claude Code, and generic APIs:
    - "retry after N seconds/minutes/hours"
    - "try again in N seconds/minutes/hours"
    - "wait N seconds/minutes/hours"
    - "Retry-After: N" (header value)
    - "resets in N hours/minutes" (delegates to parse_reset_time)

    Args:
        text: Error message or combined stdout/stderr.

    Returns:
        Seconds to wait, clamped to [MIN, MAX], or None if no pattern matches.
    """
    if not text:
        return None

    # Defer to existing parse_reset_time for "resets in/at" patterns
    reset_time = self.parse_reset_time(text)
    if reset_time is not None:
        return reset_time

    import re as _re

    patterns: list[tuple[_re.Pattern[str], float]] = [
        # "retry after N seconds/second"
        (_re.compile(r"retry\s+after\s+(\d+)\s*s(?:econds?)?", _re.IGNORECASE), 1.0),
        # "retry after N minutes/minute"
        (_re.compile(r"retry\s+after\s+(\d+)\s*m(?:in(?:utes?)?)?", _re.IGNORECASE), 60.0),
        # "retry after N hours/hour"
        (_re.compile(r"retry\s+after\s+(\d+)\s*h(?:ours?)?", _re.IGNORECASE), 3600.0),
        # "try again in N seconds"
        (_re.compile(r"try\s+again\s+in\s+(\d+)\s*s(?:econds?)?", _re.IGNORECASE), 1.0),
        # "try again in N minutes"
        (_re.compile(r"try\s+again\s+in\s+(\d+)\s*m(?:in(?:utes?)?)?", _re.IGNORECASE), 60.0),
        # "try again in N hours"
        (_re.compile(r"try\s+again\s+in\s+(\d+)\s*h(?:ours?)?", _re.IGNORECASE), 3600.0),
        # "wait N seconds"
        (_re.compile(r"wait\s+(\d+)\s*s(?:econds?)?", _re.IGNORECASE), 1.0),
        # "wait N minutes"
        (_re.compile(r"wait\s+(\d+)\s*m(?:in(?:utes?)?)?", _re.IGNORECASE), 60.0),
        # "Retry-After: N" (header value, always seconds)
        (_re.compile(r"[Rr]etry-?After\s*:\s*(\d+)", _re.IGNORECASE), 1.0),
    ]

    for pattern, multiplier in patterns:
        match = pattern.search(text)
        if match:
            value = int(match.group(1))
            return self._clamp_wait(value * multiplier)

    return None
classify
classify(stdout='', stderr='', exit_code=None, exit_signal=None, exit_reason=None, exception=None, output_format=None)

Classify an error based on output, exit code, and signal.

Delegates to sub-classifiers in priority order: 0. Negative exit codes are normalized to signal kills 1. Signal-based exits (_classify_signal) 2. Timeout exit reason 3. Pattern-matching on output (_classify_by_pattern) 4. Exit code analysis (_classify_by_exit_code) 5. Missing exit code (retriable process-kill handling) 6. Unknown fallback

Parameters:

Name Type Description Default
stdout str

Standard output from the command

''
stderr str

Standard error from the command

''
exit_code int | None

Process exit code (0 = success), None if killed by signal

None
exit_signal int | None

Signal number if killed by signal

None
exit_reason ExitReason | None

Why execution ended (completed, timeout, killed, error)

None
exception Exception | None

Optional exception that was raised

None
output_format str | None

Backend output format ("text", "json", "stream-json"). When "text", exit code 1 is classified as E209 (validation) instead of E009 (unknown).

None

Returns:

Type Description
ClassifiedError

ClassifiedError with category, error_code, and metadata

Source code in src/marianne/core/errors/classifier.py
def classify(
    self,
    stdout: str = "",
    stderr: str = "",
    exit_code: int | None = None,
    exit_signal: int | None = None,
    exit_reason: ExitReason | None = None,
    exception: Exception | None = None,
    output_format: str | None = None,
) -> ClassifiedError:
    """Classify an error based on output, exit code, and signal.

    Delegates to sub-classifiers in priority order:
    0. Negative exit codes are normalized to signal kills
    1. Signal-based exits (_classify_signal)
    2. Timeout exit reason (stale detection vs. backend timeout)
    3. Pattern-matching on output (_classify_by_pattern)
    4. Exit code analysis (_classify_by_exit_code)
    5. Missing exit code (process killed without a status) — always retriable
    6. Unknown fatal fallback

    Args:
        stdout: Standard output from the command
        stderr: Standard error from the command
        exit_code: Process exit code (0 = success), None if killed by signal
        exit_signal: Signal number if killed by signal
        exit_reason: Why execution ended (completed, timeout, killed, error)
        exception: Optional exception that was raised
        output_format: Backend output format ("text", "json", "stream-json").
            When "text", exit code 1 is classified as E209 (validation)
            instead of E009 (unknown).

    Returns:
        ClassifiedError with category, error_code, and metadata
    """
    # Pattern matching scans stdout, stderr, and the exception text as one
    # newline-joined blob.
    combined = f"{stdout}\n{stderr}"
    if exception:
        combined += f"\n{str(exception)}"

    # 0. Negative exit codes indicate signal kills (e.g., -9 = SIGKILL)
    # Python's subprocess reports killed-by-signal as negative exit codes.
    if exit_code is not None and exit_code < 0:
        exit_signal = abs(exit_code)

    # 1. Signal-based exits
    if exit_signal is not None:
        result = self._classify_signal(
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            stdout=stdout,
            stderr=stderr,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=result.category.value,
            error_code=result.error_code.value,
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            retriable=result.retriable,
            suggested_wait=result.suggested_wait_seconds,
            message=result.message,
        )
        return result

    # 2. Timeout exit reason (even without signal)
    #    Differentiate stale detection (E006) from backend timeout (E001).
    #    Stale detection writes "Stale execution:" to stderr — F-097.
    if exit_reason == "timeout":
        is_stale = "stale execution" in combined.lower()
        error_code = (
            ErrorCode.EXECUTION_STALE if is_stale
            else ErrorCode.EXECUTION_TIMEOUT
        )
        message = (
            "Stale execution detected — no output activity"
            if is_stale
            else "Command timed out"
        )
        # Stale executions wait longer (120s) than plain timeouts (60s).
        wait_seconds = 120.0 if is_stale else 60.0
        result = ClassifiedError(
            category=ErrorCategory.TIMEOUT,
            message=message,
            error_code=error_code,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=wait_seconds,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=result.category.value,
            error_code=result.error_code.value,
            exit_code=exit_code,
            exit_reason=exit_reason,
            retriable=result.retriable,
            message=result.message,
        )
        return result

    # 3. Pattern-matching on output text
    pattern_result = self._classify_by_pattern(
        combined, exit_code, exit_reason, exception,
    )
    if pattern_result is not None:
        return pattern_result

    # 4. Exit code analysis (with output for non-transient detection)
    exit_code_result = self._classify_by_exit_code(
        exit_code, exit_reason, exception, combined, output_format,
    )
    if exit_code_result is not None:
        return exit_code_result

    # 5. exit_code=None: process killed or disappeared without exit code.
    # Always retriable — this is never a deterministic user error.
    # Check stderr for OOM indicators to set appropriate wait time.
    if exit_code is None:
        # OOM/kill indicators in stderr → longer wait (memory needs to free)
        oom_indicators = ("killed", "out of memory", "oom", "cannot allocate")
        stderr_lower = stderr.lower() if stderr else ""
        is_oom = any(indicator in stderr_lower for indicator in oom_indicators)
        wait_seconds = 60.0 if is_oom else 10.0
        message = (
            "Process killed (possible OOM — retrying with longer wait)"
            if is_oom
            else "Process exited without exit code (possible signal race — retrying)"
        )

        result = ClassifiedError(
            category=ErrorCategory.TRANSIENT,
            message=message,
            error_code=ErrorCode.UNKNOWN,
            original_error=exception,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=wait_seconds,
        )
        _logger.warning(
            _EVT_ERROR_CLASSIFIED,
            category=result.category.value,
            error_code=result.error_code.value,
            exit_code=exit_code,
            retriable=result.retriable,
            is_oom=is_oom,
            message=result.message,
        )
        return result

    # 6. Unknown fallback: a real (non-negative) exit code that nothing
    # above recognized — treated as fatal and not retried.
    result = ClassifiedError(
        category=ErrorCategory.FATAL,
        message=f"Unknown error (exit_code={exit_code})",
        error_code=ErrorCode.UNKNOWN,
        original_error=exception,
        exit_code=exit_code,
        exit_signal=None,
        exit_reason=exit_reason,
        retriable=False,
    )
    _logger.warning(
        _EVT_ERROR_CLASSIFIED,
        category=result.category.value,
        error_code=result.error_code.value,
        exit_code=exit_code,
        retriable=result.retriable,
        message=result.message,
    )
    return result
classify_execution
classify_execution(stdout='', stderr='', exit_code=None, exit_signal=None, exit_reason=None, exception=None, output_format=None, *, input=None)

Classify execution errors using structured JSON parsing with fallback.

This is the new multi-error classification method that: 1. Parses structured JSON errors[] from CLI output (if present) 2. Classifies each error independently (no short-circuiting) 3. Analyzes exit code and signal for additional context 4. Selects root cause using priority-based scoring 5. Returns all errors with primary/secondary designation

This method returns ClassificationResult which provides access to all detected errors while maintaining backward compatibility through the primary attribute.

Supports two calling conventions
  1. Keyword args (legacy): classify_execution(stdout=..., stderr=..., ...)
  2. Bundled (preferred): classify_execution(input=ClassificationInput(...))

When input is supplied, its fields take precedence over individual keyword arguments.

Parameters:

Name Type Description Default
stdout str

Standard output from the command (may contain JSON).

''
stderr str

Standard error from the command.

''
exit_code int | None

Process exit code (0 = success), None if killed by signal.

None
exit_signal int | None

Signal number if killed by signal.

None
exit_reason ExitReason | None

Why execution ended (completed, timeout, killed, error).

None
exception Exception | None

Optional exception that was raised.

None
output_format str | None

Expected output format (e.g. "json").

None
input ClassificationInput | None

Bundled classification input (preferred over individual kwargs).

None

Returns:

Type Description
ClassificationResult

ClassificationResult with primary error, secondary errors, and metadata.

Example
result = classifier.classify_execution(stdout, stderr, exit_code)

# Access primary (root cause) error
if result.primary.category == ErrorCategory.RATE_LIMIT:
    wait_time = result.primary.suggested_wait_seconds

# Access all errors for debugging
for error in result.all_errors:
    logger.info(f"{error.error_code.value}: {error.message}")
Source code in src/marianne/core/errors/classifier.py
def classify_execution(
    self,
    stdout: str = "",
    stderr: str = "",
    exit_code: int | None = None,
    exit_signal: int | None = None,
    exit_reason: ExitReason | None = None,
    exception: Exception | None = None,
    output_format: str | None = None,
    *,
    # NOTE(review): `input` shadows the builtin; keyword-only and kept for
    # API compatibility with existing callers.
    input: ClassificationInput | None = None,
) -> ClassificationResult:
    """Classify execution errors using structured JSON parsing with fallback.

    This is the new multi-error classification method that:
    1. Parses structured JSON errors[] from CLI output (if present)
    2. Classifies each error independently (no short-circuiting)
    3. Analyzes exit code and signal for additional context
    4. Selects root cause using priority-based scoring
    5. Returns all errors with primary/secondary designation

    This method returns ClassificationResult which provides access to
    all detected errors while maintaining backward compatibility through
    the `primary` attribute.

    Supports two calling conventions:
        1. **Keyword args** (legacy): ``classify_execution(stdout=..., stderr=..., ...)``
        2. **Bundled** (preferred): ``classify_execution(input=ClassificationInput(...))``

    When *input* is supplied, its fields take precedence over
    individual keyword arguments.

    Args:
        stdout: Standard output from the command (may contain JSON).
        stderr: Standard error from the command.
        exit_code: Process exit code (0 = success), None if killed by signal.
        exit_signal: Signal number if killed by signal.
        exit_reason: Why execution ended (completed, timeout, killed, error).
        exception: Optional exception that was raised.
        output_format: Expected output format (e.g. "json").
        input: Bundled classification input (preferred over individual kwargs).

    Returns:
        ClassificationResult with primary error, secondary errors, and metadata.

    Example:
        ```python
        result = classifier.classify_execution(stdout, stderr, exit_code)

        # Access primary (root cause) error
        if result.primary.category == ErrorCategory.RATE_LIMIT:
            wait_time = result.primary.suggested_wait_seconds

        # Access all errors for debugging
        for error in result.all_errors:
            logger.info(f"{error.error_code.value}: {error.message}")
        ```
    """
    # Bundled input, when given, overrides every individual kwarg.
    if input is not None:
        stdout = input.stdout
        stderr = input.stderr
        exit_code = input.exit_code
        exit_signal = input.exit_signal
        exit_reason = input.exit_reason
        exception = input.exception
        output_format = input.output_format
    all_errors: list[ClassifiedError] = []
    # NOTE(review): this init is overwritten unconditionally below; it only
    # serves as the annotation carrier.
    raw_errors: list[ParsedCliError] = []
    # Downgraded to "exit_code" or "regex_fallback" when Phase 1 finds nothing.
    classification_method = "structured"

    # === PHASE 1: Parse Structured JSON ===
    # Pass both stdout and stderr - errors can appear in either stream
    json_errors = try_parse_json_errors(stdout, stderr)
    raw_errors = json_errors

    if json_errors:
        for parsed_error in json_errors:
            classified = classify_single_json_error(
                parsed_error,
                exit_code=exit_code,
                exit_reason=exit_reason,
            )
            all_errors.append(classified)

    # === PHASE 2: Exit Code / Signal Analysis ===
    # The three branches are mutually exclusive: signal kill, timeout, or
    # missing exit code alongside JSON errors.
    if exit_signal is not None:
        signal_error = self._classify_signal(
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            stdout=stdout,
            stderr=stderr,
        )
        # Only add if not duplicating an existing error code
        if not any(e.error_code == signal_error.error_code for e in all_errors):
            all_errors.append(signal_error)
            if not json_errors:
                classification_method = "exit_code"

    elif exit_reason == "timeout":
        # Differentiate stale detection (E006) from backend timeout (E001).
        # Stale detection writes "Stale execution:" to stderr — F-097.
        combined_for_stale = f"{stdout}\n{stderr}".lower()
        is_stale = "stale execution" in combined_for_stale
        timeout_code = (
            ErrorCode.EXECUTION_STALE if is_stale
            else ErrorCode.EXECUTION_TIMEOUT
        )
        timeout_message = (
            "Stale execution detected — no output activity"
            if is_stale
            else "Command timed out"
        )
        # Stale executions wait longer (120s) than plain timeouts (60s).
        timeout_wait = 120.0 if is_stale else 60.0
        timeout_error = ClassifiedError(
            category=ErrorCategory.TIMEOUT,
            message=timeout_message,
            error_code=timeout_code,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=timeout_wait,
        )
        if not any(e.error_code == timeout_code for e in all_errors):
            all_errors.append(timeout_error)
            if not json_errors:
                classification_method = "exit_code"

    elif exit_code is None and json_errors:
        # Process killed or disappeared without exit code, AND Phase 1
        # found JSON errors from partial output. Add a process-killed
        # error so select_root_cause can weigh it against JSON errors.
        # Without this, exit_code=None context is lost when Phase 1
        # finds errors (Phase 4 regex fallback is skipped).
        # When no JSON errors exist, Phase 4 calls classify() which
        # already handles exit_code=None correctly.
        stderr_lower = stderr.lower() if stderr else ""
        oom_indicators = ("killed", "out of memory", "oom", "cannot allocate")
        is_oom = any(indicator in stderr_lower for indicator in oom_indicators)
        wait_seconds = 60.0 if is_oom else 10.0
        message = (
            "Process killed (possible OOM — retrying with longer wait)"
            if is_oom
            else "Process exited without exit code "
            "(possible signal race — retrying)"
        )
        process_killed_error = ClassifiedError(
            category=ErrorCategory.TRANSIENT,
            message=message,
            error_code=ErrorCode.UNKNOWN,
            original_error=exception,
            exit_code=exit_code,
            exit_signal=None,
            exit_reason=exit_reason,
            retriable=True,
            suggested_wait_seconds=wait_seconds,
        )
        all_errors.append(process_killed_error)

    # === PHASE 3: Exception Analysis ===
    # Coarse keyword triage of the exception message; anything unrecognized
    # becomes a generic retriable TRANSIENT error.
    if exception is not None:
        exc_str = str(exception).lower()
        # Try to classify based on exception message
        if "timeout" in exc_str:
            exc_error = ClassifiedError(
                category=ErrorCategory.TIMEOUT,
                message=str(exception),
                error_code=ErrorCode.EXECUTION_TIMEOUT,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=60.0,
            )
        elif "connection" in exc_str or "network" in exc_str:
            exc_error = ClassifiedError(
                category=ErrorCategory.NETWORK,
                message=str(exception),
                error_code=ErrorCode.NETWORK_CONNECTION_FAILED,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=30.0,
            )
        else:
            exc_error = ClassifiedError(
                category=ErrorCategory.TRANSIENT,
                message=str(exception),
                error_code=ErrorCode.UNKNOWN,
                original_error=exception,
                exit_code=exit_code,
                exit_reason=exit_reason,
                retriable=True,
                suggested_wait_seconds=30.0,
            )
        # Only add if we don't have the same error code already
        if not any(e.error_code == exc_error.error_code for e in all_errors):
            all_errors.append(exc_error)

    # === PHASE 4: Regex Fallback (only if no structured errors) ===
    # Delegates to the legacy single-error classify() when phases 1-3
    # produced nothing.
    if not all_errors:
        classification_method = "regex_fallback"
        fallback_error = self.classify(
            stdout=stdout,
            stderr=stderr,
            exit_code=exit_code,
            exit_signal=exit_signal,
            exit_reason=exit_reason,
            exception=exception,
            output_format=output_format,
        )
        all_errors.append(fallback_error)

    # === PHASE 4.5: Rate Limit Override (always runs) ===
    # Rate limits in stdout/stderr must never be missed, even when Phase 1
    # found structured JSON errors that masked them. F-098: Claude CLI rate
    # limit messages appear in stdout but Phase 1 may produce generic errors
    # that prevent Phase 4 from firing.
    has_rate_limit_error = any(
        e.category == ErrorCategory.RATE_LIMIT for e in all_errors
    )
    if not has_rate_limit_error:
        combined_for_rate_limit = f"{stdout}\n{stderr}"
        if self._matches_any(combined_for_rate_limit, self.rate_limit_patterns):
            # Check for quota exhaustion (more specific) first
            if self._matches_any(
                combined_for_rate_limit, self.quota_exhaustion_patterns
            ):
                wait_seconds = (
                    self.parse_reset_time(combined_for_rate_limit)
                    or DEFAULT_QUOTA_WAIT_SECONDS
                )
                rate_limit_error = ClassifiedError(
                    category=ErrorCategory.RATE_LIMIT,
                    message="Token quota exhausted — detected in output",
                    error_code=ErrorCode.QUOTA_EXHAUSTED,
                    original_error=exception,
                    exit_code=exit_code,
                    exit_signal=None,
                    exit_reason=exit_reason,
                    retriable=True,
                    suggested_wait_seconds=wait_seconds,
                )
            else:
                error_code = (
                    ErrorCode.CAPACITY_EXCEEDED
                    if self._matches_any(
                        combined_for_rate_limit, self.capacity_patterns
                    )
                    else ErrorCode.RATE_LIMIT_API
                )
                rate_limit_error = ClassifiedError(
                    category=ErrorCategory.RATE_LIMIT,
                    message="Rate limit detected in output",
                    error_code=error_code,
                    original_error=exception,
                    exit_code=exit_code,
                    exit_signal=None,
                    exit_reason=exit_reason,
                    retriable=True,
                    suggested_wait_seconds=DEFAULT_RATE_LIMIT_WAIT_SECONDS,
                )
            all_errors.append(rate_limit_error)
            _logger.warning(
                "rate_limit_override",
                component="errors",
                error_code=rate_limit_error.error_code.value,
                message="Rate limit detected via Phase 4.5 override",
            )

    # === PHASE 5: Root Cause Selection ===
    root_cause, symptoms, confidence = select_root_cause(all_errors)

    # Log the classification result
    _logger.info(
        "execution_classified",
        method=classification_method,
        primary_code=root_cause.error_code.value,
        error_count=len(all_errors),
        confidence=confidence,
        all_codes=[e.error_code.value for e in all_errors],
    )

    return ClassificationResult(
        primary=root_cause,
        secondary=symptoms,
        raw_errors=raw_errors,
        confidence=confidence,
        classification_method=classification_method,
    )
from_config classmethod
from_config(rate_limit_patterns)

Create classifier from config patterns.

Source code in src/marianne/core/errors/classifier.py
@classmethod
def from_config(cls, rate_limit_patterns: list[str]) -> ErrorClassifier:
    """Build a classifier from configuration-supplied rate-limit patterns.

    Args:
        rate_limit_patterns: Regex patterns indicating rate limiting,
            as read from user configuration.

    Returns:
        A classifier using the given rate-limit patterns; all other
        categories keep their defaults.
    """
    return cls(rate_limit_patterns=rate_limit_patterns)

Functions