Skip to content

helpers

helpers

Shared utilities for Marianne CLI commands.

This module contains helpers used across multiple CLI command modules: logger setup and configuration, backend creation helpers, config loading utilities, Console/Rich setup, state backend discovery, and common utility functions.

Classes

ErrorMessages

Constants for CLI error messages.

Centralizing error messages ensures consistent user-facing output and simplifies potential localization.

OutputLevel

Bases: str, Enum

Output verbosity level.

CliLoggingConfig dataclass

CliLoggingConfig(level='WARNING', file=None, format='console', configured=False)

Centralized CLI logging configuration state.

Replaces 4 module-level variables with a single typed dataclass. All getter/setter functions below delegate to this instance.

Functions

set_output_level

set_output_level(level)

Set the output level.

Parameters:

Name Type Description Default
level OutputLevel

The new output level.

required
Source code in src/marianne/cli/helpers.py
def set_output_level(level: OutputLevel) -> None:
    """Record the verbosity level for CLI output.

    The value is stored in the module-level ``_output_level`` variable,
    replacing whatever was stored there before.

    Args:
        level: Verbosity level to store.
    """
    # Rebinding a module global requires the explicit ``global`` declaration.
    global _output_level
    _output_level = level

is_verbose

is_verbose()

Check if verbose output is enabled.

Source code in src/marianne/cli/helpers.py
def is_verbose() -> bool:
    """Report whether the CLI is currently in verbose mode.

    Returns:
        ``True`` when the stored output level equals
        ``OutputLevel.VERBOSE``, ``False`` otherwise.
    """
    current_level = _output_level
    return current_level == OutputLevel.VERBOSE

is_quiet

is_quiet()

Check if quiet output is enabled.

Source code in src/marianne/cli/helpers.py
def is_quiet() -> bool:
    """Report whether the CLI is currently in quiet mode.

    Returns:
        ``True`` when the stored output level equals
        ``OutputLevel.QUIET``, ``False`` otherwise.
    """
    current_level = _output_level
    return current_level == OutputLevel.QUIET

set_log_level

set_log_level(level)

Set the log level.

Parameters:

Name Type Description Default
level str

Log level string (DEBUG, INFO, WARNING, ERROR).

required
Source code in src/marianne/cli/helpers.py
def set_log_level(level: str) -> None:
    """Store the requested log level on the shared CLI logging config.

    The value is stored exactly as given; this function performs no
    validation or normalization of its own.

    Args:
        level: Log level name (DEBUG, INFO, WARNING, ERROR).
    """
    # The config field is typed more narrowly than ``str``, hence the ignore.
    _log_config.level = level  # type: ignore[assignment]

set_log_file

set_log_file(path)

Set the log file path.

When a log file is specified, logs are written to the file in console format. Rich CLI output (progress bars, status tables) is separate from structured logging and still displays on console.

Parameters:

Name Type Description Default
path Path | None

Path for log file output, or None to disable file logging.

required
Source code in src/marianne/cli/helpers.py
def set_log_file(path: Path | None) -> None:
    """Store the log file destination on the shared CLI logging config.

    Whenever a path is supplied the log format is also switched to
    ``"console"``, so file output uses the console format. Rich CLI output
    (progress bars, status tables) is separate from structured logging and
    still displays on the console.

    Args:
        path: Where log output should be written, or ``None`` to turn
            file logging off.
    """
    _log_config.file = path
    if not path:
        # No file requested: leave the configured format untouched.
        return

    # NOTE(review): this overwrites any format stored earlier through
    # set_log_format(); confirm that option callbacks run in an order
    # where that is intended.
    _log_config.format = "console"

set_log_format

set_log_format(fmt)

Set the log format.

Parameters:

Name Type Description Default
fmt str

Log format string (json, console, both).

required
Source code in src/marianne/cli/helpers.py
def set_log_format(fmt: str) -> None:
    """Store the requested log format on the shared CLI logging config.

    The value is stored exactly as given; this function performs no
    validation of its own.

    Args:
        fmt: Log format name (json, console, both).
    """
    # The config field is typed more narrowly than ``str``, hence the ignore.
    _log_config.format = fmt  # type: ignore[assignment]

configure_global_logging

configure_global_logging(console)

Configure logging based on global CLI options.

This is called after all callbacks have processed their options. Only configures once per session.

Parameters:

Name Type Description Default
console Console

Rich console for error output.

required

Raises:

Type Description
Exit

If logging configuration fails.

