Skip to content

parsers

parsers

JSON parsing utilities for error extraction.

Contains functions for parsing CLI JSON output to extract structured errors and identifying root causes from multiple errors.

Classes

Functions

try_parse_json_errors

try_parse_json_errors(output, stderr='')

Extract errors[] array from JSON output.

Claude CLI returns structured JSON with an errors[] array:

{
  "result": "...",
  "errors": [
    {"type": "system", "message": "Rate limit exceeded"},
    {"type": "user", "message": "spawn claude ENOENT"}
  ],
  "cost_usd": 0.05
}

This function parses that structure, handling:

- Non-JSON preamble (CLI startup messages)
- Multiple JSON objects (takes first valid one with errors[])
- JSON in stderr (some error modes write there)
- Truncated JSON (tries to recover)

Parameters:

Name Type Description Default
output str

Raw stdout from Claude CLI execution.

required
stderr str

Optional stderr output (some errors appear here).

''

Returns:

Type Description
list[ParsedCliError]

List of ParsedCliError objects, or empty list if parsing fails.

Source code in src/marianne/core/errors/parsers.py
def try_parse_json_errors(output: str, stderr: str = "") -> list[ParsedCliError]:
    """Collect the structured errors reported in Claude CLI JSON output.

    The CLI emits a JSON document carrying an `errors[]` array, e.g.:
    ```json
    {
      "result": "...",
      "errors": [
        {"type": "system", "message": "Rate limit exceeded"},
        {"type": "user", "message": "spawn claude ENOENT"}
      ],
      "cost_usd": 0.05
    }
    ```

    Both streams are handed to `_extract_json_errors_from_text`, stdout
    first and stderr second; empty streams are skipped. Results are merged
    in that order and an error is dropped when an earlier one already
    carried the same message (the same failure may be written to both
    streams).

    Args:
        output: Raw stdout from Claude CLI execution.
        stderr: Optional stderr output (some errors appear here).

    Returns:
        The distinct ParsedCliError objects in first-seen order, or an empty
        list when nothing could be extracted.
    """
    deduped: list[ParsedCliError] = []
    known_messages: set[str] = set()

    # stdout is scanned before stderr so its entries win on duplicates.
    for stream in (output, stderr):
        if not stream:
            continue

        for candidate in _extract_json_errors_from_text(stream) or ():
            if candidate.message in known_messages:
                continue
            known_messages.add(candidate.message)
            deduped.append(candidate)

    return deduped

classify_single_json_error

classify_single_json_error(parsed_error, exit_code=None, exit_reason=None)

Classify a single error from the JSON errors[] array.

This function uses type-based classification first, then falls back to message pattern matching. The error type from CLI ("system", "user", "tool") guides initial classification.

Parameters:

Name Type Description Default
parsed_error ParsedCliError

A ParsedCliError extracted from CLI JSON output.

required
exit_code int | None

Optional exit code for context.

None
exit_reason ExitReason | None

Optional exit reason for context.

None

Returns:

Type Description
ClassifiedError

ClassifiedError with appropriate category and error code.

