cli_backend

Config-driven CLI instrument backend.

PluginCliBackend is a generic Backend implementation driven by an InstrumentProfile. Instead of writing Python for each CLI tool, you write a ~30-line YAML profile and Marianne handles command construction, output parsing, and error classification.

The music metaphor: a musician doesn't need to know how the instrument was built — they need to know how to play it. The profile is the instrument's spec sheet; this backend is the player.

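Security: Commands are spawned with asyncio.create_subprocess_exec rather than through a shell, so arguments reach the executable verbatim and shell metacharacters in a prompt are never interpreted. Environment variables from the profile are validated before they are passed to the subprocess.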
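The exec-style spawning described above can be illustrated with a small standalone sketch. This is not Marianne's code: `run_argv` and `hostile_prompt` are hypothetical names, and the child process is just the current Python interpreter echoing its first argument, chosen so the example runs anywhere.

```python
import asyncio
import sys


async def run_argv(argv: list[str]) -> str:
    """Run argv directly (no shell) and return its decoded stdout."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return stdout.decode("utf-8", errors="replace")


# Hypothetical prompt full of shell metacharacters.
hostile_prompt = "hello; echo INJECTED && $(whoami)"

# The child prints its first argument back unchanged.
echo_argv = [sys.executable, "-c", "import sys; print(sys.argv[1])", hostile_prompt]

echoed = asyncio.run(run_argv(echo_argv)).strip()
```

Because each list element becomes exactly one argument of the child process, the `;`, `&&`, and `$(...)` sequences arrive as literal text instead of being executed.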

Classes

PluginCliBackend

PluginCliBackend(profile, working_directory=None)

Bases: Backend

Generic CLI backend driven by an InstrumentProfile.

Builds CLI commands, runs them via asyncio subprocess, and parses output according to the profile's output and error configuration.

This is the core of the instrument plugin system: any CLI tool with a YAML profile can be used as an mzt instrument.

Initialize from an InstrumentProfile.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `profile` | `InstrumentProfile` | The instrument profile describing how to invoke and parse this CLI tool. | required |
| `working_directory` | `Path \| None` | Optional working directory for subprocess. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the profile is not a CLI instrument. |

Source code in src/marianne/execution/instruments/cli_backend.py
def __init__(
    self,
    profile: InstrumentProfile,
    working_directory: Path | None = None,
) -> None:
    """Initialize from an InstrumentProfile.

    Args:
        profile: The instrument profile describing how to invoke
            and parse this CLI tool.
        working_directory: Optional working directory for subprocess.

    Raises:
        ValueError: If the profile is not a CLI instrument.
    """
    if profile.kind != "cli" or profile.cli is None:
        raise ValueError(
            f"PluginCliBackend requires kind=cli with a cli profile, "
            f"got kind={profile.kind}"
        )

    self._profile = profile
    self._cli = profile.cli
    self._working_directory: Path | None = working_directory
    self._preamble: str | None = None
    self._prompt_extensions: list[str] = []
    self._output_log_path: Path | None = None
    self._model: str | None = profile.default_model

    # PID tracking callbacks for orphan detection.
    # Set by the daemon's ProcessGroupManager (via BackendPool) when
    # running under the conductor. Standalone CLI mode leaves these
    # as None — no orphan tracking needed.
    self._on_process_spawned: Callable[[int], None] | None = None
    self._on_process_exited: Callable[[int], None] | None = None

    # Override tracking — mirrors the pattern in ClaudeCliBackend.
    # _saved_model stores the pre-override value so clear_overrides()
    # can restore it. _has_overrides guards against double-clear.
    self._saved_model: str | None = None
    self._has_overrides: bool = False

    _logger.debug(
        "plugin_cli_backend_initialized",
        instrument=profile.name,
        executable=self._cli.command.executable,
        model=self._model,
    )
Attributes
name property
name

Human-readable backend name.

Functions
apply_overrides
apply_overrides(overrides)

Apply per-sheet parameter overrides for the next execution.

Supports:

- model: Override the default_model from the instrument profile.

Must be paired with clear_overrides() after execution. Callers MUST hold override_lock for the entire apply → execute → clear window when parallel execution is possible.
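The apply → execute → clear window can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's calling code: `FakeBackend` is a hypothetical stand-in that copies the override logic documented here, `run_sheet` is a hypothetical helper, and `override_lock` is assumed to behave like an `asyncio.Lock` (its real type is not shown in this section).

```python
import asyncio


class FakeBackend:
    """Hypothetical stand-in mimicking the documented override methods."""

    def __init__(self, default_model: str) -> None:
        self._model = default_model
        self._saved_model = None
        self._has_overrides = False

    def apply_overrides(self, overrides: dict[str, object]) -> None:
        if not overrides:
            return
        self._saved_model = self._model
        self._has_overrides = True
        if "model" in overrides:
            self._model = str(overrides["model"])

    def clear_overrides(self) -> None:
        if not self._has_overrides:
            return
        self._model = self._saved_model
        self._saved_model = None
        self._has_overrides = False

    async def execute(self, prompt: str) -> str:
        await asyncio.sleep(0)  # yield so concurrent callers could interleave
        return f"{self._model}:{prompt}"


async def run_sheet(
    backend: FakeBackend,
    lock: asyncio.Lock,
    prompt: str,
    overrides: dict[str, object],
) -> str:
    # Hold the lock across the whole window; try/finally guarantees the
    # override is cleared even if execute() raises.
    async with lock:
        backend.apply_overrides(overrides)
        try:
            return await backend.execute(prompt)
        finally:
            backend.clear_overrides()


async def main() -> list[str]:
    backend = FakeBackend(default_model="base-model")
    lock = asyncio.Lock()  # assumption: stands in for override_lock
    results = await asyncio.gather(
        run_sheet(backend, lock, "a", {"model": "fast-model"}),
        run_sheet(backend, lock, "b", {}),
    )
    return list(results)


results = asyncio.run(main())
```

Without the lock, the second sheet could run while the first sheet's model override is still in place and silently use the wrong model.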

Source code in src/marianne/execution/instruments/cli_backend.py
def apply_overrides(self, overrides: dict[str, object]) -> None:
    """Apply per-sheet parameter overrides for the next execution.

    Supports:
    - ``model``: Override the default_model from the instrument profile.

    Must be paired with clear_overrides() after execution. Callers
    MUST hold override_lock for the entire apply → execute → clear
    window when parallel execution is possible.
    """
    if not overrides:
        return
    self._saved_model = self._model
    self._has_overrides = True
    if "model" in overrides:
        self._model = str(overrides["model"])
clear_overrides
clear_overrides()

Restore original backend parameters after per-sheet execution.

Source code in src/marianne/execution/instruments/cli_backend.py
def clear_overrides(self) -> None:
    """Restore original backend parameters after per-sheet execution."""
    if not self._has_overrides:
        return
    self._model = self._saved_model
    self._saved_model = None
    self._has_overrides = False
set_preamble
set_preamble(preamble)

Set preamble to prepend to the next prompt.

Source code in src/marianne/execution/instruments/cli_backend.py
def set_preamble(self, preamble: str | None) -> None:
    """Set preamble to prepend to the next prompt."""
    self._preamble = preamble
set_prompt_extensions
set_prompt_extensions(extensions)

Set prompt extensions to append to the next prompt.

Source code in src/marianne/execution/instruments/cli_backend.py
def set_prompt_extensions(self, extensions: list[str]) -> None:
    """Set prompt extensions to append to the next prompt."""
    self._prompt_extensions = list(extensions)
set_output_log_path
set_output_log_path(path)

Set base path for real-time output logging.

Source code in src/marianne/execution/instruments/cli_backend.py
def set_output_log_path(self, path: Path | None) -> None:
    """Set base path for real-time output logging."""
    self._output_log_path = path
execute async
execute(prompt, *, timeout_seconds=None)

Execute a prompt through the CLI instrument.

Builds the command from profile config, runs it as a subprocess, and parses the output according to the profile's output config.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str` | The prompt to execute. | required |
| `timeout_seconds` | `float \| None` | Per-execution timeout override. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `ExecutionResult` | ExecutionResult with parsed output and metadata. |
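The timeout handling in this method follows a common asyncio pattern: bound `communicate()` with `asyncio.wait_for`, then kill and reap the child if the deadline passes. The sketch below shows that pattern in isolation. `run_with_timeout` is a hypothetical helper, the returned tuple is a simplification rather than the real `ExecutionResult`, and the child processes are the current Python interpreter.

```python
import asyncio
import sys


async def run_with_timeout(argv: list[str], timeout: float) -> tuple[str, str]:
    """Return (exit_reason, stdout) for a child bounded by `timeout` seconds."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # asyncio.TimeoutError is an alias of the builtin TimeoutError
        # on Python 3.11+, and a separate class before that.
        proc.kill()
        await proc.wait()  # reap the child so it does not linger as a zombie
        return "timeout", ""
    return "completed", stdout.decode("utf-8", errors="replace")


fast = asyncio.run(
    run_with_timeout([sys.executable, "-c", "print('ok')"], timeout=30.0)
)
slow = asyncio.run(
    run_with_timeout(
        [sys.executable, "-c", "import time; time.sleep(30)"], timeout=0.5
    )
)
```

Here `fast` completes normally, while `slow` is killed after half a second and reported as a timeout with empty output.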

Source code in src/marianne/execution/instruments/cli_backend.py
async def execute(
    self,
    prompt: str,
    *,
    timeout_seconds: float | None = None,
) -> ExecutionResult:
    """Execute a prompt through the CLI instrument.

    Builds the command from profile config, runs it as a subprocess,
    and parses the output according to the profile's output config.

    Args:
        prompt: The prompt to execute.
        timeout_seconds: Per-execution timeout override.

    Returns:
        ExecutionResult with parsed output and metadata.
    """
    effective_timeout = timeout_seconds or self._profile.default_timeout_seconds
    cmd = self._build_command(prompt, timeout_seconds=effective_timeout)
    env = self._build_env()
    use_stdin = self._cli.command.prompt_via_stdin

    _logger.info(
        "plugin_cli_execute_start",
        instrument=self._profile.name,
        executable=cmd[0],
        prompt_length=len(prompt),
        timeout=effective_timeout,
        stdin_mode=use_stdin,
    )

    start_time = time.monotonic()
    stdout_data = ""
    stderr_data = ""
    exit_code: int | None = None
    exit_reason = "completed"
    proc: asyncio.subprocess.Process | None = None

    try:
        # Resolve executable to full path to avoid FileNotFoundError
        # in daemon contexts where asyncio's posix_spawn may fail to
        # resolve executables even though they exist on PATH.
        import shutil as _shutil
        _resolved = _shutil.which(cmd[0])
        if _resolved:
            cmd = [_resolved] + cmd[1:]

        # Ensure working directory exists before spawning —
        # asyncio.create_subprocess_exec raises FileNotFoundError
        # for both missing executables AND missing cwd, so we
        # must ensure cwd exists to disambiguate errors.
        if self._working_directory and not self._working_directory.exists():
            self._working_directory.mkdir(parents=True, exist_ok=True)

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE if use_stdin else None,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(self._working_directory) if self._working_directory else None,
            env=env,
            start_new_session=self._cli.command.start_new_session,
        )

        # Track PID for orphan detection by the daemon's pgroup manager
        if self._on_process_spawned and proc.pid is not None:
            self._on_process_spawned(proc.pid)

        # When using stdin mode, write the assembled prompt to the
        # subprocess stdin and close it to signal EOF. This must
        # happen before communicate() — if the subprocess waits for
        # stdin EOF before producing output, reading stdout first
        # would deadlock.
        if use_stdin and proc.stdin is not None:
            full_prompt = self._build_prompt(prompt)
            proc.stdin.write(full_prompt.encode("utf-8"))
            await proc.stdin.drain()
            proc.stdin.close()

        try:
            stdout_bytes, stderr_bytes = await asyncio.wait_for(
                proc.communicate(),
                timeout=effective_timeout,
            )
            stdout_data = stdout_bytes.decode("utf-8", errors="replace")
            stderr_data = stderr_bytes.decode("utf-8", errors="replace")
            exit_code = proc.returncode
        except TimeoutError:
            _logger.warning(
                "plugin_cli_timeout",
                instrument=self._profile.name,
                timeout=effective_timeout,
            )
            proc.kill()
            await proc.wait()
            exit_reason = "timeout"

    except FileNotFoundError:
        # After ensuring cwd exists above, this is genuinely about
        # a missing executable, not a missing working directory.
        stderr_data = f"Executable not found: {cmd[0]}"
        exit_reason = "error"
        _logger.error(
            "plugin_cli_executable_not_found",
            instrument=self._profile.name,
            executable=cmd[0],
        )
    except OSError as e:
        stderr_data = f"Failed to start process: {e}"
        exit_reason = "error"
        _logger.error(
            "plugin_cli_execution_error",
            instrument=self._profile.name,
            error=str(e),
        )

    # Untrack PID — process and children are cleaned up
    if self._on_process_exited and proc is not None and proc.pid is not None:
        self._on_process_exited(proc.pid)

    duration = time.monotonic() - start_time

    # Parse the output
    result = self._parse_output(
        stdout_data, stderr_data, exit_code=exit_code,
    )

    # Override fields that _parse_output doesn't set
    result.duration_seconds = duration
    if exit_reason == "timeout":
        result.success = False
        result.exit_reason = "timeout"
    elif exit_reason == "error":
        result.success = False
        result.exit_reason = "error"

    _logger.info(
        "plugin_cli_execute_complete",
        instrument=self._profile.name,
        success=result.success,
        duration=f"{duration:.2f}s",
        exit_code=exit_code,
        rate_limited=result.rate_limited,
    )

    if result.rate_limited:
        _logger.info(
            "plugin_cli_rate_limit_detected",
            instrument=self._profile.name,
            exit_code=exit_code,
            stdout_head=stdout_data[:1000] if stdout_data else "",
            stderr_head=stderr_data[:1000] if stderr_data else "",
            error_type=result.error_type,
            error_message=result.error_message,
        )

    return result
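The stdin handling above (write the prompt, drain, close to signal EOF, then collect output) can be tried in isolation with the sketch below. `run_with_stdin` is a hypothetical helper, and the child is the current Python interpreter upper-casing whatever it reads, standing in for a CLI tool that waits for stdin EOF before answering.

```python
import asyncio
import sys


async def run_with_stdin(argv: list[str], text: str) -> str:
    """Feed `text` to the child's stdin, then return its decoded stdout."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    assert proc.stdin is not None  # guaranteed because stdin=PIPE was requested
    proc.stdin.write(text.encode("utf-8"))
    await proc.stdin.drain()
    proc.stdin.close()  # EOF: the child can now stop reading and respond
    stdout, _ = await proc.communicate()
    return stdout.decode("utf-8", errors="replace")


# The child reads all of stdin, then writes it back upper-cased.
upper_argv = [
    sys.executable,
    "-c",
    "import sys; sys.stdout.write(sys.stdin.read().upper())",
]

reply = asyncio.run(run_with_stdin(upper_argv, "play it again"))
```

One caveat with writing manually: if the child exits before reading its input, `drain()` can raise `BrokenPipeError` or `ConnectionResetError`, so production code may want to catch those around the write.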
health_check async
health_check()

Check if the CLI instrument is available.

Verifies the executable exists on PATH. Does not run a test prompt — that would consume API quota.

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the executable is found on PATH. |
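The PATH lookup behind this check relies on `shutil.which`, which returns the resolved path of an executable or `None` when it cannot be found. A minimal sketch, where `is_on_path` is a hypothetical helper and the missing executable name is made up for illustration:

```python
import shutil
import sys


def is_on_path(executable: str) -> bool:
    """Return True if `executable` resolves to a runnable file."""
    return shutil.which(executable) is not None


# The running interpreter certainly exists; the second name is
# deliberately bogus.
found_python = is_on_path(sys.executable)
found_missing = is_on_path("definitely-not-a-real-instrument-cli")
```

A check like this only confirms that the binary can be located; it says nothing about whether the tool is authenticated or otherwise able to serve a prompt.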

Source code in src/marianne/execution/instruments/cli_backend.py
async def health_check(self) -> bool:
    """Check if the CLI instrument is available.

    Verifies the executable exists on PATH. Does not run a test
    prompt — that would consume API quota.

    Returns:
        True if the executable is found on PATH.
    """
    executable = self._cli.command.executable
    found = shutil.which(executable) is not None

    if not found:
        _logger.warning(
            "plugin_cli_health_check_failed",
            instrument=self._profile.name,
            executable=executable,
            reason="not_on_path",
        )

    return found

Functions