Source code in src/marianne/cli/helpers.py
def configure_global_logging(console: Console) -> None:
    """Apply the accumulated global CLI logging options.

    Intended to run after every option callback has stored its value.
    Once a call succeeds, the shared config is flagged as configured and
    later calls return immediately without doing anything.

    Args:
        console: Rich console. Accepted for interface compatibility; the
            body below does not use it (errors go through
            ``output_error``).

    Raises:
        typer.Exit: With exit code 1 when ``configure_logging`` rejects
            the options with a ``ValueError``.
    """
    if _log_config.configured:
        return

    try:
        configure_logging(
            level=_log_config.level,
            format=_log_config.format,
            file_path=_log_config.file,
        )
    except ValueError as exc:
        # Invalid option combination (e.g., format="both" without file_path).
        # Imported lazily, only on the error path.
        from .output import output_error

        output_error(
            f"Logging configuration error: {exc}",
            hints=[
                "Check --log-format and --log-file options.",
                "format='both' requires --log-file to be set.",
            ],
        )
        # Suppress the ValueError context so only the clean exit surfaces.
        raise typer.Exit(1) from None
    else:
        # Deliberately silent on success: emitting a log record here would
        # pollute --json output. Use --log-file when debugging is needed.
        _log_config.configured = True

create_notifiers_from_config

create_notifiers_from_config(notification_configs)

Create Notifier instances from notification configuration.

Parameters:

Name Type Description Default
notification_configs list[NotificationConfig]

List of NotificationConfig from job config.

required

Returns:

Type Description
list[Notifier]

List of configured Notifier instances.

Source code in src/marianne/notifications/factory.py
def create_notifiers_from_config(
    notification_configs: list[NotificationConfig],
) -> list[Notifier]:
    """Build one Notifier per recognised entry in the notification config.

    Entries whose ``type`` is not ``"desktop"``, ``"slack"`` or
    ``"webhook"`` are logged as a warning and skipped. A notifier that
    evaluates as falsy after construction is dropped as well.

    Args:
        notification_configs: NotificationConfig entries from the job config.

    Returns:
        The notifiers that were built, in the order of their config entries.
    """
    # Imported lazily so the notifier modules load only when this runs.
    from marianne.notifications.desktop import DesktopNotifier
    from marianne.notifications.slack import SlackNotifier
    from marianne.notifications.webhook import WebhookNotifier

    # Ordered (type name, class) pairs, matched with ``==`` like the
    # comparisons they replace.
    known_types = (
        ("desktop", DesktopNotifier),
        ("slack", SlackNotifier),
        ("webhook", WebhookNotifier),
    )

    built: list[Notifier] = []

    for entry in notification_configs:
        # Copy the Literal-typed event list into a plain list of strings.
        event_names: list[str] = list(entry.on_events)

        notifier_cls = None
        for type_name, candidate in known_types:
            if entry.type == type_name:
                notifier_cls = candidate
                break

        if notifier_cls is None:
            _logger.warning("unknown_notification_type", type=entry.type)
            continue

        instance = notifier_cls.from_config(
            on_events=event_names,
            config=entry.config,
        )
        if instance:
            built.append(instance)

    return built

require_conductor

require_conductor(routed, *, json_output=False)

Exit with a clear error when the conductor is not running.

Used by CLI commands that route through the conductor IPC. If the conductor was not reachable, this prints a helpful message directing the user to mzt start.

Parameters:

Name Type Description Default
routed bool

Whether try_daemon_route succeeded.

required
json_output bool

If True, output error as JSON instead of Rich markup.

False

Raises:

Type Description
Exit(1)

If the conductor is not running.

Source code in src/marianne/cli/helpers.py
def require_conductor(
    routed: bool,
    *,
    json_output: bool = False,
) -> None:
    """Abort the command with a clear error if the conductor was unreachable.

    Meant for CLI commands that go through the conductor IPC. When the
    routing attempt did not succeed, an error is printed that points the
    user at ``mzt start`` and the command exits with status 1. When it
    did succeed, this function does nothing.

    Args:
        routed: Whether ``try_daemon_route`` succeeded.
        json_output: If True, output error as JSON instead of Rich markup.

    Raises:
        typer.Exit(1): If the conductor is not running.
    """
    if not routed:
        # Imported lazily, only on the failure path.
        from .output import output_error

        output_error(
            "Marianne conductor is not running.",
            hints=["Start it with: mzt start"],
            json_output=json_output,
        )
        raise typer.Exit(1)

await_early_failure async

await_early_failure(job_id, *, timeout=1.5, poll_interval=0.2)

Poll job status briefly to detect early failures after submission.

After a job is submitted and accepted, template rendering errors or other immediate failures happen within milliseconds. This function polls the daemon for up to timeout seconds so the CLI can report the failure inline instead of printing a cheerful "Job queued".

Fail-open: any exception returns None so this never blocks the CLI.

