Skip to content

client

client

Async Unix domain socket client for Marianne daemon IPC.

Provides DaemonClient with two call patterns:

  • call(method, params): send a JSON-RPC request, await a single response. Uses a ConnectionPool to reuse connections across calls.
  • stream(method, params): send a request, yield streaming notifications, and return the final result when the stream ends.

Plus typed convenience methods (status, submit_job, etc.) that wrap call with Pydantic model serialization for type safety at the CLI layer.

Attributes

Classes

ConnectionPool

ConnectionPool(socket_path, *, max_size=_DEFAULT_POOL_SIZE, max_idle_seconds=_DEFAULT_MAX_IDLE_SECONDS, connect_timeout=_DEFAULT_CONNECT_TIMEOUT, acquire_timeout=30.0)

LIFO connection pool for Unix domain sockets.

Internal implementation detail — not exported. Manages a bounded set of reusable (reader, writer) pairs so DaemonClient.call() does not pay the cost of opening a fresh socket for every RPC.

Parameters

socket_path: Unix domain socket to connect to. max_size: Maximum number of connections (checked-out + idle combined). max_idle_seconds: Connections idle longer than this are discarded on next acquire. connect_timeout: Seconds to wait when opening a new connection. acquire_timeout: Seconds to wait for the pool semaphore when all slots are busy.

Source code in src/marianne/daemon/ipc/client.py
def __init__(
    self,
    socket_path: Path,
    *,
    max_size: int = _DEFAULT_POOL_SIZE,
    max_idle_seconds: float = _DEFAULT_MAX_IDLE_SECONDS,
    connect_timeout: float = _DEFAULT_CONNECT_TIMEOUT,
    acquire_timeout: float = 30.0,
) -> None:
    if max_size < 1:
        raise ValueError(f"max_size must be >= 1, got {max_size}")
    if max_idle_seconds <= 0:
        raise ValueError(f"max_idle_seconds must be > 0, got {max_idle_seconds}")
    if connect_timeout <= 0:
        raise ValueError(f"connect_timeout must be > 0, got {connect_timeout}")
    if acquire_timeout <= 0:
        raise ValueError(f"acquire_timeout must be > 0, got {acquire_timeout}")

    self._socket_path = socket_path
    self._max_size = max_size
    self._max_idle_seconds = max_idle_seconds
    self._connect_timeout = connect_timeout
    self._acquire_timeout = acquire_timeout

    # LIFO idle stack: (reader, writer, idle_since_monotonic)
    self._idle: list[tuple[asyncio.StreamReader, asyncio.StreamWriter, float]] = []
    self._semaphore = asyncio.Semaphore(max_size)
    self._closed = False
Attributes
closed property
closed

Whether the pool has been closed.

Functions
acquire async
acquire()

Acquire a connection from the pool.

Tries idle connections first (LIFO), skipping stale or broken ones. Opens a new connection if no idle connection is available.

Raises:

Type Description
DaemonNotRunningError

if the pool is closed or connection fails.

TimeoutError

if the pool semaphore cannot be acquired in time.