Source code in src/marianne/core/errors/parsers.py
def classify_single_json_error(
    parsed_error: ParsedCliError,
    exit_code: int | None = None,
    exit_reason: ExitReason | None = None,
) -> ClassifiedError:
    """Turn one entry of the CLI's JSON errors[] array into a ClassifiedError.

    The error type reported by the CLI ("system", "user" or "tool") is
    consulted first, and each type tests its own message substrings. A "tool"
    error always resolves inside its branch. A "system" or "user" error that
    matches none of its substrings, and any other error type, falls through
    to generic message patterns (network, DNS, SSL/TLS, timeout) and finally
    to an UNKNOWN transient error. All matching is done on the lower-cased
    message; the returned error always carries the original message.

    Args:
        parsed_error: A ParsedCliError extracted from CLI JSON output.
        exit_code: Optional exit code, copied onto the result.
        exit_reason: Optional exit reason, copied onto the result.

    Returns:
        ClassifiedError with the category, error code, retry flag and (where
        set) suggested wait that correspond to the first matching rule.
    """
    lowered = parsed_error.message.lower()
    kind = parsed_error.error_type.lower()

    def mentions(*needles: str) -> bool:
        """Report whether any needle occurs in the lower-cased message."""
        return any(needle in lowered for needle in needles)

    def classified(
        category: ErrorCategory,
        code: ErrorCode,
        *,
        retriable: bool,
        **extra,
    ) -> ClassifiedError:
        """Build the result, filling in the fields shared by every outcome."""
        return ClassifiedError(
            category=category,
            message=parsed_error.message,
            error_code=code,
            exit_code=exit_code,
            exit_reason=exit_reason,
            retriable=retriable,
            **extra,
        )

    # --- Rules keyed on the CLI-reported error type ---

    if kind == "system":
        # System errors are usually API/service level.
        if mentions(
            "rate limit", "rate_limit", "quota", "too many requests",
            "429", "hit your limit", "limit exceeded", "daily limit",
        ):
            # A rate-limit message that also talks about capacity is reported
            # as a capacity problem with a shorter suggested wait.
            if mentions("capacity", "overloaded", "try again later", "unavailable"):
                return classified(
                    ErrorCategory.RATE_LIMIT,
                    ErrorCode.CAPACITY_EXCEEDED,
                    retriable=True,
                    suggested_wait_seconds=300.0,
                )
            return classified(
                ErrorCategory.RATE_LIMIT,
                ErrorCode.RATE_LIMIT_API,
                retriable=True,
                suggested_wait_seconds=3600.0,
            )

        if mentions("unauthorized", "authentication", "invalid api key", "401", "403"):
            return classified(
                ErrorCategory.AUTH,
                ErrorCode.BACKEND_AUTH,
                retriable=False,
            )

    elif kind == "user":
        # User errors are usually environment/config issues. ENOENT is
        # checked first because it is often the root cause
        # (e.g. "spawn claude ENOENT", "command not found").
        if mentions("enoent", "command not found"):
            return classified(
                ErrorCategory.CONFIGURATION,
                ErrorCode.BACKEND_NOT_FOUND,
                retriable=True,  # Might recover after reinstall
                suggested_wait_seconds=30.0,
                error_info=ErrorInfo(
                    reason="BINARY_NOT_FOUND",
                    domain="marianne.backend.claude_cli",
                    metadata={"original_message": parsed_error.message},
                ),
            )

        if mentions("permission denied", "access denied"):
            return classified(
                ErrorCategory.AUTH,
                ErrorCode.BACKEND_AUTH,
                retriable=False,
            )

        if mentions("no such file", "not found"):
            return classified(
                ErrorCategory.CONFIGURATION,
                ErrorCode.CONFIG_PATH_NOT_FOUND,
                retriable=False,
            )

    elif kind == "tool":
        # Tool errors are told apart by their message only.
        if mentions("mcp", "server"):
            return classified(
                ErrorCategory.CONFIGURATION,
                ErrorCode.CONFIG_MCP_ERROR,
                retriable=False,
            )

        # Any other tool failure is treated as a retriable validation issue.
        return classified(
            ErrorCategory.VALIDATION,
            ErrorCode.VALIDATION_COMMAND_FAILED,
            retriable=True,
            suggested_wait_seconds=10.0,
        )

    # --- Generic message patterns, tried in this fixed order ---

    # Connection-level network failures
    if mentions(
        "connection refused", "connection reset", "econnrefused",
        "etimedout", "network unreachable",
    ):
        return classified(
            ErrorCategory.NETWORK,
            ErrorCode.NETWORK_CONNECTION_FAILED,
            retriable=True,
            suggested_wait_seconds=30.0,
        )

    # Name resolution failures
    if mentions("dns", "getaddrinfo", "enotfound", "resolve"):
        return classified(
            ErrorCategory.NETWORK,
            ErrorCode.NETWORK_DNS_ERROR,
            retriable=True,
            suggested_wait_seconds=30.0,
        )

    # SSL/TLS failures
    if mentions("ssl", "tls", "certificate", "handshake"):
        return classified(
            ErrorCategory.NETWORK,
            ErrorCode.NETWORK_SSL_ERROR,
            retriable=True,
            suggested_wait_seconds=30.0,
        )

    # Timeouts
    if mentions("timeout", "timed out"):
        return classified(
            ErrorCategory.TIMEOUT,
            ErrorCode.EXECUTION_TIMEOUT,
            retriable=True,
            suggested_wait_seconds=60.0,
        )

    # Nothing matched: report an unknown, retriable error with the original message.
    return classified(
        ErrorCategory.TRANSIENT,
        ErrorCode.UNKNOWN,
        retriable=True,
        suggested_wait_seconds=30.0,
    )

select_root_cause

select_root_cause(errors)

Select the most likely root cause from multiple errors.

Uses priority-based scoring where lower score = more fundamental cause. Applies context modifiers for specific error combinations that commonly mask root causes.

Known masking patterns:

- ENOENT masks everything (missing binary causes cascading failures)
- Auth errors mask rate limits (can't hit rate limit if auth fails)
- Network errors mask service errors (can't reach service to get errors)
- Config errors mask execution errors (bad config causes execution failure)
- Timeout masks completion (timed out = never got to complete)

Parameters:

Name Type Description Default
errors list[ClassifiedError]

List of classified errors to analyze.

required

Returns:

Type Description
tuple[ClassifiedError, list[ClassifiedError], float]

Tuple of (root_cause, symptoms, confidence):
  • root_cause (ClassifiedError): The most fundamental error that likely caused the others
  • symptoms (list[ClassifiedError]): Other errors that are likely consequences
  • confidence (float): 0.0-1.0 confidence in root cause identification (higher when there's a clear priority gap)
Source code in src/marianne/core/errors/parsers.py
def select_root_cause(
    errors: list[ClassifiedError],
) -> tuple[ClassifiedError, list[ClassifiedError], float]:
    """Pick the error most likely to have caused all the others.

    Each error starts from its ROOT_CAUSE_PRIORITY score (99 when its code is
    not listed) and the lowest score wins; on a tie the earliest error in
    `errors` is chosen. Scores are adjusted for combinations where one error
    commonly hides the real cause:

    - ENOENT masks everything (missing binary causes cascading failures)
    - Auth errors mask rate limits (can't hit rate limit if auth fails)
    - Network errors mask service errors (can't reach service to get errors)
    - Config errors mask execution errors (bad config causes execution failure)
    - Timeout next to a rate limit is demoted (the rate limit is the cause)

    Args:
        errors: List of classified errors to analyze.

    Returns:
        Tuple of (root_cause, symptoms, confidence).
        - root_cause: The most fundamental error that likely caused others.
          For an empty input this is a synthetic FATAL/UNKNOWN error.
        - symptoms: Every other error, in input order.
        - confidence: 0.0 for empty input, 1.0 for a single error; otherwise
          0.5 plus 0.05 per point of score gap to the runner-up, plus 0.15
          for high-signal root causes, capped at 1.0 - or 0.4 when the
          root cause and the runner-up are tied.
    """
    # Degenerate inputs need no scoring.
    if not errors:
        placeholder = ClassifiedError(
            category=ErrorCategory.FATAL,
            message="No errors provided",
            error_code=ErrorCode.UNKNOWN,
            retriable=False,
        )
        return (placeholder, [], 0.0)

    if len(errors) == 1:
        return (errors[0], [], 1.0)

    codes_seen = {item.error_code for item in errors}
    network_codes = (
        ErrorCode.NETWORK_CONNECTION_FAILED,
        ErrorCode.NETWORK_DNS_ERROR,
        ErrorCode.NETWORK_SSL_ERROR,
    )

    def adjusted_priority(candidate: ClassifiedError) -> int:
        """Return the candidate's base priority with masking modifiers applied."""
        code = candidate.error_code
        score = ROOT_CAUSE_PRIORITY.get(code, 99)

        # ENOENT (missing binary) is almost always the root cause, but only
        # gets the boost when some other kind of error is present too.
        if code == ErrorCode.BACKEND_NOT_FOUND:
            if any(
                other.error_code != ErrorCode.BACKEND_NOT_FOUND for other in errors
            ):
                score -= 10

        # A missing config path is similar - nothing can run without config.
        if code == ErrorCode.CONFIG_PATH_NOT_FOUND:
            score -= 5

        # Auth failures mask rate limits (no rate limiting without auth).
        if code == ErrorCode.BACKEND_AUTH:
            if (
                ErrorCode.RATE_LIMIT_API in codes_seen
                or ErrorCode.RATE_LIMIT_CLI in codes_seen
            ):
                score -= 5

        # Network failures mask service-level errors.
        if code in network_codes:
            if (
                ErrorCode.BACKEND_TIMEOUT in codes_seen
                or ErrorCode.RATE_LIMIT_API in codes_seen
            ):
                score -= 3

        # MCP config errors mask tool execution errors.
        if code == ErrorCode.CONFIG_MCP_ERROR:
            if ErrorCode.VALIDATION_COMMAND_FAILED in codes_seen:
                score -= 3

        # CLI mode errors (streaming vs JSON) are config issues that mask
        # execution errors.
        if code == ErrorCode.CONFIG_CLI_MODE_ERROR:
            if any(other.error_code.category == "execution" for other in errors):
                score -= 3

        # A timeout seen together with a rate limit is a symptom: demote it.
        if code == ErrorCode.EXECUTION_TIMEOUT:
            if ErrorCode.RATE_LIMIT_API in codes_seen:
                score += 5

        return score

    # Scores are kept in a list parallel to `errors`
    # (ClassifiedError is a mutable dataclass and not hashable).
    scores = [adjusted_priority(item) for item in errors]

    # list.index returns the first occurrence, so ties go to the earliest error.
    root_idx = scores.index(min(scores))
    root_cause = errors[root_idx]

    symptoms = [item for pos, item in enumerate(errors) if pos != root_idx]
    other_scores = [value for pos, value in enumerate(scores) if pos != root_idx]

    if not other_scores:
        return (root_cause, symptoms, 1.0)

    # A wider gap to the runner-up means a clearer root cause.
    gap = min(other_scores) - scores[root_idx]

    if gap == 0:
        # Root cause and runner-up share a score: significant ambiguity.
        return (root_cause, symptoms, 0.4)

    # Base confidence is 0.5 for multiple errors; each point of gap adds 5%.
    confidence = min(0.5 + (gap * 0.05), 1.0)

    high_signal_codes = (
        ErrorCode.BACKEND_NOT_FOUND,  # ENOENT is almost always correct
        ErrorCode.BACKEND_AUTH,  # Auth failures are clear
        ErrorCode.CONFIG_PATH_NOT_FOUND,  # Missing config is clear
    )
    if root_cause.error_code in high_signal_codes:
        confidence = min(confidence + 0.15, 1.0)

    return (root_cause, symptoms, confidence)