Skip to content

Index

backends

Execution backends — instruments that musicians play.

Classes

AnthropicApiBackend

AnthropicApiBackend(model='claude-sonnet-4-5-20250929', api_key_env='ANTHROPIC_API_KEY', max_tokens=16384, temperature=0.7, timeout_seconds=300.0)

Bases: Backend

Run prompts directly via the Anthropic API.

Uses the official anthropic SDK for direct API access. Supports all Claude models available through the API.

Initialize API backend.

Parameters:

Name Type Description Default
model str

Model ID to use (e.g., claude-sonnet-4-5-20250929)

'claude-sonnet-4-5-20250929'
api_key_env str

Environment variable containing API key

'ANTHROPIC_API_KEY'
max_tokens int

Maximum tokens for response

16384
temperature float

Sampling temperature (0.0-1.0)

0.7
timeout_seconds float

Maximum time for API request

300.0
Source code in src/marianne/backends/anthropic_api.py
def __init__(
    self,
    model: str = "claude-sonnet-4-5-20250929",
    api_key_env: str = "ANTHROPIC_API_KEY",
    max_tokens: int = 16384,
    temperature: float = 0.7,
    timeout_seconds: float = 300.0,  # 5 minute default for API
):
    """Initialize API backend.

    Args:
        model: Model ID to use (e.g., claude-sonnet-4-5-20250929)
        api_key_env: Environment variable containing API key
        max_tokens: Maximum tokens for response
        temperature: Sampling temperature (0.0-1.0)
        timeout_seconds: Maximum time for API request
    """
    # Request parameters used by execute().
    self.model = model
    self.max_tokens = max_tokens
    self.temperature = temperature
    self.timeout_seconds = timeout_seconds
    self.api_key_env = api_key_env
    self._working_directory: Path | None = None

    # Per-sheet output log destinations, assigned by the runner via
    # set_output_log_path(); mirrors the ClaudeCliBackend pattern so
    # both backends offer the same observability.
    self._stdout_log_path: Path | None = None
    self._stderr_log_path: Path | None = None

    # Resolve the API key from the environment up front.
    self._api_key = os.environ.get(api_key_env)

    # The async client is created lazily in execute(), guarded by a lock.
    self._client: anthropic.AsyncAnthropic | None = None
    self._client_lock = asyncio.Lock()

    # Shared classifier keeps error detection consistent across backends.
    self._error_classifier = ErrorClassifier()

    # Snapshots of original parameters for per-sheet overrides (GH#78);
    # restored by clear_overrides().
    self._saved_model: str | None = None
    self._saved_temperature: float | None = None
    self._saved_max_tokens: int | None = None
    self._has_overrides: bool = False
Functions
from_config classmethod
from_config(config)

Create backend from configuration.

Source code in src/marianne/backends/anthropic_api.py
@classmethod
def from_config(cls, config: BackendConfig) -> "AnthropicApiBackend":
    """Create backend from configuration.

    Alternate constructor mapping BackendConfig fields onto __init__
    parameters one-to-one.
    """
    kwargs = {
        "model": config.model,
        "api_key_env": config.api_key_env,
        "max_tokens": config.max_tokens,
        "temperature": config.temperature,
        "timeout_seconds": config.timeout_seconds,
    }
    return cls(**kwargs)
apply_overrides
apply_overrides(overrides)

Apply per-sheet overrides for the next execution.

Source code in src/marianne/backends/anthropic_api.py
def apply_overrides(self, overrides: dict[str, object]) -> None:
    """Apply per-sheet overrides for the next execution.

    Saves the current model/temperature/max_tokens so that
    clear_overrides() can restore them, then applies any recognized
    keys from ``overrides`` (coerced to their expected types).
    """
    if not overrides:
        return
    # Snapshot current values so clear_overrides() can restore them.
    self._saved_model = self.model
    self._saved_temperature = self.temperature
    self._saved_max_tokens = self.max_tokens
    self._has_overrides = True
    # Coerce and apply each recognized override; absent keys are skipped.
    for key, coerce in (("model", str), ("temperature", float), ("max_tokens", int)):
        if key in overrides:
            setattr(self, key, coerce(overrides[key]))  # type: ignore[arg-type]
clear_overrides
clear_overrides()

Restore original backend parameters after per-sheet execution.

Source code in src/marianne/backends/anthropic_api.py
def clear_overrides(self) -> None:
    """Restore original backend parameters after per-sheet execution.

    No-op unless apply_overrides() previously saved originals.
    """
    if not self._has_overrides:
        return
    # Swap each saved original back in and drop the snapshot in one step.
    self.model, self._saved_model = self._saved_model, None  # type: ignore[assignment]
    self.temperature, self._saved_temperature = self._saved_temperature, None  # type: ignore[assignment]
    self.max_tokens, self._saved_max_tokens = self._saved_max_tokens, None  # type: ignore[assignment]
    self._has_overrides = False
set_output_log_path
set_output_log_path(path)

Set base path for real-time output logging.

Called per-sheet by runner to enable writing API responses to log files. Provides observability parity with ClaudeCliBackend.

Parameters:

Name Type Description Default
path Path | None

Base path for log files (without extension), or None to disable.

required
Source code in src/marianne/backends/anthropic_api.py
def set_output_log_path(self, path: Path | None) -> None:
    """Set base path for real-time output logging.

    Called per-sheet by runner to enable writing API responses to log files.
    Provides observability parity with ClaudeCliBackend.

    Args:
        path: Base path for log files (without extension), or None to disable.
    """
    if path is None:
        self._stdout_log_path = None
        self._stderr_log_path = None
    else:
        self._stdout_log_path = path.with_suffix(".stdout.log")
        self._stderr_log_path = path.with_suffix(".stderr.log")
execute async
execute(prompt, *, timeout_seconds=None)

Execute a prompt via the Anthropic API.

Parameters:

Name Type Description Default
prompt str

The prompt to send to Claude

required
timeout_seconds float | None

Per-call timeout override. API backend uses its own HTTP timeout from __init__; per-call override is logged but not enforced.

None

Returns:

Type Description
ExecutionResult

ExecutionResult with API response and metadata