Source code in src/marianne/cli/helpers.py
async def await_early_failure(
    job_id: str,
    *,
    timeout: float = 1.5,
    poll_interval: float = 0.2,
) -> dict[str, Any] | None:
    """Poll job status briefly to detect early failures after submission.

    After a job is submitted and accepted, template rendering errors or
    other immediate failures happen within milliseconds.  This function
    polls the daemon for up to ``timeout`` seconds so the CLI can report
    the failure inline instead of printing a cheerful "Job queued".

    Fail-open: any exception returns ``None`` so this never blocks the CLI.
    """
    try:
        from marianne.daemon.detect import _resolve_socket_path
        from marianne.daemon.ipc.client import DaemonClient

        socket_path = _resolve_socket_path(None)
        client = DaemonClient(socket_path)

        _terminal_states = {"failed", "cancelled"}
        _active_states = {"running", "queued"}

        elapsed = 0.0
        while elapsed < timeout:
            await asyncio.sleep(poll_interval)
            elapsed += poll_interval

            result = await client.call("job.status", {"job_id": job_id})
            if not isinstance(result, dict):
                continue

            status = result.get("status", "")
            if status in _terminal_states:
                return result
            if status == "completed":
                return result
            if status in _active_states:
                continue

        return None
    except Exception:
        return None

query_rate_limits async

query_rate_limits()

Query the conductor for active rate limit information.

Returns the backends dict from the daemon.rate_limits IPC response, e.g. {"claude-cli": {"seconds_remaining": 120.0}}.

Returns None if the conductor is unreachable or an error occurs. Fail-open: this never raises — callers can safely display extra info when available and skip it when not.

Source code in src/marianne/cli/helpers.py
async def query_rate_limits() -> dict[str, dict[str, float]] | None:
    """Query the conductor for active rate limit information.

    Returns the ``backends`` dict from the ``daemon.rate_limits`` IPC
    response, e.g. ``{"claude-cli": {"seconds_remaining": 120.0}}``.

    Returns ``None`` if the conductor is unreachable or an error occurs.
    Fail-open: this never raises — callers can safely display extra info
    when available and skip it when not.
    """
    try:
        from marianne.daemon.detect import try_daemon_route

        routed, result = await try_daemon_route("daemon.rate_limits", {})
        if not routed or not isinstance(result, dict):
            return None
        backends = result.get("backends")
        if isinstance(backends, dict):
            return backends
        return None
    except Exception:
        return None

check_pid_alive

check_pid_alive(pid)

Check if a process with the given PID is running.

Uses os.kill(pid, 0) — signal 0 checks existence without sending a signal.

Returns:

Type Description
bool

True if the process exists (even if owned by another user).

bool

False if the process does not exist.

Source code in src/marianne/cli/helpers.py
def check_pid_alive(pid: int) -> bool:
    """Check if a process with the given PID is running.

    Uses ``os.kill(pid, 0)`` — signal 0 checks existence without sending
    a signal.

    Args:
        pid: Process ID to probe. Zero, negative and out-of-range values
            never identify a single process and are reported as not alive.

    Returns:
        ``True`` if the process exists (even if owned by another user).
        ``False`` if the process does not exist or ``pid`` is not a valid
        process ID.
    """
    import os

    # For kill(), pid == 0 addresses the caller's process group, -1 every
    # process the caller may signal, and other negatives a process group.
    # Probing those would report "alive" for something that is not a
    # single process.
    if pid <= 0:
        return False

    try:
        os.kill(pid, 0)
    except PermissionError:
        # Process exists but owned by another user
        return True
    except (OSError, OverflowError):
        # OSError: process does not exist. OverflowError: the value does
        # not fit the platform's pid type, so no such process can exist.
        return False
    return True

get_last_activity_time

get_last_activity_time(job)

Get the most recent activity timestamp from the job.

Checks sheet last_activity_at fields and updated_at.

Parameters:

Name Type Description Default
job CheckpointState

CheckpointState to check.

required

Returns:

Type Description
datetime | None

datetime of last activity, or None if not available.

Source code in src/marianne/cli/helpers.py
def get_last_activity_time(job: CheckpointState) -> datetime | None:
    """Return the newest activity timestamp recorded on a job.

    Considers the job's own ``updated_at`` together with the
    ``last_activity_at`` of every sheet, ignoring any that are ``None``.

    Args:
        job: CheckpointState to inspect.

    Returns:
        The latest of the available timestamps, or ``None`` when none of
        them is set.
    """
    stamps = [job.updated_at]
    stamps.extend(sheet.last_activity_at for sheet in job.sheets.values())

    # Unset timestamps cannot be compared, so drop them before taking the max.
    known = [stamp for stamp in stamps if stamp is not None]
    return max(known, default=None)