Source code in src/marianne/daemon/ipc/client.py
async def acquire(
    self,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Acquire a connection from the pool.

    Tries idle connections first (LIFO), skipping stale or broken ones.
    Opens a new connection if no idle connection is available.

    Raises:
        DaemonNotRunningError: if the pool is closed or connection fails.
        TimeoutError: if the pool semaphore cannot be acquired in time.
    """
    if self._closed:
        raise DaemonNotRunningError("Connection pool is closed")

    try:
        await asyncio.wait_for(
            self._semaphore.acquire(), timeout=self._acquire_timeout,
        )
    except TimeoutError as exc:
        raise TimeoutError(
            f"Pool exhausted: all {self._max_size} connections in use"
        ) from exc

    # Try idle connections (LIFO — hot connections first)
    now = time.monotonic()
    while self._idle:
        reader, writer, idle_since = self._idle.pop()

        # Skip stale connections
        if (now - idle_since) > self._max_idle_seconds:
            _logger.debug("pool_discard_stale")
            self._close_writer(writer)
            continue

        # Skip broken connections
        if writer.is_closing() or reader.at_eof():
            _logger.debug("pool_discard_broken")
            self._close_writer(writer)
            continue

        _logger.debug("pool_reuse_connection")
        return reader, writer

    # No usable idle connection — open a fresh one
    return await self._open_connection()
release
release(reader, writer)

Return a healthy connection to the idle stack.

Source code in src/marianne/daemon/ipc/client.py
def release(
    self,
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
) -> None:
    """Return a healthy connection to the idle stack."""
    if self._closed or writer.is_closing() or reader.at_eof():
        self._close_writer(writer)
        self._semaphore.release()
        return

    if len(self._idle) >= self._max_size:
        # Idle stack full — discard this connection
        self._close_writer(writer)
        self._semaphore.release()
        return

    self._idle.append((reader, writer, time.monotonic()))
    self._semaphore.release()
    _logger.debug("pool_release_connection", idle_count=len(self._idle))
discard
discard(writer)

Discard a broken connection and release its semaphore slot.

Source code in src/marianne/daemon/ipc/client.py
def discard(self, writer: asyncio.StreamWriter) -> None:
    """Discard a broken connection and release its semaphore slot."""
    self._close_writer(writer)
    self._semaphore.release()
    _logger.debug("pool_discard_connection")
close async
close()

Close the pool and all idle connections. Idempotent.

Source code in src/marianne/daemon/ipc/client.py
async def close(self) -> None:
    """Close the pool and all idle connections.  Idempotent."""
    if self._closed:
        return
    self._closed = True

    while self._idle:
        _, writer, _ = self._idle.pop()
        self._close_writer(writer)

    _logger.debug("pool_closed")

DaemonClient

DaemonClient(socket_path, *, timeout=30.0, pool_size=_DEFAULT_POOL_SIZE)

Async client for the Marianne daemon Unix socket IPC.

Parameters

socket_path: Path to the Unix domain socket created by DaemonServer. timeout: Seconds to wait for a response before raising TimeoutError. pool_size: Maximum number of pooled connections for call().

Source code in src/marianne/daemon/ipc/client.py
def __init__(
    self,
    socket_path: Path,
    *,
    timeout: float = 30.0,
    pool_size: int = _DEFAULT_POOL_SIZE,
) -> None:
    self._socket_path = socket_path
    self._timeout = timeout
    self._pool_size = pool_size
    self._next_id = 0
    self._pool: ConnectionPool | None = None
Functions
close async
close()

Close the connection pool. Safe to call multiple times.

Source code in src/marianne/daemon/ipc/client.py
async def close(self) -> None:
    """Close the connection pool.  Safe to call multiple times."""
    if self._pool is not None:
        await self._pool.close()
        self._pool = None
call async
call(method, params=None)

Send a JSON-RPC request and return the result.

Uses the connection pool. If a pooled connection is stale (server closed it, daemon restarted), discards it and retries once with a fresh connection.

Raises:

Type Description
DaemonNotRunningError

socket unreachable

DaemonError(subclass)

server returned an error response

TimeoutError

no response within self._timeout

Source code in src/marianne/daemon/ipc/client.py
async def call(
    self,
    method: str,
    params: dict[str, Any] | None = None,
) -> Any:
    """Send a JSON-RPC request and return the result.

    Uses the connection pool.  If a pooled connection is stale (server
    closed it, daemon restarted), discards it and retries once with a
    fresh connection.

    Raises:
        DaemonNotRunningError: socket unreachable
        DaemonError (subclass): server returned an error response
        TimeoutError: no response within ``self._timeout``
    """
    request = JsonRpcRequest(
        method=method,
        params=params,
        id=self._next_request_id(),
    )

    pool = self._get_pool()
    reader, writer = await pool.acquire()

    try:
        result = await self._send_and_receive(reader, writer, request)
    except _RETRYABLE_IO_ERRORS:
        # Stale or broken connection — discard and retry once
        pool.discard(writer)
        _logger.debug("pool_retry_on_stale", method=method)
        reader, writer = await pool.acquire()
        try:
            result = await self._send_and_receive(reader, writer, request)
        except _RETRYABLE_IO_ERRORS:
            pool.discard(writer)
            raise
        except Exception:
            pool.discard(writer)
            raise
    except DaemonNotRunningError:
        # Empty readline — server closed the connection
        pool.discard(writer)
        _logger.debug("pool_retry_on_disconnect", method=method)
        reader, writer = await pool.acquire()
        try:
            result = await self._send_and_receive(reader, writer, request)
        except Exception:
            pool.discard(writer)
            raise
    except Exception:
        # Application-level errors (DaemonError from JSON-RPC error)
        # — the connection is healthy, release it
        pool.release(reader, writer)
        raise

    pool.release(reader, writer)
    return result
stream async
stream(method, params=None)

Send a JSON-RPC request and yield streaming notifications.

Notifications (no id) are yielded as dicts. The final response (with id) terminates the iteration.

If the final response contains an error, raises the corresponding DaemonError.

Source code in src/marianne/daemon/ipc/client.py
async def stream(
    self,
    method: str,
    params: dict[str, Any] | None = None,
) -> AsyncIterator[dict[str, Any]]:
    """Send a JSON-RPC request and yield streaming notifications.

    Notifications (no ``id``) are yielded as dicts.  The final
    response (with ``id``) terminates the iteration.

    If the final response contains an error, raises the
    corresponding ``DaemonError``.
    """
    request = JsonRpcRequest(
        method=method,
        params=params,
        id=self._next_request_id(),
    )

    async with self._connect() as (reader, writer):
        writer.write(request.model_dump_json().encode() + b"\n")
        await writer.drain()

        async for raw_line in reader:
            if not raw_line:
                break

            msg = json.loads(raw_line)

            # Check if this is the final response (has our request id)
            msg_id = msg.get("id")
            if msg_id is not None:
                # Final response — stream is done
                if "error" in msg:
                    raise rpc_error_to_exception(msg["error"])
                return

            # Notification — yield params to caller
            yield msg.get("params", {})
is_daemon_running async
is_daemon_running()

Check if daemon is running by performing a lightweight health RPC.

Uses daemon.health instead of bare socket connect so stale sockets left by crashed daemons are properly detected.

Short-circuits immediately if the socket path doesn't exist, consistent with _connect()'s guard.

Source code in src/marianne/daemon/ipc/client.py
async def is_daemon_running(self) -> bool:
    """Check if daemon is running by performing a lightweight health RPC.

    Uses ``daemon.health`` instead of bare socket connect so stale
    sockets left by crashed daemons are properly detected.

    Short-circuits immediately if the socket path doesn't exist,
    consistent with ``_connect()``'s guard.
    """
    if not self._socket_path.exists():
        return False
    try:
        await self.call("daemon.health")
        return True
    except (ConnectionRefusedError, FileNotFoundError, TimeoutError, OSError):
        return False
    except Exception:
        # Any other failure (malformed response, protocol error, etc.)
        # means the daemon is not functional.
        return False
status async
status()

Get daemon status.

Source code in src/marianne/daemon/ipc/client.py
async def status(self) -> DaemonStatus:
    """Get daemon status."""
    result = await self.call("daemon.status")
    return DaemonStatus(**result)
submit_job async
submit_job(request)

Submit a job to the daemon.

Source code in src/marianne/daemon/ipc/client.py
async def submit_job(self, request: JobRequest) -> JobResponse:
    """Submit a job to the daemon."""
    result = await self.call("job.submit", request.model_dump(mode="json"))
    return JobResponse(**result)
get_job_status async
get_job_status(job_id, workspace)

Get status of a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def get_job_status(self, job_id: str, workspace: str) -> dict[str, Any]:
    """Get status of a specific job."""
    result = await self.call("job.status", {"job_id": job_id, "workspace": workspace})
    return cast(dict[str, Any], result)
pause_job async
pause_job(job_id, workspace)

Pause a running job. Returns {"paused": bool}.

Source code in src/marianne/daemon/ipc/client.py
async def pause_job(self, job_id: str, workspace: str) -> dict[str, Any]:
    """Pause a running job. Returns ``{"paused": bool}``."""
    result = await self.call("job.pause", {"job_id": job_id, "workspace": workspace})
    return cast(dict[str, Any], result)
resume_job async
resume_job(job_id, workspace)

Resume a paused job. Returns a JobResponse dict.

Source code in src/marianne/daemon/ipc/client.py
async def resume_job(self, job_id: str, workspace: str) -> dict[str, Any]:
    """Resume a paused job. Returns a JobResponse dict."""
    result = await self.call("job.resume", {"job_id": job_id, "workspace": workspace})
    return cast(dict[str, Any], result)
cancel_job async
cancel_job(job_id, workspace)

Cancel a running or paused job. Returns {"cancelled": bool}.

Source code in src/marianne/daemon/ipc/client.py
async def cancel_job(self, job_id: str, workspace: str) -> dict[str, Any]:
    """Cancel a running or paused job. Returns ``{"cancelled": bool}``."""
    result = await self.call("job.cancel", {"job_id": job_id, "workspace": workspace})
    return cast(dict[str, Any], result)
list_jobs async
list_jobs()

List all jobs known to the daemon.

Source code in src/marianne/daemon/ipc/client.py
async def list_jobs(self) -> list[dict[str, Any]]:
    """List all jobs known to the daemon."""
    return cast(list[dict[str, Any]], await self.call("job.list"))
clear_jobs async
clear_jobs(statuses=None, older_than_seconds=None, job_ids=None)

Clear terminal jobs from the daemon registry.

Parameters:

Name Type Description Default
statuses list[str] | None

Status filter (defaults to terminal statuses).

None
older_than_seconds float | None

Only clear jobs older than this.

None
job_ids list[str] | None

Only clear these specific job IDs.

None

Returns:

Type Description
dict[str, Any]

Dict with "deleted" count.

Source code in src/marianne/daemon/ipc/client.py
async def clear_jobs(
    self,
    statuses: list[str] | None = None,
    older_than_seconds: float | None = None,
    job_ids: list[str] | None = None,
) -> dict[str, Any]:
    """Clear terminal jobs from the daemon registry.

    Args:
        statuses: Status filter (defaults to terminal statuses).
        older_than_seconds: Only clear jobs older than this.
        job_ids: Only clear these specific job IDs.

    Returns:
        Dict with "deleted" count.
    """
    params: dict[str, Any] = {}
    if statuses is not None:
        params["statuses"] = statuses
    if older_than_seconds is not None:
        params["older_than_seconds"] = older_than_seconds
    if job_ids is not None:
        params["job_ids"] = job_ids
    return cast(dict[str, Any], await self.call("job.clear", params))
config async
config()

Get the conductor's live running configuration.

Source code in src/marianne/daemon/ipc/client.py
async def config(self) -> dict[str, Any]:
    """Get the conductor's live running configuration."""
    return cast(dict[str, Any], await self.call("daemon.config"))
health async
health()

Liveness probe — is the daemon process alive?

Source code in src/marianne/daemon/ipc/client.py
async def health(self) -> dict[str, Any]:
    """Liveness probe — is the daemon process alive?"""
    return cast(dict[str, Any], await self.call("daemon.health"))
readiness async
readiness()

Readiness probe — is the daemon accepting new jobs?

Source code in src/marianne/daemon/ipc/client.py
async def readiness(self) -> dict[str, Any]:
    """Readiness probe — is the daemon accepting new jobs?"""
    return cast(dict[str, Any], await self.call("daemon.ready"))
get_errors async
get_errors(job_id, workspace=None)

Get errors for a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def get_errors(self, job_id: str, workspace: str | None = None) -> dict[str, Any]:
    """Get errors for a specific job."""
    result = await self.call("job.errors", {"job_id": job_id, "workspace": workspace})
    return cast(dict[str, Any], result)
diagnose async
diagnose(job_id, workspace=None)

Get diagnostic data for a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def diagnose(self, job_id: str, workspace: str | None = None) -> dict[str, Any]:
    """Get diagnostic data for a specific job."""
    result = await self.call("job.diagnose", {"job_id": job_id, "workspace": workspace})
    return cast(dict[str, Any], result)
get_execution_history async
get_execution_history(job_id, workspace=None, sheet_num=None, limit=50)

Get execution history for a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def get_execution_history(
    self, job_id: str, workspace: str | None = None,
    sheet_num: int | None = None, limit: int = 50,
) -> dict[str, Any]:
    """Get execution history for a specific job."""
    result = await self.call("job.history", {
        "job_id": job_id, "workspace": workspace,
        SHEET_NUM_KEY: sheet_num, "limit": limit,
    })
    return cast(dict[str, Any], result)
recover_job async
recover_job(job_id, workspace=None, sheet_num=None, dry_run=False)

Request recovery data for a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def recover_job(
    self, job_id: str, workspace: str | None = None,
    sheet_num: int | None = None, dry_run: bool = False,
) -> dict[str, Any]:
    """Request recovery data for a specific job."""
    result = await self.call("job.recover", {
        "job_id": job_id, "workspace": workspace,
        SHEET_NUM_KEY: sheet_num, "dry_run": dry_run,
    })
    return cast(dict[str, Any], result)
rate_limits async
rate_limits()

Get current rate limit state across all backends.

Source code in src/marianne/daemon/ipc/client.py
async def rate_limits(self) -> dict[str, Any]:
    """Get current rate limit state across all backends."""
    return cast(dict[str, Any], await self.call("daemon.rate_limits"))
learning_patterns async
learning_patterns(limit=20)

Get recent learning patterns from the global store.

Source code in src/marianne/daemon/ipc/client.py
async def learning_patterns(self, limit: int = 20) -> dict[str, Any]:
    """Get recent learning patterns from the global store."""
    return cast(dict[str, Any], await self.call("daemon.learning.patterns", {"limit": limit}))

Functions