Source code in src/marianne/backends/anthropic_api.py
async def execute(
    self, prompt: str, *, timeout_seconds: float | None = None,
) -> ExecutionResult:
    """Execute a prompt via the Anthropic API.

    Args:
        prompt: The prompt to send to Claude
        timeout_seconds: Per-call timeout override. API backend uses its
            own HTTP timeout from ``__init__``; per-call override is
            logged but not enforced.

    Returns:
        ExecutionResult with API response and metadata

    Raises:
        Exception: Unexpected (unclassified) errors are logged, written
            to the stderr log file, and re-raised. All known API failure
            modes are returned as failed ExecutionResults instead.
    """
    if timeout_seconds is not None:
        _logger.debug(
            "timeout_override_ignored",
            backend="anthropic_api",
            requested=timeout_seconds,
            actual=self.timeout_seconds,
        )
    start_time = time.monotonic()

    # Log API request at DEBUG level (never log prompt content or API keys)
    _logger.debug(
        "api_request",
        model=self.model,
        max_tokens=self.max_tokens,
        temperature=self.temperature,
        prompt_length=len(prompt),
        # Note: prompt preview intentionally omitted for security
    )

    try:
        client = await self._get_client()

        response = await client.messages.create(
            model=self.model,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            messages=[
                {"role": "user", "content": prompt}
            ],
        )

        duration = time.monotonic() - start_time

        # Extract response text
        # Only text blocks contribute; non-text content blocks are skipped.
        content_blocks = response.content
        response_text = ""
        for block in content_blocks:
            if hasattr(block, "text"):
                response_text += block.text

        # Calculate tokens used
        input_tokens = response.usage.input_tokens if response.usage else None
        output_tokens = response.usage.output_tokens if response.usage else None
        tokens_used = (
            input_tokens + output_tokens
            if input_tokens is not None and output_tokens is not None
            else None
        )

        # Log successful response at INFO level
        _logger.info(
            "api_response",
            duration_seconds=duration,
            model=self.model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=tokens_used,
            response_length=len(response_text),
        )

        # Write API response to log file for post-mortem analysis
        self._write_log_file(self._stdout_log_path, response_text)

        return ExecutionResult(
            success=True,
            exit_code=0,
            stdout=response_text,
            stderr="",
            duration_seconds=duration,
            model=self.model,
            tokens_used=tokens_used,  # Legacy field for backwards compatibility
            input_tokens=input_tokens,
            output_tokens=output_tokens,
        )

    # Specific anthropic error types are handled before the generic
    # APIStatusError catch-all below; each maps to a failed
    # ExecutionResult whose exit_code carries the HTTP status.
    except anthropic.RateLimitError as e:
        duration = time.monotonic() - start_time
        rate_limit_wait = self._error_classifier.extract_rate_limit_wait(str(e))
        _logger.warning(
            "rate_limit_error",
            duration_seconds=duration,
            model=self.model,
            error_message=str(e),
            parsed_wait_seconds=rate_limit_wait,
        )
        self._write_log_file(self._stderr_log_path, str(e))
        return ExecutionResult(
            success=False,
            exit_code=429,
            stdout="",
            stderr=str(e),
            duration_seconds=duration,
            rate_limited=True,
            rate_limit_wait_seconds=rate_limit_wait,
            error_type="rate_limit",
            error_message=f"Rate limited: {e}",
            model=self.model,
        )

    except anthropic.AuthenticationError as e:
        duration = time.monotonic() - start_time
        # Note: Never log API key details, just that auth failed
        _logger.error(
            "authentication_error",
            duration_seconds=duration,
            model=self.model,
            api_key_env=self.api_key_env,  # Only log env var name, not value
        )
        self._write_log_file(self._stderr_log_path, str(e))
        return ExecutionResult(
            success=False,
            exit_code=401,
            stdout="",
            stderr=str(e),
            duration_seconds=duration,
            error_type="authentication",
            error_message=f"Authentication failed: {e}",
            model=self.model,
        )

    except anthropic.BadRequestError as e:
        duration = time.monotonic() - start_time
        _logger.error(
            "bad_request_error",
            duration_seconds=duration,
            model=self.model,
            error_message=str(e),
        )
        self._write_log_file(self._stderr_log_path, str(e))
        return ExecutionResult(
            success=False,
            exit_code=400,
            stdout="",
            stderr=str(e),
            duration_seconds=duration,
            error_type="bad_request",
            error_message=f"Bad request: {e}",
            model=self.model,
        )

    except anthropic.APITimeoutError as e:
        duration = time.monotonic() - start_time
        _logger.error(
            "api_timeout_error",
            duration_seconds=duration,
            timeout_seconds=self.timeout_seconds,
            model=self.model,
        )
        self._write_log_file(self._stderr_log_path, str(e))
        return ExecutionResult(
            success=False,
            exit_code=408,
            stdout="",
            stderr=str(e),
            duration_seconds=duration,
            error_type="timeout",
            error_message=f"API timeout after {self.timeout_seconds}s: {e}",
            model=self.model,
        )

    except anthropic.APIConnectionError as e:
        duration = time.monotonic() - start_time
        _logger.error(
            "api_connection_error",
            duration_seconds=duration,
            model=self.model,
            error_message=str(e),
        )
        self._write_log_file(self._stderr_log_path, str(e))
        return ExecutionResult(
            success=False,
            exit_code=503,
            stdout="",
            stderr=str(e),
            duration_seconds=duration,
            error_type="connection",
            error_message=f"Connection error: {e}",
            model=self.model,
        )

    except anthropic.APIStatusError as e:
        duration = time.monotonic() - start_time
        status_code = e.status_code if hasattr(e, "status_code") else 500
        # Check if this is a rate limit error by status code or message
        rate_limited = self._detect_rate_limit(stderr=str(e), exit_code=status_code)
        rate_limit_wait = (
            self._error_classifier.extract_rate_limit_wait(str(e))
            if rate_limited else None
        )

        if rate_limited:
            _logger.warning(
                "rate_limit_error",
                duration_seconds=duration,
                status_code=status_code,
                model=self.model,
                error_message=str(e),
                parsed_wait_seconds=rate_limit_wait,
            )
        else:
            _logger.error(
                "api_status_error",
                duration_seconds=duration,
                status_code=status_code,
                model=self.model,
                error_message=str(e),
            )

        self._write_log_file(self._stderr_log_path, str(e))
        return ExecutionResult(
            success=False,
            exit_code=status_code,
            stdout="",
            stderr=str(e),
            duration_seconds=duration,
            rate_limited=rate_limited,
            rate_limit_wait_seconds=rate_limit_wait,
            error_type="rate_limit" if rate_limited else "api_error",
            error_message=str(e),
            model=self.model,
        )

    except RuntimeError as e:
        # API key not found
        duration = time.monotonic() - start_time
        _logger.error(
            "configuration_error",
            duration_seconds=duration,
            error_message=str(e),
            api_key_env=self.api_key_env,  # Only log env var name, not value
        )
        self._write_log_file(self._stderr_log_path, str(e))
        return ExecutionResult(
            success=False,
            exit_code=1,
            stdout="",
            stderr=str(e),
            duration_seconds=duration,
            error_type="configuration",
            error_message=str(e),
            model=self.model,
        )

    except Exception as e:
        # Anything unclassified is a bug or novel failure: log with
        # traceback, persist to the stderr log, and re-raise.
        duration = time.monotonic() - start_time
        _logger.exception(
            "unexpected_error",
            duration_seconds=duration,
            model=self.model,
            error_message=str(e),
        )
        self._write_log_file(self._stderr_log_path, str(e))
        raise
health_check async
health_check()

Check if the API is available and authenticated.

Uses a minimal prompt to verify connectivity.

Source code in src/marianne/backends/anthropic_api.py
async def health_check(self) -> bool:
    """Check if the API is available and authenticated.

    Uses a minimal prompt to verify connectivity (unlike
    availability_check(), this consumes a small amount of quota).

    Returns:
        True if the API answered the probe, False otherwise.
    """
    if not self._api_key:
        _logger.warning(
            "health_check_failed",
            error_type="MissingAPIKey",
            # Fix: report the configured env var instead of a hardcoded
            # "ANTHROPIC_API_KEY" — api_key_env is configurable, and other
            # log sites in this backend already use self.api_key_env.
            error=f"No API key configured — set {self.api_key_env}",
        )
        return False

    try:
        client = await self._get_client()
        # Minimal prompt to verify API access
        response = await client.messages.create(
            model=self.model,
            max_tokens=10,
            messages=[{"role": "user", "content": "Reply with only: ok"}],
        )
        # Check we got a response
        return len(response.content) > 0
    except (anthropic.APIError, TimeoutError, OSError) as e:
        _logger.warning("health_check_failed", error_type=type(e).__name__, error=str(e))
        return False
availability_check async
availability_check()

Check if the API client can be created without making an API call.

Unlike health_check(), this does NOT send a request or consume API quota. Used after quota exhaustion waits.

Source code in src/marianne/backends/anthropic_api.py
async def availability_check(self) -> bool:
    """Check if the API client can be created without making an API call.

    Unlike health_check(), this does NOT send a request or consume
    API quota. Used after quota exhaustion waits.
    """
    if not self._api_key:
        return False
    try:
        await self._get_client()
    except Exception:
        # Best-effort probe: any failure just means "not available".
        return False
    return True
close async
close()

Close the async client connection (idempotent).

Source code in src/marianne/backends/anthropic_api.py
async def close(self) -> None:
    """Close the async client connection (idempotent)."""
    # Detach the client first so a second call finds None and no-ops.
    client, self._client = self._client, None
    if client is None:
        return
    try:
        await client.close()
    except (OSError, RuntimeError, anthropic.APIError):
        _logger.debug("client_close_error", exc_info=True)

Backend

Bases: ABC

Abstract base class for Claude execution backends.

Backends handle the actual execution of prompts through Claude, whether via CLI subprocess or direct API calls.

Attributes
name abstractmethod property
name

Human-readable backend name.

working_directory property writable
working_directory

Working directory for backend execution.

Subprocess-based backends (e.g. ClaudeCliBackend) use this as the cwd for child processes. API-based backends store it but don't use it directly.

Returns None if no working directory is set, meaning the process CWD is used.

Thread/Concurrency Safety: This property is NOT safe to mutate while executions are in-flight. The worktree isolation layer sets it before any sheet execution starts, and restores it in the finally block after all sheets complete. During parallel execution, all concurrent sheets share the same working directory (the worktree path), so the value is read-only while sheets are running. Never change this property from a concurrent task mid-execution.

override_lock property
override_lock

Lock for serializing apply_overrides → execute → clear_overrides cycles.

Parallel sheet execution must acquire this lock around the entire override lifecycle to prevent concurrent sheets from stomping on each other's saved originals.

Functions
execute abstractmethod async
execute(prompt, *, timeout_seconds=None)

Execute a prompt and return the result.

Parameters:

Name Type Description Default
prompt str

The prompt to send to Claude

required
timeout_seconds float | None

Per-call timeout override. If provided, overrides the backend's default timeout for this single execution.

None

Returns:

Type Description
ExecutionResult

ExecutionResult with output and metadata

Source code in src/marianne/backends/base.py
@abstractmethod
async def execute(
    self, prompt: str, *, timeout_seconds: float | None = None,
) -> ExecutionResult:
    """Execute a prompt and return the result.

    Args:
        prompt: The prompt to send to Claude
        timeout_seconds: Per-call timeout override. If provided, overrides
            the backend's default timeout for this single execution.
            NOTE(review): AnthropicApiBackend documents that it logs this
            override but does not enforce it — confirm which contract
            callers should rely on.

    Returns:
        ExecutionResult with output and metadata
    """
    ...
health_check abstractmethod async
health_check()

Check if the backend is available and working.

Used to verify connectivity before starting a job, and to check if rate limits have lifted.

Returns:

Type Description
bool

True if backend is ready, False otherwise

Source code in src/marianne/backends/base.py
@abstractmethod
async def health_check(self) -> bool:
    """Check if the backend is available and working.

    Used to verify connectivity before starting a job,
    and to check if rate limits have lifted. Unlike
    availability_check(), implementations may send prompts and
    consume tokens/quota to perform this check.

    Returns:
        True if backend is ready, False otherwise
    """
    ...
availability_check async
availability_check()

Lightweight check: is the backend reachable without consuming API quota?

Unlike health_check(), this must NOT send prompts or consume tokens. Used after quota exhaustion waits where sending a prompt would fail.

Default returns True (assume available). Backends override for real checks.

Source code in src/marianne/backends/base.py
async def availability_check(self) -> bool:
    """Lightweight reachability probe that must not consume API quota.

    Unlike health_check(), this never sends prompts or spends tokens;
    it is used after quota-exhaustion waits, where sending a prompt
    would fail. The base implementation optimistically assumes the
    backend is available; backends with a real probe override it.
    """
    return True
close async
close()

Close the backend and release resources.

Override in subclasses that hold persistent connections or resources. Default implementation is a no-op for backends without cleanup needs.

This method should be idempotent - calling it multiple times should be safe.

Source code in src/marianne/backends/base.py
async def close(self) -> None:  # noqa: B027
    """Release any resources held by the backend.

    The base class holds no persistent connections or resources, so
    this is deliberately a no-op; subclasses that do hold them
    override it. Must remain idempotent — calling it multiple times
    is always safe.
    """
__aenter__ async
__aenter__()

Async context manager entry.

Source code in src/marianne/backends/base.py
async def __aenter__(self) -> Backend:
    """Enter the async context manager, yielding this backend unchanged."""
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit — ensures close() is called.

Source code in src/marianne/backends/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Exit the async context manager; close() runs regardless of errors."""
    await self.close()
apply_overrides
apply_overrides(overrides)

Apply per-sheet parameter overrides for the next execution.

Called per-sheet by the runner when sheet_overrides is configured. Subclasses store the overrides and apply them in execute(). clear_overrides() is called after execution to restore defaults.

Callers MUST hold override_lock for the entire apply → execute → clear window when parallel execution is possible.

Default implementation is a no-op for backends without override support.

Parameters:

Name Type Description Default
overrides dict[str, object]

Dict of parameter name → value. Only non-None values from SheetBackendOverride are included.

required
Source code in src/marianne/backends/base.py
def apply_overrides(self, overrides: dict[str, object]) -> None:  # noqa: B027
    """Apply per-sheet parameter overrides for the next execution.

    Called per-sheet by the runner when ``sheet_overrides`` is configured.
    Subclasses store the overrides and apply them in ``execute()``.
    ``clear_overrides()`` is called after execution to restore defaults.

    Callers MUST hold ``override_lock`` for the entire
    apply → execute → clear window when parallel execution is possible.

    Default implementation is intentionally a no-op for backends
    without override support (hence the B027 suppression).

    Args:
        overrides: Dict of parameter name → value. Only non-None values
            from ``SheetBackendOverride`` are included.
    """
clear_overrides
clear_overrides()

Clear per-sheet parameter overrides, restoring defaults.

Called after each sheet execution to ensure the next sheet uses global config. Default implementation is a no-op.

Source code in src/marianne/backends/base.py
def clear_overrides(self) -> None:  # noqa: B027
    """Clear per-sheet parameter overrides, restoring defaults.

    Called after each sheet execution to ensure the next sheet uses
    global config. Default implementation is intentionally a no-op
    (hence the B027 suppression); backends that support overrides
    restore their saved originals here.
    """
set_preamble
set_preamble(_preamble)

Set the dynamic preamble for the next execution.

Called per-sheet by the runner with a context-aware preamble built by build_preamble(). The preamble includes sheet identity, position, workspace, and retry status.

Override in subclasses that support prompt injection. Default implementation is a no-op for backends without this capability.

Parameters:

Name Type Description Default
_preamble str | None

Preamble text to prepend, or None to clear.

required
Source code in src/marianne/backends/base.py
def set_preamble(self, _preamble: str | None) -> None:  # noqa: B027
    """Set the dynamic preamble for the next execution.

    Called per-sheet by the runner with a context-aware preamble built by
    ``build_preamble()``. The preamble includes sheet identity, position,
    workspace, and retry status.

    Override in subclasses that support prompt injection.
    Default implementation is a no-op for backends without this capability;
    the leading underscore marks the parameter as intentionally unused here.

    Args:
        _preamble: Preamble text to prepend, or None to clear.
    """
set_prompt_extensions
set_prompt_extensions(_extensions)

Set prompt extensions for the next execution.

Extensions are additional directive blocks injected after the preamble. Called per-sheet by the runner to apply score-level and sheet-level prompt extensions (GH#76).

Override in subclasses that support prompt injection. Default implementation is a no-op for backends without this capability.

Parameters:

Name Type Description Default
_extensions list[str]

List of extension text blocks.

required
Source code in src/marianne/backends/base.py
def set_prompt_extensions(self, _extensions: list[str]) -> None:  # noqa: B027
    """Set prompt extensions for the next execution.

    Extensions are additional directive blocks injected after the
    preamble. Called per-sheet by the runner to apply score-level and
    sheet-level prompt extensions (GH#76).

    Override in subclasses that support prompt injection.
    Default implementation is a no-op for backends without this capability;
    the leading underscore marks the parameter as intentionally unused here.

    Args:
        _extensions: List of extension text blocks.
    """
set_output_log_path
set_output_log_path(_path)

Set base path for real-time output logging.

Called per-sheet by runner to enable streaming output to log files. This provides visibility into backend output during long executions.

Uses industry-standard separate files for stdout and stderr: - {path}.stdout.log - standard output - {path}.stderr.log - standard error

Override in subclasses that support real-time output streaming. Default implementation is a no-op for backends without this capability.

Parameters:

Name Type Description Default
_path Path | None

Base path for log files (without extension), or None to disable.

required
Source code in src/marianne/backends/base.py
def set_output_log_path(self, _path: Path | None) -> None:  # noqa: B027
    """Set base path for real-time output logging.

    Called per-sheet by runner to enable streaming output to log files.
    This provides visibility into backend output during long executions.

    Uses industry-standard separate files for stdout and stderr:
    - {path}.stdout.log - standard output
    - {path}.stderr.log - standard error

    Override in subclasses that support real-time output streaming.
    Default implementation is a no-op for backends without this capability;
    the leading underscore marks the parameter as intentionally unused here.

    Args:
        _path: Base path for log files (without extension), or None to disable.
    """

ExecutionResult dataclass

ExecutionResult(success, stdout, stderr, duration_seconds, exit_code=None, exit_signal=None, exit_reason='completed', started_at=utc_now(), rate_limited=False, rate_limit_wait_seconds=None, error_type=None, error_message=None, model=None, tokens_used=None, input_tokens=None, output_tokens=None)

Result of executing a prompt through a backend.

Captures all relevant output and metadata for validation and debugging.

Note: Fields are ordered with required fields first, then optional fields with defaults, as required by Python dataclasses.

Attributes
success instance-attribute
success

Whether the execution completed without error (exit code 0).

stdout instance-attribute
stdout

Standard output from the command.

stderr instance-attribute
stderr

Standard error from the command.

duration_seconds instance-attribute
duration_seconds

Execution duration in seconds.

exit_code class-attribute instance-attribute
exit_code = None

Process exit code or HTTP status code. None if killed by signal.

exit_signal class-attribute instance-attribute
exit_signal = None

Signal number if process was killed by a signal (e.g., 9=SIGKILL, 15=SIGTERM).

On Unix, when a process is killed by a signal, returncode = -signal_number. This field extracts that signal for clearer diagnostics.

exit_reason class-attribute instance-attribute
exit_reason = 'completed'

Why the execution ended: - completed: Normal exit (exit_code set) - timeout: Process was killed due to timeout - killed: Process was killed by external signal - error: Internal error prevented execution

started_at class-attribute instance-attribute
started_at = field(default_factory=utc_now)

When execution started.

rate_limited class-attribute instance-attribute
rate_limited = False

Whether rate limiting was detected.

rate_limit_wait_seconds class-attribute instance-attribute
rate_limit_wait_seconds = None

Parsed wait duration from rate limit error, in seconds.

When set, this is the actual duration extracted from the API's error message (e.g. 'retry after 300 seconds'). When None, callers should use their default wait time.

error_type class-attribute instance-attribute
error_type = None

Classified error type if failed.

error_message class-attribute instance-attribute
error_message = None

Human-readable error message.

model class-attribute instance-attribute
model = None

Model used for execution.

tokens_used class-attribute instance-attribute
tokens_used = None

Total tokens consumed (API backend only).

.. deprecated:: Use input_tokens + output_tokens instead. This field will be removed in a future version. Equivalent: tokens_used == (input_tokens or 0) + (output_tokens or 0).

input_tokens class-attribute instance-attribute
input_tokens = None

Input tokens consumed (prompt tokens). None if not available from backend.

output_tokens class-attribute instance-attribute
output_tokens = None

Output tokens consumed (completion tokens). None if not available from backend.

output property
output

Combined stdout and stderr.

Functions
__post_init__
__post_init__()

Validate invariant: success=True requires exit_code 0 or None.

Source code in src/marianne/backends/base.py
def __post_init__(self) -> None:
    """Validate invariant: success=True requires exit_code 0 or None."""
    if not self.success:
        return
    if self.exit_code in (None, 0):
        return
    raise ValueError(
        f"Inconsistent ExecutionResult: success=True but exit_code={self.exit_code}. "
        "success=True should only be set when exit_code is 0 or None."
    )

ClaudeCliBackend

ClaudeCliBackend(skip_permissions=True, disable_mcp=True, output_format='text', cli_model=None, allowed_tools=None, system_prompt_file=None, working_directory=None, timeout_seconds=1800.0, progress_callback=None, progress_interval_seconds=5.0, cli_extra_args=None)

Bases: Backend

Run prompts via the Claude CLI.

Uses asyncio.create_subprocess_exec to invoke claude -p <prompt>. This is shell-injection safe as arguments are passed as a list.

Initialize CLI backend.

Parameters:

Name Type Description Default
skip_permissions bool

Pass --dangerously-skip-permissions

True
disable_mcp bool

Disable MCP servers for faster execution (--strict-mcp-config)

True
output_format str

Output format (json, text, stream-json)

'text'
cli_model str | None

Model to use (--model flag), None uses default

None
allowed_tools list[str] | None

Restrict to specific tools (--allowedTools)

None
system_prompt_file Path | None

Custom system prompt file (--system-prompt)

None
working_directory Path | None

Working directory for running commands

None
timeout_seconds float

Maximum time allowed per prompt

1800.0
progress_callback ProgressCallback | None

Optional callback for progress updates during execution. Called with dict containing: bytes_received, lines_received, elapsed_seconds, phase.

None
progress_interval_seconds float

How often to call progress callback (default 5s).

5.0
cli_extra_args list[str] | None

Extra arguments to pass to claude CLI (escape hatch).

None
Source code in src/marianne/backends/claude_cli.py
def __init__(
    self,
    skip_permissions: bool = True,
    disable_mcp: bool = True,
    output_format: str = "text",
    cli_model: str | None = None,
    allowed_tools: list[str] | None = None,
    system_prompt_file: Path | None = None,
    working_directory: Path | None = None,
    timeout_seconds: float = 1800.0,  # 30 minute default
    progress_callback: ProgressCallback | None = None,
    progress_interval_seconds: float = 5.0,
    cli_extra_args: list[str] | None = None,
):
    """Initialize CLI backend.

    Args:
        skip_permissions: Pass --dangerously-skip-permissions
        disable_mcp: Disable MCP servers for faster execution (--strict-mcp-config)
        output_format: Output format (json, text, stream-json)
        cli_model: Model to use (--model flag), None uses default
        allowed_tools: Restrict to specific tools (--allowedTools)
        system_prompt_file: Custom system prompt file (--system-prompt)
        working_directory: Working directory for running commands
        timeout_seconds: Maximum time allowed per prompt
        progress_callback: Optional callback for progress updates during execution.
            Called with dict containing: bytes_received, lines_received,
            elapsed_seconds, phase.
        progress_interval_seconds: How often to call progress callback (default 5s).
        cli_extra_args: Extra arguments to pass to claude CLI (escape hatch).
    """
    self.skip_permissions = skip_permissions
    self.disable_mcp = disable_mcp
    self.output_format = output_format
    self.cli_model = cli_model
    self.allowed_tools = allowed_tools
    self.system_prompt_file = system_prompt_file
    self._working_directory = working_directory
    self.timeout_seconds = timeout_seconds
    self.progress_callback = progress_callback
    self.progress_interval_seconds = progress_interval_seconds
    # None (or an empty list) becomes a fresh list so callers can't share
    # or mutate a common default.
    self.cli_extra_args = cli_extra_args or []

    # PID tracking callbacks for orphan detection.
    # Set by the daemon's ProcessGroupManager when running under the conductor.
    # Standalone CLI mode leaves these as None — no orphan tracking needed.
    self._on_process_spawned: Callable[[int], None] | None = None
    self._on_process_exited: Callable[[int], None] | None = None

    # Real-time output logging paths (set per-sheet by runner)
    # Industry standard: separate files for stdout and stderr
    self._stdout_log_path: Path | None = None
    self._stderr_log_path: Path | None = None

    # Partial output accumulator — populated during execution so that
    # _handle_execution_timeout() can capture partial output on timeout
    # instead of returning empty strings.
    self._partial_stdout_chunks: list[bytes] = []
    self._partial_stderr_chunks: list[bytes] = []

    # Verify claude CLI is available
    # (None when the binary is not on PATH; availability_check() and
    # health_check() both report False in that case.)
    self._claude_path = shutil.which("claude")

    # Track log write failures for filesystem flakiness visibility
    self.log_write_failures: int = 0

    # Use shared ErrorClassifier for rate limit detection
    # This ensures consistent classification with the runner
    self._error_classifier = ErrorClassifier()

    # Dynamic preamble — context-aware identity/position/retry info built
    # per-sheet by the runner via set_preamble().
    self._preamble: str | None = None

    # Prompt extensions (GH#76) — additional directives injected after the
    # preamble and before the user prompt. Set per-sheet by runner
    # via set_prompt_extensions().
    self._prompt_extensions: list[str] = []

    # Per-sheet overrides (GH#78) — saved originals for clear_overrides()
    self._saved_cli_model: str | None = None
    self._has_overrides: bool = False
Functions
from_config classmethod
from_config(config)

Create backend from configuration.

Source code in src/marianne/backends/claude_cli.py
@classmethod
def from_config(cls, config: BackendConfig) -> "ClaudeCliBackend":
    """Build a ClaudeCliBackend from a BackendConfig instance.

    Each backend-relevant setting is copied off the config object and
    forwarded to the constructor as a keyword argument.
    """
    kwargs = {
        "skip_permissions": config.skip_permissions,
        "disable_mcp": config.disable_mcp,
        "output_format": config.output_format,
        "cli_model": config.cli_model,
        "allowed_tools": config.allowed_tools,
        "system_prompt_file": config.system_prompt_file,
        "working_directory": config.working_directory,
        "timeout_seconds": config.timeout_seconds,
        "cli_extra_args": config.cli_extra_args,
    }
    return cls(**kwargs)
apply_overrides
apply_overrides(overrides)

Apply per-sheet overrides for the next execution.

Source code in src/marianne/backends/claude_cli.py
def apply_overrides(self, overrides: dict[str, object]) -> None:
    """Apply per-sheet overrides for the next execution.

    Saves the current parameter values the first time overrides are
    applied so clear_overrides() can restore them. Repeated calls
    before clear_overrides() no longer clobber the saved original
    (previously a second apply overwrote ``_saved_cli_model`` with an
    already-overridden value, making the original unrecoverable).

    Args:
        overrides: Mapping of parameter name to override value.
            Currently only "cli_model" is recognized; an empty
            mapping is a no-op.
    """
    if not overrides:
        return
    # Snapshot originals only once per apply/clear cycle.
    if not self._has_overrides:
        self._saved_cli_model = self.cli_model
        self._has_overrides = True
    if "cli_model" in overrides:
        self.cli_model = str(overrides["cli_model"])
clear_overrides
clear_overrides()

Restore original backend parameters after per-sheet execution.

Source code in src/marianne/backends/claude_cli.py
def clear_overrides(self) -> None:
    """Undo any per-sheet overrides, restoring the saved original model.

    No-op when no overrides are currently active.
    """
    if self._has_overrides:
        self.cli_model = self._saved_cli_model
        self._saved_cli_model = None
        self._has_overrides = False
set_output_log_path
set_output_log_path(path)

Set base path for real-time output logging.

Called per-sheet by runner to enable streaming output to log files. This provides visibility into Claude's output during long executions.

Uses industry-standard separate files for stdout and stderr:

- {path}.stdout.log — standard output
- {path}.stderr.log — standard error

This enables clean tail -f monitoring without stream interleaving.

Parameters:

Name Type Description Default
path Path | None

Base path for log files (without extension), or None to disable. Example: workspace/logs/sheet-01 creates sheet-01.stdout.log

required
Source code in src/marianne/backends/claude_cli.py
def set_output_log_path(self, path: Path | None) -> None:
    """Set base path for real-time output logging.

    Called per-sheet by runner to enable streaming output to log files.
    This provides visibility into Claude's output during long executions.

    Uses industry-standard separate files for stdout and stderr:
    - {path}.stdout.log - standard output
    - {path}.stderr.log - standard error

    This enables clean `tail -f` monitoring without stream interleaving.

    Args:
        path: Base path for log files (without extension), or None to disable.
              Example: workspace/logs/sheet-01 creates sheet-01.stdout.log
    """
    if path is None:
        self._stdout_log_path = None
        self._stderr_log_path = None
    else:
        self._stdout_log_path = path.with_suffix(".stdout.log")
        self._stderr_log_path = path.with_suffix(".stderr.log")
set_preamble
set_preamble(preamble)

Set the dynamic preamble for the next execution.

Parameters:

Name Type Description Default
preamble str | None

Preamble text to prepend, or None to clear.

required
Source code in src/marianne/backends/claude_cli.py
def set_preamble(self, preamble: str | None) -> None:
    """Set the dynamic preamble for the next execution.

    Args:
        preamble: Preamble text to prepend, or None to clear.
    """
    self._preamble = preamble
set_prompt_extensions
set_prompt_extensions(extensions)

Set prompt extensions for the next execution.

Extensions are additional directive blocks injected after the preamble. Called per-sheet by the runner to apply score-level and sheet-level extensions.

Parameters:

Name Type Description Default
extensions list[str]

List of extension text blocks. Empty strings are ignored.

required
Source code in src/marianne/backends/claude_cli.py
def set_prompt_extensions(self, extensions: list[str]) -> None:
    """Record prompt extensions to inject on the next execution.

    Extensions are extra directive blocks placed after the preamble.
    The runner sets them per-sheet from score-level and sheet-level
    configuration. Blank or whitespace-only entries are dropped.

    Args:
        extensions: List of extension text blocks.
    """
    # filter(str.strip, ...) keeps exactly the entries whose stripped
    # form is non-empty — same predicate as `if e.strip()`.
    self._prompt_extensions = list(filter(str.strip, extensions))
execute async
execute(prompt, *, timeout_seconds=None)

Execute a prompt (Backend protocol implementation).

Source code in src/marianne/backends/claude_cli.py
async def execute(
    self, prompt: str, *, timeout_seconds: float | None = None,
) -> ExecutionResult:
    """Run one prompt through the CLI (Backend protocol entry point).

    Thin delegation wrapper: all real work happens in _execute_impl.
    """
    result = await self._execute_impl(prompt, timeout_seconds=timeout_seconds)
    return result
health_check async
health_check()

Check if claude CLI is available and responsive.

Source code in src/marianne/backends/claude_cli.py
async def health_check(self) -> bool:
    """Probe the claude CLI with a trivial prompt.

    Returns True only when the binary was found at init time AND a
    simple round-trip prompt succeeds and echoes the expected word.
    Consumes API quota — use availability_check() for a free probe.
    """
    if not self._claude_path:
        return False
    try:
        result = await self._execute_impl("Say 'ready' and nothing else.")
    except (TimeoutError, OSError, RuntimeError) as e:
        _logger.warning("health_check_failed", error_type=type(e).__name__, error=str(e))
        return False
    return result.success and "ready" in result.stdout.lower()
availability_check async
availability_check()

Check if the claude CLI binary exists and is executable.

Unlike health_check(), this does NOT send a prompt or consume API quota. Used after quota exhaustion waits.

Source code in src/marianne/backends/claude_cli.py
async def availability_check(self) -> bool:
    """Cheap liveness probe: does the claude binary exist and is it executable?

    Unlike health_check(), this sends no prompt and consumes no API
    quota — suitable for polling after quota-exhaustion waits.
    """
    path = self._claude_path
    if not path:
        return False
    is_file = os.path.isfile(path)
    is_executable = os.access(path, os.X_OK)
    return is_file and is_executable

OllamaBackend

OllamaBackend(base_url='http://localhost:11434', model='llama3.1:8b', timeout=300.0, num_ctx=32768, keep_alive='5m', max_tool_iterations=10, mcp_proxy=None)

Bases: HttpxClientMixin, Backend

Backend for Ollama model execution with tool translation.

Implements the Backend protocol for local Ollama models. Supports:

- MCP tool schema translation to the Ollama function format
- A multi-turn agentic loop for tool calling
- Health checks via the /api/tags endpoint

Example usage:

    backend = OllamaBackend(
        base_url="http://localhost:11434",
        model="llama3.1:8b",
    )
    result = await backend.execute("Write a hello world function")

Initialize Ollama backend.

Parameters:

Name Type Description Default
base_url str

Ollama server URL (default: http://localhost:11434)

'http://localhost:11434'
model str

Model to use (must support tool calling)

'llama3.1:8b'
timeout float

Request timeout in seconds

300.0
num_ctx int

Context window size (recommend >= 32768 for Claude Code tools)

32768
keep_alive str

Keep model loaded duration (e.g., "5m", "1h")

'5m'
max_tool_iterations int

Maximum tool call iterations per execution

10
mcp_proxy MCPProxyService | None

Optional MCPProxyService for tool execution

None
Source code in src/marianne/backends/ollama.py
def __init__(
    self,
    base_url: str = "http://localhost:11434",
    model: str = "llama3.1:8b",
    timeout: float = 300.0,
    num_ctx: int = 32768,
    keep_alive: str = "5m",
    max_tool_iterations: int = 10,
    mcp_proxy: MCPProxyService | None = None,
) -> None:
    """Initialize Ollama backend.

    Args:
        base_url: Ollama server URL (default: http://localhost:11434)
        model: Model to use (must support tool calling)
        timeout: Request timeout in seconds
        num_ctx: Context window size (recommend >= 32768 for Claude Code tools)
        keep_alive: Keep model loaded duration (e.g., "5m", "1h")
        max_tool_iterations: Maximum tool call iterations per execution
        mcp_proxy: Optional MCPProxyService for tool execution. When None,
            execute() runs in non-agentic mode (no tool calling).
    """
    # Normalize trailing slash so endpoint paths join cleanly.
    self.base_url = base_url.rstrip("/")
    self.model = model
    self.timeout = timeout
    self.num_ctx = num_ctx
    self.keep_alive = keep_alive
    self.max_tool_iterations = max_tool_iterations
    self.mcp_proxy = mcp_proxy
    self._working_directory: Path | None = None
    # Preamble/extensions are set per-sheet by the runner and injected
    # into the prompt inside execute().
    self._preamble: str | None = None
    self._prompt_extensions: list[str] = []

    # HTTP client lifecycle via shared mixin
    self._init_httpx_mixin(self.base_url, self.timeout, connect_timeout=10.0)
Attributes
name property
name

Human-readable backend name.

Functions
from_config classmethod
from_config(config)

Create backend from configuration.

Parameters:

Name Type Description Default
config BackendConfig

Backend configuration with ollama settings

required

Returns:

Type Description
OllamaBackend

Configured OllamaBackend instance

Source code in src/marianne/backends/ollama.py
@classmethod
def from_config(cls, config: BackendConfig) -> OllamaBackend:
    """Construct an OllamaBackend from the ollama section of a config.

    Args:
        config: Backend configuration with ollama settings

    Returns:
        Configured OllamaBackend instance
    """
    settings = config.ollama
    kwargs = {
        "base_url": settings.base_url,
        "model": settings.model,
        "timeout": settings.timeout_seconds,
        "num_ctx": settings.num_ctx,
        "keep_alive": settings.keep_alive,
        "max_tool_iterations": settings.max_tool_iterations,
    }
    return cls(**kwargs)
set_preamble
set_preamble(preamble)

Set the dynamic preamble for the next execution.

Source code in src/marianne/backends/ollama.py
def set_preamble(self, preamble: str | None) -> None:
    """Set the dynamic preamble for the next execution."""
    self._preamble = preamble
set_prompt_extensions
set_prompt_extensions(extensions)

Set prompt extensions for the next execution.

Source code in src/marianne/backends/ollama.py
def set_prompt_extensions(self, extensions: list[str]) -> None:
    """Store prompt extensions for the next execute(), dropping blank entries."""
    self._prompt_extensions = list(filter(str.strip, extensions))
execute async
execute(prompt, *, timeout_seconds=None)

Execute a prompt and return the result.

Runs the agentic loop if tools are available via MCPProxyService, otherwise performs a simple completion.

Parameters:

Name Type Description Default
prompt str

The prompt to send to Ollama

required
timeout_seconds float | None

Per-call timeout override. Ollama uses the httpx client-level timeout from __init__; per-call override is logged but not enforced.

None

Returns:

Type Description
ExecutionResult

ExecutionResult with output and metadata

Source code in src/marianne/backends/ollama.py
async def execute(
    self,
    prompt: str,
    *,
    timeout_seconds: float | None = None,
) -> ExecutionResult:
    """Execute a prompt and return the result.

    Runs the agentic loop if tools are available via MCPProxyService,
    otherwise performs a simple completion.

    Args:
        prompt: The prompt to send to Ollama
        timeout_seconds: Per-call timeout override. Ollama uses the httpx
            client-level timeout from ``__init__``; per-call override is
            logged but not enforced.

    Returns:
        ExecutionResult with output and metadata
    """
    if timeout_seconds is not None:
        _logger.debug(
            "timeout_override_ignored",
            backend="ollama",
            requested=timeout_seconds,
            actual=self.timeout,
        )
    # Monotonic clock for duration; wall-clock UTC timestamp for started_at.
    start_time = time.monotonic()
    started_at = utc_now()

    _logger.debug(
        "ollama_execute_start",
        model=self.model,
        prompt_length=len(prompt),
        has_mcp_proxy=self.mcp_proxy is not None,
    )

    try:
        # Build initial messages, injecting preamble/extensions
        # NOTE(review): extensions are appended AFTER the user prompt here,
        # while set_prompt_extensions' docstring says they are injected
        # "after the preamble" (and the CLI backend documents them as coming
        # before the user prompt) — confirm the intended ordering.
        if self._preamble or self._prompt_extensions:
            parts: list[str] = []
            if self._preamble:
                parts.append(self._preamble)
            parts.append(prompt)
            if self._prompt_extensions:
                parts.append("\n".join(self._prompt_extensions))
            messages = [OllamaMessage(role="user", content="\n".join(parts))]
        else:
            messages = [OllamaMessage(role="user", content=prompt)]

        # Get tools if MCP proxy is available
        tools: list[OllamaToolDef] = []
        mcp_degraded: str | None = None
        if self.mcp_proxy:
            try:
                mcp_tools = await self.mcp_proxy.list_tools()
                tools = self._translate_tools_to_ollama(mcp_tools)
                _logger.debug("tools_loaded", tool_count=len(tools))
            except (OSError, ConnectionError, TimeoutError, httpx.HTTPError) as e:
                # Tool loading failure degrades gracefully to a plain
                # completion rather than failing the whole execution.
                mcp_degraded = (
                    f"[MCP DEGRADED] Tool loading failed ({type(e).__name__}: {e}); "
                    "running in non-agentic mode. "
                    "Check MCP server connectivity and configuration."
                )
                _logger.warning(
                    "mcp_tool_load_failed.falling_back_to_non_agentic",
                    error=str(e),
                    error_type=type(e).__name__,
                    hint=mcp_degraded,
                )

        # Run agentic loop if tools available, else simple completion
        if tools:
            result = await self._agentic_loop(messages, tools)
        else:
            result = await self._simple_completion(messages)

        # Stamp timing/model metadata onto the result produced above.
        duration = time.monotonic() - start_time
        result.duration_seconds = duration
        result.started_at = started_at
        result.model = self.model

        # Surface MCP degradation in result so callers can detect it
        if mcp_degraded:
            result.stderr = (
                f"{result.stderr}\n{mcp_degraded}" if result.stderr
                else mcp_degraded
            )
            if not result.error_message:
                result.error_message = mcp_degraded

        _logger.info(
            "ollama_execute_complete",
            success=result.success,
            duration_seconds=duration,
            input_tokens=result.input_tokens,
            output_tokens=result.output_tokens,
        )

        return result

    except httpx.ConnectError as e:
        duration = time.monotonic() - start_time
        _logger.error("ollama_connection_error", error=str(e))
        return ExecutionResult(
            success=False,
            stdout="",
            stderr=f"Connection error: {e}",
            duration_seconds=duration,
            started_at=started_at,
            error_type="connection",
            error_message=str(e),
            model=self.model,
        )

    except httpx.TimeoutException as e:
        duration = time.monotonic() - start_time
        _logger.error("ollama_timeout", error=str(e))
        return ExecutionResult(
            success=False,
            stdout="",
            stderr=f"Timeout: {e}",
            duration_seconds=duration,
            started_at=started_at,
            exit_reason="timeout",
            error_type="timeout",
            error_message=str(e),
            model=self.model,
        )

    except Exception as e:
        duration = time.monotonic() - start_time
        _logger.exception("ollama_execute_error", error=str(e))
        # Only connection/timeout errors become failed ExecutionResults;
        # anything unexpected is logged and re-raised to the caller.
        raise
health_check async
health_check()

Check if Ollama is available and model is loaded.

Uses /api/tags to verify Ollama is running and configured model exists.

Returns:

Type Description
bool

True if healthy, False otherwise

Source code in src/marianne/backends/ollama.py
async def health_check(self) -> bool:
    """Check if Ollama is available and the configured model is present.

    Uses /api/tags to verify Ollama is running and configured model exists.

    Returns:
        True if healthy, False otherwise
    """
    try:
        client = await self._get_client()
        response = await client.get("/api/tags", timeout=10.0)

        if response.status_code != 200:
            _logger.warning(
                "ollama_health_check_failed",
                status_code=response.status_code,
            )
            return False

        data = response.json()
        models = data.get("models", [])

        # Check if our model is available. Accept either an exact
        # name match (including the ':' tag) or the same base name.
        # A plain startswith() check false-positives: configured
        # "llama3" would wrongly match an installed "llama3.1:8b".
        model_base = self.model.split(":")[0]
        available = any(
            entry.get("name", "") == self.model
            or entry.get("name", "").split(":")[0] == model_base
            for entry in models
        )

        if not available:
            _logger.warning(
                "ollama_model_not_found",
                model=self.model,
                available_models=[entry.get("name") for entry in models],
            )

        return available

    except (httpx.HTTPError, OSError, ValueError) as e:
        _logger.warning("ollama_health_check_error", error=str(e), exc_info=True)
        return False
close async
close()

Close HTTP client and release resources.

Source code in src/marianne/backends/ollama.py
async def close(self) -> None:
    """Close HTTP client and release resources.

    Delegates teardown to the shared HttpxClientMixin helper.
    """
    await self._close_httpx_client()

OpenRouterBackend

OpenRouterBackend(model=_DEFAULT_MODEL, api_key_env='OPENROUTER_API_KEY', max_tokens=16384, temperature=0.7, timeout_seconds=300.0, base_url=_OPENROUTER_BASE_URL)

Bases: HttpxClientMixin, Backend

Run prompts via the OpenRouter API (OpenAI-compatible).

Provides direct HTTP access to 300+ models including free-tier options. Uses HttpxClientMixin for lazy, connection-pooled httpx client lifecycle.

Example usage::

backend = OpenRouterBackend(
    model="minimax/minimax-m1-80k",
    api_key_env="OPENROUTER_API_KEY",
)
result = await backend.execute("Explain quicksort")

Initialize OpenRouter backend.

Parameters:

Name Type Description Default
model str

Model ID (e.g., 'minimax/minimax-m1-80k', 'google/gemma-4').

_DEFAULT_MODEL
api_key_env str

Environment variable containing API key.

'OPENROUTER_API_KEY'
max_tokens int

Maximum tokens for response.

16384
temperature float

Sampling temperature (0.0-2.0).

0.7
timeout_seconds float

Maximum time for API request.

300.0
base_url str

OpenRouter API base URL (without endpoint path).

_OPENROUTER_BASE_URL
Source code in src/marianne/backends/openrouter.py
def __init__(
    self,
    model: str = _DEFAULT_MODEL,
    api_key_env: str = "OPENROUTER_API_KEY",
    max_tokens: int = 16384,
    temperature: float = 0.7,
    timeout_seconds: float = 300.0,
    base_url: str = _OPENROUTER_BASE_URL,
) -> None:
    """Initialize OpenRouter backend.

    Args:
        model: Model ID (e.g., 'minimax/minimax-m1-80k', 'google/gemma-4').
        api_key_env: Environment variable containing API key.
        max_tokens: Maximum tokens for response.
        temperature: Sampling temperature (0.0-2.0).
        timeout_seconds: Maximum time for API request.
        base_url: OpenRouter API base URL (without endpoint path).

    Raises:
        ValueError: If model or api_key_env is empty, max_tokens < 1,
            temperature is outside [0.0, 2.0], or timeout_seconds <= 0.
    """
    if not model:
        raise ValueError("model must be a non-empty string")
    if not api_key_env:
        raise ValueError("api_key_env must be a non-empty string")
    if max_tokens < 1:
        raise ValueError(f"max_tokens must be >= 1, got {max_tokens}")
    # Consistency fix: the docstring documents the 0.0-2.0 range
    # (OpenRouter's OpenAI-compatible limit) but, unlike the other
    # parameters, temperature was previously not validated.
    if not 0.0 <= temperature <= 2.0:
        raise ValueError(f"temperature must be in [0.0, 2.0], got {temperature}")
    if timeout_seconds <= 0:
        raise ValueError(f"timeout_seconds must be > 0, got {timeout_seconds}")

    self.model = model
    self.api_key_env = api_key_env
    self.max_tokens = max_tokens
    self.temperature = temperature
    self.timeout_seconds = timeout_seconds
    self._working_directory: Path | None = None

    # Read API key from environment (may be None — checked at execute time)
    self._api_key: str | None = os.environ.get(api_key_env)

    # Error classifier for rate limit wait extraction
    self._error_classifier = ErrorClassifier()

    # Per-sheet overrides — saved originals for clear_overrides()
    self._saved_model: str | None = None
    self._saved_temperature: float | None = None
    self._saved_max_tokens: int | None = None
    self._has_overrides: bool = False

    # Real-time output logging (set per-sheet by runner)
    self._stdout_log_path: Path | None = None
    self._stderr_log_path: Path | None = None

    # Preamble and extensions for prompt injection
    self._preamble: str | None = None
    self._prompt_extensions: list[str] = []

    # Build auth headers
    headers: dict[str, str] = {
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/Mzzkc/marianne-ai-compose",
        "X-Title": "Marianne AI Compose",
    }
    if self._api_key:
        headers["Authorization"] = f"Bearer {self._api_key}"

    # HTTP client lifecycle via shared mixin
    self._init_httpx_mixin(
        base_url.rstrip("/"),
        timeout_seconds,
        connect_timeout=10.0,
        headers=headers,
    )
Attributes
name property
name

Human-readable backend name including model.

Functions
from_config classmethod
from_config(config)

Create backend from a BackendConfig.

Parameters:

Name Type Description Default
config object

A BackendConfig instance (typed as object to avoid circular import — BackendConfig lives in core.config).

required

Returns:

Type Description
OpenRouterBackend

Configured OpenRouterBackend instance.

Source code in src/marianne/backends/openrouter.py
@classmethod
def from_config(cls, config: object) -> OpenRouterBackend:
    """Create backend from a BackendConfig.

    Args:
        config: A BackendConfig instance (typed as object to avoid
            circular import — BackendConfig lives in core.config).

    Returns:
        Configured OpenRouterBackend instance.
    """
    def _opt(name: str, fallback: object) -> object:
        # getattr with a fallback keeps this tolerant of partial configs.
        return getattr(config, name, fallback)

    # A config with model=None/"" still falls back to the default model.
    chosen_model = _opt("model", _DEFAULT_MODEL) or _DEFAULT_MODEL
    return cls(
        model=chosen_model,
        api_key_env=_opt("api_key_env", "OPENROUTER_API_KEY"),
        max_tokens=_opt("max_tokens", 16384),
        temperature=_opt("temperature", 0.7),
        timeout_seconds=_opt("timeout_seconds", 300.0),
    )
apply_overrides
apply_overrides(overrides)

Apply per-sheet overrides for the next execution.

Source code in src/marianne/backends/openrouter.py
def apply_overrides(self, overrides: dict[str, object]) -> None:
    """Apply per-sheet overrides for the next execution.

    Snapshots the original model/temperature/max_tokens the first time
    overrides are applied so clear_overrides() can restore them.
    Repeated applies before a clear no longer overwrite the snapshot
    with already-overridden values (previously the originals became
    unrecoverable after a second apply).

    Args:
        overrides: Mapping of parameter name to override value.
            Recognized keys: "model", "temperature", "max_tokens".
            An empty mapping is a no-op.
    """
    if not overrides:
        return
    # Snapshot originals only once per apply/clear cycle.
    if not self._has_overrides:
        self._saved_model = self.model
        self._saved_temperature = self.temperature
        self._saved_max_tokens = self.max_tokens
        self._has_overrides = True
    if "model" in overrides:
        self.model = str(overrides["model"])
    if "temperature" in overrides:
        self.temperature = float(overrides["temperature"])  # type: ignore[arg-type]
    if "max_tokens" in overrides:
        self.max_tokens = int(overrides["max_tokens"])  # type: ignore[call-overload]
clear_overrides
clear_overrides()

Restore original backend parameters after per-sheet execution.

Source code in src/marianne/backends/openrouter.py
def clear_overrides(self) -> None:
    """Restore original model/temperature/max_tokens after a per-sheet run.

    No-op when no overrides are currently active.
    """
    if not self._has_overrides:
        return
    # Restore each saved original and drop the snapshot in one swap.
    self.model, self._saved_model = self._saved_model, None  # type: ignore[assignment]
    self.temperature, self._saved_temperature = self._saved_temperature, None  # type: ignore[assignment]
    self.max_tokens, self._saved_max_tokens = self._saved_max_tokens, None  # type: ignore[assignment]
    self._has_overrides = False
set_preamble
set_preamble(preamble)

Set the dynamic preamble for the next execution.

Source code in src/marianne/backends/openrouter.py
def set_preamble(self, preamble: str | None) -> None:
    """Set the dynamic preamble for the next execution."""
    self._preamble = preamble
set_prompt_extensions
set_prompt_extensions(extensions)

Set prompt extensions for the next execution.

Source code in src/marianne/backends/openrouter.py
def set_prompt_extensions(self, extensions: list[str]) -> None:
    """Remember prompt extensions for the next execute(); blanks are dropped."""
    kept = [block for block in extensions if block.strip()]
    self._prompt_extensions = kept
set_output_log_path
set_output_log_path(path)

Set base path for real-time output logging.

Parameters:

Name Type Description Default
path Path | None

Base path for log files (without extension), or None to disable.

required
Source code in src/marianne/backends/openrouter.py
def set_output_log_path(self, path: Path | None) -> None:
    """Set base path for real-time output logging.

    Args:
        path: Base path for log files (without extension), or None to disable.
    """
    if path is None:
        self._stdout_log_path = None
        self._stderr_log_path = None
    else:
        self._stdout_log_path = path.with_suffix(".stdout.log")
        self._stderr_log_path = path.with_suffix(".stderr.log")
execute async
execute(prompt, *, timeout_seconds=None)

Execute a prompt via the OpenRouter API.

Sends a chat completion request to OpenRouter's OpenAI-compatible endpoint and returns the result.

Parameters:

Name Type Description Default
prompt str

The prompt to send.

required
timeout_seconds float | None

Per-call timeout override. Logged but not enforced (httpx client timeout from init is used).

None

Returns:

Type Description
ExecutionResult

ExecutionResult with API response and metadata.

Source code in src/marianne/backends/openrouter.py
async def execute(
    self, prompt: str, *, timeout_seconds: float | None = None,
) -> ExecutionResult:
    """Execute a prompt via the OpenRouter API.

    Sends a chat completion request to OpenRouter's OpenAI-compatible
    endpoint and returns the result.

    Args:
        prompt: The prompt to send.
        timeout_seconds: Per-call timeout override. Logged but not
            enforced (httpx client timeout from __init__ is used).

    Returns:
        ExecutionResult with API response and metadata.
    """
    if timeout_seconds is not None:
        _logger.debug(
            "timeout_override_ignored",
            backend="openrouter",
            requested=timeout_seconds,
            actual=self.timeout_seconds,
        )

    # Monotonic clock for duration; wall-clock UTC timestamp for started_at.
    start_time = time.monotonic()
    started_at = utc_now()

    _logger.debug(
        "openrouter_execute_start",
        model=self.model,
        prompt_length=len(prompt),
        max_tokens=self.max_tokens,
    )

    # Check API key before making request
    # (exit_code=1 is a synthesized configuration-failure code — no
    # subprocess or HTTP request ever ran for this result.)
    if not self._api_key:
        duration = time.monotonic() - start_time
        msg = (
            f"API key not found in environment variable: {self.api_key_env}. "
            "Set it with: export OPENROUTER_API_KEY=your-key"
        )
        _logger.error(
            "configuration_error",
            api_key_env=self.api_key_env,
        )
        self._write_log_file(self._stderr_log_path, msg)
        return ExecutionResult(
            success=False,
            exit_code=1,
            stdout="",
            stderr=msg,
            duration_seconds=duration,
            started_at=started_at,
            error_type="configuration",
            error_message=msg,
            model=self.model,
        )

    assembled_prompt = self._build_prompt(prompt)

    payload: dict[str, Any] = {
        "model": self.model,
        "messages": [{"role": "user", "content": assembled_prompt}],
        "max_tokens": self.max_tokens,
        "temperature": self.temperature,
    }

    try:
        client = await self._get_client()

        response = await client.post(
            "/chat/completions",
            json=payload,
        )

        duration = time.monotonic() - start_time

        # Handle rate limiting via HTTP status
        if response.status_code == 429:
            return self._handle_rate_limit(response, duration, started_at)

        # Handle other HTTP errors
        if response.status_code >= 400:
            return self._handle_http_error(response, duration, started_at)

        # Parse successful response
        data = response.json()
        return self._parse_success_response(data, duration, started_at)

    except httpx.ConnectError as e:
        duration = time.monotonic() - start_time
        _logger.error("openrouter_connection_error", error=str(e))
        self._write_log_file(self._stderr_log_path, str(e))
        # Synthetic exit codes mirror HTTP semantics: 503 for connection
        # failures, 408 for timeouts (below).
        return ExecutionResult(
            success=False,
            exit_code=503,
            stdout="",
            stderr=f"Connection error: {e}",
            duration_seconds=duration,
            started_at=started_at,
            error_type="connection",
            error_message=str(e),
            model=self.model,
        )

    except httpx.TimeoutException as e:
        duration = time.monotonic() - start_time
        _logger.error(
            "openrouter_timeout",
            timeout_seconds=self.timeout_seconds,
            error=str(e),
        )
        self._write_log_file(self._stderr_log_path, str(e))
        return ExecutionResult(
            success=False,
            exit_code=408,
            stdout="",
            stderr=f"Timeout after {self.timeout_seconds}s: {e}",
            duration_seconds=duration,
            started_at=started_at,
            exit_reason="timeout",
            error_type="timeout",
            error_message=f"API timeout after {self.timeout_seconds}s: {e}",
            model=self.model,
        )

    except Exception as e:
        duration = time.monotonic() - start_time
        _logger.exception(
            "openrouter_execute_error",
            model=self.model,
            error=str(e),
        )
        self._write_log_file(self._stderr_log_path, str(e))
        # Unexpected failures are captured to the stderr log, then
        # re-raised — only connection/timeout errors become results.
        raise
health_check async
health_check()

Check if the OpenRouter API is reachable and authenticated.

Uses the /models endpoint (lightweight, no token consumption) to verify connectivity and authentication.

Returns:

Type Description
bool

True if healthy, False otherwise.

Source code in src/marianne/backends/openrouter.py
async def health_check(self) -> bool:
    """Check if the OpenRouter API is reachable and authenticated.

    Uses the /models endpoint (lightweight, no token consumption)
    to verify connectivity and authentication.

    Returns:
        True if healthy, False otherwise.
    """
    # Without a key the request is guaranteed to fail — report and bail.
    if not self._api_key:
        _logger.warning(
            "health_check_failed",
            error_type="MissingAPIKey",
            error=f"No API key configured — set {self.api_key_env}",
        )
        return False

    try:
        http = await self._get_client()
        resp = await http.get("/models", timeout=10.0)
    except (httpx.HTTPError, OSError, ValueError) as exc:
        _logger.warning(
            "openrouter_health_check_error",
            error=str(exc),
            error_type=type(exc).__name__,
        )
        return False

    if resp.status_code == 200:
        return True
    _logger.warning(
        "openrouter_health_check_failed",
        status_code=resp.status_code,
    )
    return False
availability_check async
availability_check()

Check if the backend can be initialized without consuming API quota.

Verifies that the API key is present and the httpx client can be created. Does NOT make any HTTP requests.

Source code in src/marianne/backends/openrouter.py
async def availability_check(self) -> bool:
    """Check if the backend can be initialized without consuming API quota.

    Verifies that the API key is present and the httpx client can be
    created. Does NOT make any HTTP requests.
    """
    # A missing key means the backend can never authenticate.
    if not self._api_key:
        return False
    try:
        await self._get_client()
    except Exception:
        # Any client-construction failure means "not available".
        return False
    else:
        return True
close async
close()

Close HTTP client and release resources.

Source code in src/marianne/backends/openrouter.py
async def close(self) -> None:
    """Release HTTP resources held by this backend.

    Delegates teardown of the pooled httpx client to the shared
    mixin helper; safe to call when no client was ever created.
    """
    await self._close_httpx_client()

RecursiveLightBackend

RecursiveLightBackend(rl_endpoint='http://localhost:8080', user_id=None, timeout=30.0)

Bases: HttpxClientMixin, Backend

Execute prompts via Recursive Light HTTP API.

Uses httpx.AsyncClient to communicate with the Recursive Light server for TDF-aligned processing with confidence scoring, domain activations, and boundary state tracking.

The RL server provides dual-LLM processing:

- LLM #1 (unconscious): Confidence assessment and domain activation
- LLM #2 (conscious): Response generation with accumulated wisdom

Attributes:

Name Type Description
rl_endpoint

Base URL for the Recursive Light API.

user_id

Unique identifier for this Marianne instance.

timeout

Request timeout in seconds.

Initialize Recursive Light backend.

Parameters:

Name Type Description Default
rl_endpoint str

Base URL for the Recursive Light API server. Defaults to localhost:8080 for local development.

'http://localhost:8080'
user_id str | None

Unique identifier for this Marianne instance. Generates a UUID if not provided.

None
timeout float

Request timeout in seconds. Defaults to 30.0.

30.0
Source code in src/marianne/backends/recursive_light.py
def __init__(
    self,
    rl_endpoint: str = "http://localhost:8080",
    user_id: str | None = None,
    timeout: float = 30.0,
) -> None:
    """Initialize Recursive Light backend.

    Args:
        rl_endpoint: Base URL for the Recursive Light API server.
            Defaults to localhost:8080 for local development.
        user_id: Unique identifier for this Marianne instance.
            Generates a UUID if not provided.
        timeout: Request timeout in seconds. Defaults to 30.0.
    """
    # Normalize the base URL so later path joins never double the slash.
    self.rl_endpoint = rl_endpoint.rstrip("/")
    # Fall back to a random identity when the caller supplies none.
    self.user_id = user_id if user_id else str(uuid.uuid4())
    self.timeout = timeout
    self._working_directory: Path | None = None

    # The shared mixin owns the httpx client lifecycle; every request is
    # tagged with this instance's user id via a default header.
    default_headers = {
        "Content-Type": "application/json",
        "X-Marianne-User-ID": self.user_id,
    }
    self._init_httpx_mixin(
        self.rl_endpoint,
        self.timeout,
        headers=default_headers,
    )
Attributes
name property
name

Human-readable backend name.

Functions
from_config classmethod
from_config(config)

Create backend from configuration.

Source code in src/marianne/backends/recursive_light.py
@classmethod
def from_config(cls, config: "BackendConfig") -> "RecursiveLightBackend":
    """Create backend from configuration."""
    # Map the recursive_light config section onto constructor kwargs.
    section = config.recursive_light
    return cls(
        rl_endpoint=section.endpoint,
        user_id=section.user_id,
        timeout=section.timeout,
    )
execute async
execute(prompt, *, timeout_seconds=None)

Execute a prompt through Recursive Light API.

Sends the prompt to RL's /api/process endpoint and parses the response for text output plus RL-specific metadata (confidence, domain activations, boundary states, quality).

Parameters:

Name Type Description Default
prompt str

The prompt to send to Recursive Light.

required
timeout_seconds float | None

Per-call timeout override. RL backend uses its own HTTP timeout from __init__; per-call override is logged but not enforced.

None

Returns:

Type Description
ExecutionResult

ExecutionResult with output text and RL metadata populated.
On connection errors, returns a failed result with graceful
error handling (not raising exceptions).

Source code in src/marianne/backends/recursive_light.py
async def execute(
    self,
    prompt: str,
    *,
    timeout_seconds: float | None = None,
) -> ExecutionResult:
    """Execute a prompt through Recursive Light API.

    Sends the prompt to RL's /api/process endpoint and parses
    the response for text output plus RL-specific metadata
    (confidence, domain activations, boundary states, quality).

    Args:
        prompt: The prompt to send to Recursive Light.
        timeout_seconds: Per-call timeout override. RL backend uses its
            own HTTP timeout from ``__init__``; per-call override is
            logged but not enforced.

    Returns:
        ExecutionResult with output text and RL metadata populated.
        On connection errors, returns a failed result with graceful
        error handling (not raising exceptions).
    """
    # Per-call timeout overrides are not enforced by this backend (the
    # HTTP client's timeout is fixed in __init__); log the request so
    # the mismatch stays observable.
    if timeout_seconds is not None:
        _logger.debug(
            "timeout_override_ignored",
            backend="recursive_light",
            requested=timeout_seconds,
            actual=self.timeout,
        )
    # Monotonic clock for duration measurement; wall clock for the
    # started_at timestamp recorded on the result.
    start_time = time.monotonic()
    started_at = utc_now()

    # Log HTTP request details at DEBUG level
    _logger.debug(
        "http_request",
        endpoint=f"{self.rl_endpoint}/api/process",
        user_id=self.user_id,
        timeout=self.timeout,
        prompt_length=len(prompt),
    )

    try:
        client = await self._get_client()

        # Build request payload
        payload = {
            "user_id": self.user_id,
            "message": prompt,
        }

        # POST to RL process endpoint
        response = await client.post("/api/process", json=payload)
        duration = time.monotonic() - start_time

        if response.status_code != 200:
            # Non-200 is reported as a failed result, not an exception.
            # Response text is truncated (500 chars in the log, 200 in
            # the error message) to keep records bounded.
            _logger.error(
                "api_error_response",
                duration_seconds=duration,
                status_code=response.status_code,
                response_text=response.text[:500] if response.text else None,
            )
            return ExecutionResult(
                success=False,
                exit_code=response.status_code,
                stdout="",
                stderr=f"RL API error: {response.status_code} - {response.text}",
                duration_seconds=duration,
                started_at=started_at,
                error_type="api_error",
                error_message=f"HTTP {response.status_code}: {response.text[:200]}",
            )

        # Parse response JSON
        data = response.json()

        # Parse response into ExecutionResult
        result = self._parse_rl_response(data, duration, started_at)

        _logger.info(
            "http_response",
            duration_seconds=duration,
            status_code=response.status_code,
            response_length=len(result.stdout) if result.stdout else 0,
        )

        return result

    except httpx.ConnectError as e:
        # Graceful degradation: connectivity problems become a failed
        # ExecutionResult rather than propagating (see docstring).
        duration = time.monotonic() - start_time
        _logger.warning(
            "connection_error",
            duration_seconds=duration,
            endpoint=self.rl_endpoint,
            error_message=str(e),
        )
        return ExecutionResult(
            success=False,
            exit_code=1,
            stdout="",
            stderr=f"Connection error: {e}",
            duration_seconds=duration,
            started_at=started_at,
            error_type="connection_error",
            error_message=f"Failed to connect to RL at {self.rl_endpoint}: {e}",
        )

    except httpx.TimeoutException as e:
        duration = time.monotonic() - start_time
        _logger.warning(
            "request_timeout",
            duration_seconds=duration,
            timeout_seconds=self.timeout,
            endpoint=self.rl_endpoint,
        )
        return ExecutionResult(
            success=False,
            exit_code=124,  # Timeout exit code (shell/`timeout(1)` convention)
            stdout="",
            stderr=f"Request timed out: {e}",
            duration_seconds=duration,
            started_at=started_at,
            error_type="timeout",
            error_message=f"Timed out after {self.timeout}s",
        )

    except Exception as e:
        # Unlike the network errors above, truly unexpected failures are
        # logged with a traceback and re-raised to the caller.
        duration = time.monotonic() - start_time
        _logger.exception(
            "unexpected_error",
            duration_seconds=duration,
            endpoint=self.rl_endpoint,
            error_message=str(e),
        )
        raise
health_check async
health_check()

Check if Recursive Light server is available and responding.

Attempts to reach the RL health endpoint (or root) to verify connectivity before starting a job.

Returns:

Type Description
bool

True if RL server is healthy and responding, False otherwise.

Source code in src/marianne/backends/recursive_light.py
async def health_check(self) -> bool:
    """Check if Recursive Light server is available and responding.

    Attempts to reach the RL health endpoint (or root) to verify
    connectivity before starting a job.

    Returns:
        True if RL server is healthy and responding, False otherwise.
    """
    # Preferred health endpoint first, then fallbacks.
    candidates = ("/health", "/api/health", "/")
    try:
        client = await self._get_client()
        for path in candidates:
            try:
                reply = await client.get(path)
            except httpx.HTTPStatusError:
                continue
            if reply.status_code == 200:
                return True
        return False
    except (httpx.ConnectError, httpx.TimeoutException) as exc:
        # Server simply not reachable — expected during startup, debug only.
        _logger.debug("health_check_unreachable", error=f"{type(exc).__name__}: {exc}")
        return False
    except (httpx.HTTPError, OSError, RuntimeError) as exc:
        _logger.warning("health_check_failed", error=f"{type(exc).__name__}: {exc}")
        return False
close async
close()

Close the HTTP client connection.

Should be called when done using the backend to clean up resources.

Source code in src/marianne/backends/recursive_light.py
async def close(self) -> None:
    """Tear down the HTTP client connection.

    Call this once the backend is no longer needed so that pooled
    connections and sockets are released.
    """
    await self._close_httpx_client()