Skip to content

Index

ipc

IPC layer for the conductor — Unix socket + JSON-RPC 2.0.

Classes

DaemonClient

DaemonClient(socket_path, *, timeout=30.0, pool_size=_DEFAULT_POOL_SIZE)

Async client for the Marianne daemon Unix socket IPC.

Parameters

- socket_path: Path to the Unix domain socket created by DaemonServer.
- timeout: Seconds to wait for a response before raising TimeoutError.
- pool_size: Maximum number of pooled connections used by call().

Source code in src/marianne/daemon/ipc/client.py
def __init__(
    self,
    socket_path: Path,
    *,
    timeout: float = 30.0,
    pool_size: int = _DEFAULT_POOL_SIZE,
) -> None:
    """Record connection settings; no socket is opened here.

    Args:
        socket_path: Path to the daemon's Unix domain socket.
        timeout: Seconds to wait for a response before ``TimeoutError``.
        pool_size: Upper bound on pooled connections used by ``call()``.
    """
    # No pool yet: ``call()`` obtains one via ``_get_pool()`` and
    # ``close()`` resets this back to ``None``.
    self._pool: ConnectionPool | None = None
    # Starting value of the JSON-RPC request id counter (presumably
    # advanced by ``_next_request_id()`` — not visible here).
    self._next_id = 0
    # Connection settings, stored verbatim.
    self._socket_path = socket_path
    self._timeout = timeout
    self._pool_size = pool_size
Functions
close async
close()

Close the connection pool. Safe to call multiple times.

Source code in src/marianne/daemon/ipc/client.py
async def close(self) -> None:
    """Shut down the connection pool, if one exists.

    Safe to call repeatedly: after the pool is closed its reference is
    cleared, so later calls find nothing to do.
    """
    pool = self._pool
    if pool is None:
        return
    await pool.close()
    self._pool = None
call async
call(method, params=None)

Send a JSON-RPC request and return the result.

Uses the connection pool. If a pooled connection is stale (server closed it, daemon restarted), discards it and retries once with a fresh connection.

Raises:

- DaemonNotRunningError — the socket is unreachable.
- DaemonError (subclass) — the server returned an error response.
- TimeoutError — no response arrived within self._timeout.

Source code in src/marianne/daemon/ipc/client.py
async def call(
    self,
    method: str,
    params: dict[str, Any] | None = None,
) -> Any:
    """Send a JSON-RPC request and return the result.

    Uses the connection pool.  If a pooled connection is stale (server
    closed it, daemon restarted), discards it and retries once with a
    fresh connection.

    A connection goes back to the pool only when the exchange completed,
    either successfully or with an application-level error response.  It
    is discarded instead on a timeout, on cancellation, or on any failure
    during the retry.  In those cases a reply may still be in flight on
    it, and reusing it could hand that stale reply to the next caller.

    Raises:
        DaemonNotRunningError: socket unreachable
        DaemonError (subclass): server returned an error response
        TimeoutError: no response within ``self._timeout`` (not retried)
    """
    request = JsonRpcRequest(
        method=method,
        params=params,
        id=self._next_request_id(),
    )

    pool = self._get_pool()

    async def _retry_on_fresh_connection(event: str) -> Any:
        # Single retry after the first connection was discarded.  Any
        # exception here, including cancellation, also discards the fresh
        # connection because its state is unknown.
        _logger.debug(event, method=method)
        retry_reader, retry_writer = await pool.acquire()
        try:
            retry_result = await self._send_and_receive(
                retry_reader, retry_writer, request
            )
        except BaseException:
            pool.discard(retry_writer)
            raise
        pool.release(retry_reader, retry_writer)
        return retry_result

    reader, writer = await pool.acquire()
    try:
        result = await self._send_and_receive(reader, writer, request)
    except TimeoutError:
        # The daemon may still answer later on this connection, so never
        # reuse it.  Also don't re-send a request that may already be
        # executing: TimeoutError is an OSError subclass and could
        # otherwise match _RETRYABLE_IO_ERRORS.
        pool.discard(writer)
        raise
    except _RETRYABLE_IO_ERRORS:
        # Stale or broken connection — discard and retry once
        pool.discard(writer)
        return await _retry_on_fresh_connection("pool_retry_on_stale")
    except DaemonNotRunningError:
        # Empty readline — server closed the connection
        pool.discard(writer)
        return await _retry_on_fresh_connection("pool_retry_on_disconnect")
    except Exception:
        # Application-level errors (DaemonError from JSON-RPC error)
        # — the connection is healthy, release it
        pool.release(reader, writer)
        raise
    except BaseException:
        # Cancellation (asyncio.CancelledError is a BaseException) cut the
        # exchange short mid-flight.  The connection must not go back to
        # the pool, but it must not leak either.
        pool.discard(writer)
        raise

    pool.release(reader, writer)
    return result
stream async
stream(method, params=None)

Send a JSON-RPC request and yield streaming notifications.

Notifications (no id) are yielded as dicts. The final response (with id) terminates the iteration.

If the final response contains an error, raises the corresponding DaemonError.

Source code in src/marianne/daemon/ipc/client.py
async def stream(
    self,
    method: str,
    params: dict[str, Any] | None = None,
) -> AsyncIterator[dict[str, Any]]:
    """Send a JSON-RPC request and yield streaming notifications.

    Notifications (messages without an ``id``) are yielded as their
    ``params`` dict, or ``{}`` when a notification has no params.  The
    final response (a message carrying an ``id``) ends the iteration.

    Raises:
        DaemonError (subclass): the final response carries an error.
        DaemonNotRunningError: the connection closed before the final
            response arrived.  Previously this ended the iteration
            silently, so a truncated stream looked like a successful one.
    """
    request = JsonRpcRequest(
        method=method,
        params=params,
        id=self._next_request_id(),
    )

    async with self._connect() as (reader, writer):
        writer.write(request.model_dump_json().encode() + b"\n")
        await writer.drain()

        # ``async for`` over an asyncio StreamReader stops at EOF.  The
        # empty-line check only guards readers that yield b"" instead.
        # Either way, leaving this loop means the server hung up before
        # sending the final response.
        async for raw_line in reader:
            if not raw_line:
                break

            msg = json.loads(raw_line)

            # Check if this is the final response (has our request id)
            if msg.get("id") is not None:
                # Final response — stream is done
                if "error" in msg:
                    raise rpc_error_to_exception(msg["error"])
                return

            # Notification — yield params to caller
            yield msg.get("params", {})

    # Raised after the connection context has exited, so the socket is
    # already closed.  This mirrors call(), where an empty readline
    # surfaces as DaemonNotRunningError.
    raise DaemonNotRunningError(
        f"Connection closed before final response to {method!r}"
    )
is_daemon_running async
is_daemon_running()

Check if daemon is running by performing a lightweight health RPC.

Uses daemon.health instead of bare socket connect so stale sockets left by crashed daemons are properly detected.

Short-circuits immediately if the socket path doesn't exist, consistent with _connect()'s guard.

Source code in src/marianne/daemon/ipc/client.py
async def is_daemon_running(self) -> bool:
    """Report whether a functional daemon answers on the socket.

    If the socket file is missing, this returns ``False`` immediately,
    consistent with ``_connect()``'s guard.  Otherwise it issues a
    ``daemon.health`` RPC rather than a bare socket connect, so a socket
    file left behind by a crashed daemon is not mistaken for a live one.
    Any failure of that RPC (connection refused, timeout, OS error,
    malformed reply, protocol error) counts as "not running".
    """
    socket_present = self._socket_path.exists()
    if socket_present:
        try:
            await self.call("daemon.health")
        except Exception:
            # Unreachable, timed out, or not speaking the protocol
            # correctly: in every case the daemon is not functional.
            return False
        return True
    return False
status async
status()

Get daemon status.

Source code in src/marianne/daemon/ipc/client.py
async def status(self) -> DaemonStatus:
    """Fetch the daemon's status and wrap it in a ``DaemonStatus`` model.

    The ``daemon.status`` result is unpacked as keyword arguments into
    ``DaemonStatus``.
    """
    payload = await self.call("daemon.status")
    snapshot = DaemonStatus(**payload)
    return snapshot
submit_job async
submit_job(request)

Submit a job to the daemon.

Source code in src/marianne/daemon/ipc/client.py
async def submit_job(self, request: JobRequest) -> JobResponse:
    """Submit a job to the daemon and return its ``JobResponse``.

    The request model is dumped in JSON mode and sent as the params of
    ``job.submit``.  The result is unpacked into a ``JobResponse``.
    """
    wire_params = request.model_dump(mode="json")
    reply = await self.call("job.submit", wire_params)
    return JobResponse(**reply)
get_job_status async
get_job_status(job_id, workspace)

Get status of a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def get_job_status(self, job_id: str, workspace: str) -> dict[str, Any]:
    """Look up the current status of one job.

    Args:
        job_id: Identifier of the job to query.
        workspace: Workspace the job belongs to.

    Returns:
        The ``job.status`` result dict as returned by the daemon.
    """
    query = {"job_id": job_id, "workspace": workspace}
    return cast(dict[str, Any], await self.call("job.status", query))
pause_job async
pause_job(job_id, workspace)

Pause a running job. Returns {"paused": bool}.

Source code in src/marianne/daemon/ipc/client.py
async def pause_job(self, job_id: str, workspace: str) -> dict[str, Any]:
    """Ask the daemon to pause a running job.

    Args:
        job_id: Identifier of the job to pause.
        workspace: Workspace the job belongs to.

    Returns:
        The ``job.pause`` result, documented as ``{"paused": bool}``.
    """
    target = {"job_id": job_id, "workspace": workspace}
    outcome = await self.call("job.pause", target)
    return cast(dict[str, Any], outcome)
resume_job async
resume_job(job_id, workspace)

Resume a paused job. Returns a JobResponse dict.

Source code in src/marianne/daemon/ipc/client.py
async def resume_job(self, job_id: str, workspace: str) -> dict[str, Any]:
    """Ask the daemon to resume a paused job.

    Args:
        job_id: Identifier of the job to resume.
        workspace: Workspace the job belongs to.

    Returns:
        The ``job.resume`` result, documented as a JobResponse-shaped dict.
    """
    target = {"job_id": job_id, "workspace": workspace}
    outcome = await self.call("job.resume", target)
    return cast(dict[str, Any], outcome)
cancel_job async
cancel_job(job_id, workspace)

Cancel a running or paused job. Returns {"cancelled": bool}.

Source code in src/marianne/daemon/ipc/client.py
async def cancel_job(self, job_id: str, workspace: str) -> dict[str, Any]:
    """Ask the daemon to cancel a running or paused job.

    Args:
        job_id: Identifier of the job to cancel.
        workspace: Workspace the job belongs to.

    Returns:
        The ``job.cancel`` result, documented as ``{"cancelled": bool}``.
    """
    target = {"job_id": job_id, "workspace": workspace}
    outcome = await self.call("job.cancel", target)
    return cast(dict[str, Any], outcome)
list_jobs async
list_jobs()

List all jobs known to the daemon.

Source code in src/marianne/daemon/ipc/client.py
async def list_jobs(self) -> list[dict[str, Any]]:
    """Return every job the daemon knows about, one dict per job."""
    jobs = await self.call("job.list")
    return cast(list[dict[str, Any]], jobs)
clear_jobs async
clear_jobs(statuses=None, older_than_seconds=None, job_ids=None)

Clear terminal jobs from the daemon registry.

Parameters:

- statuses (list[str] | None, default None) — Status filter (defaults to terminal statuses).
- older_than_seconds (float | None, default None) — Only clear jobs older than this many seconds.
- job_ids (list[str] | None, default None) — Only clear these specific job IDs.

Returns:

- dict[str, Any] — Dict with a "deleted" count.

Source code in src/marianne/daemon/ipc/client.py
async def clear_jobs(
    self,
    statuses: list[str] | None = None,
    older_than_seconds: float | None = None,
    job_ids: list[str] | None = None,
) -> dict[str, Any]:
    """Clear terminal jobs from the daemon registry.

    Args:
        statuses: Status filter (defaults to terminal statuses).
        older_than_seconds: Only clear jobs older than this.
        job_ids: Only clear these specific job IDs.

    Returns:
        Dict with "deleted" count.
    """
    params: dict[str, Any] = {}
    if statuses is not None:
        params["statuses"] = statuses
    if older_than_seconds is not None:
        params["older_than_seconds"] = older_than_seconds
    if job_ids is not None:
        params["job_ids"] = job_ids
    return cast(dict[str, Any], await self.call("job.clear", params))
config async
config()

Get the conductor's live running configuration.

Source code in src/marianne/daemon/ipc/client.py
async def config(self) -> dict[str, Any]:
    """Return the conductor's live running configuration (``daemon.config``)."""
    running = await self.call("daemon.config")
    return cast(dict[str, Any], running)
health async
health()

Liveness probe — is the daemon process alive?

Source code in src/marianne/daemon/ipc/client.py
async def health(self) -> dict[str, Any]:
    """Liveness probe: ask whether the daemon process is alive (``daemon.health``)."""
    probe = await self.call("daemon.health")
    return cast(dict[str, Any], probe)
readiness async
readiness()

Readiness probe — is the daemon accepting new jobs?

Source code in src/marianne/daemon/ipc/client.py
async def readiness(self) -> dict[str, Any]:
    """Readiness probe: ask whether the daemon accepts new jobs (``daemon.ready``)."""
    probe = await self.call("daemon.ready")
    return cast(dict[str, Any], probe)
get_errors async
get_errors(job_id, workspace=None)

Get errors for a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def get_errors(self, job_id: str, workspace: str | None = None) -> dict[str, Any]:
    """Get errors for a specific job."""
    result = await self.call("job.errors", {"job_id": job_id, "workspace": workspace})
    return cast(dict[str, Any], result)
diagnose async
diagnose(job_id, workspace=None)

Get diagnostic data for a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def diagnose(self, job_id: str, workspace: str | None = None) -> dict[str, Any]:
    """Get diagnostic data for a specific job."""
    result = await self.call("job.diagnose", {"job_id": job_id, "workspace": workspace})
    return cast(dict[str, Any], result)
get_execution_history async
get_execution_history(job_id, workspace=None, sheet_num=None, limit=50)

Get execution history for a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def get_execution_history(
    self, job_id: str, workspace: str | None = None,
    sheet_num: int | None = None, limit: int = 50,
) -> dict[str, Any]:
    """Fetch the execution history of one job.

    Args:
        job_id: Identifier of the job.
        workspace: Workspace of the job.  It is sent even when ``None``.
        sheet_num: Optional sheet filter, sent under ``SHEET_NUM_KEY``.
        limit: Maximum number of history entries requested.

    Returns:
        The ``job.history`` result dict.
    """
    query: dict[str, Any] = {"job_id": job_id, "workspace": workspace}
    query[SHEET_NUM_KEY] = sheet_num
    query["limit"] = limit
    history = await self.call("job.history", query)
    return cast(dict[str, Any], history)
recover_job async
recover_job(job_id, workspace=None, sheet_num=None, dry_run=False)

Request recovery data for a specific job.

Source code in src/marianne/daemon/ipc/client.py
async def recover_job(
    self, job_id: str, workspace: str | None = None,
    sheet_num: int | None = None, dry_run: bool = False,
) -> dict[str, Any]:
    """Request recovery data for one job.

    Args:
        job_id: Identifier of the job.
        workspace: Workspace of the job.  It is sent even when ``None``.
        sheet_num: Optional sheet filter, sent under ``SHEET_NUM_KEY``.
        dry_run: Forwarded to the daemon as ``dry_run``.

    Returns:
        The ``job.recover`` result dict.
    """
    query: dict[str, Any] = {"job_id": job_id, "workspace": workspace}
    query[SHEET_NUM_KEY] = sheet_num
    query["dry_run"] = dry_run
    recovery = await self.call("job.recover", query)
    return cast(dict[str, Any], recovery)
rate_limits async
rate_limits()

Get current rate limit state across all backends.

Source code in src/marianne/daemon/ipc/client.py
async def rate_limits(self) -> dict[str, Any]:
    """Return the daemon's current rate-limit state across all backends."""
    limits = await self.call("daemon.rate_limits")
    return cast(dict[str, Any], limits)
learning_patterns async
learning_patterns(limit=20)

Get recent learning patterns from the global store.

Source code in src/marianne/daemon/ipc/client.py
async def learning_patterns(self, limit: int = 20) -> dict[str, Any]:
    """Return up to ``limit`` recent learning patterns from the global store."""
    query = {"limit": limit}
    patterns = await self.call("daemon.learning.patterns", query)
    return cast(dict[str, Any], patterns)

DaemonServer

DaemonServer(socket_path, handler, *, permissions=0o660, max_connections=DEFAULT_MAX_CONNECTIONS, max_concurrent_requests=DEFAULT_MAX_CONCURRENT_REQUESTS)

Async Unix domain socket server with JSON-RPC 2.0 routing.

Parameters

- socket_path: Filesystem path for the Unix domain socket.
- handler: RequestHandler that dispatches JSON-RPC methods.
- permissions: Octal file permissions applied to the socket after creation. Defaults to 0o660 (owner + group read/write).
- max_connections: Maximum number of simultaneously connected clients. This is file-descriptor protection; idle connections are cheap, so the default is high (~500).
- max_concurrent_requests: Maximum number of requests being processed at once across all connections. This is the real concurrency control (~50).

Source code in src/marianne/daemon/ipc/server.py
def __init__(
    self,
    socket_path: Path,
    handler: RequestHandler,
    *,
    permissions: int = 0o660,
    max_connections: int = DEFAULT_MAX_CONNECTIONS,
    max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUESTS,
) -> None:
    """Configure the server; nothing is bound until ``start()``.

    Args:
        socket_path: Filesystem path for the Unix domain socket.
        handler: Dispatcher for incoming JSON-RPC methods.
        permissions: Mode applied to the socket file after binding.
        max_connections: Cap on simultaneously connected clients.
        max_concurrent_requests: Cap on requests processed at once across
            all connections.

    Raises:
        ValueError: either limit is below 1.
    """
    # Both limits size an asyncio.Semaphore.  A limit below 1 could never
    # admit anyone, so reject it up front (max_connections first).
    for label, limit in (
        ("max_connections", max_connections),
        ("max_concurrent_requests", max_concurrent_requests),
    ):
        if limit < 1:
            raise ValueError(f"{label} must be >= 1, got {limit}")

    # Configuration as given.
    self._socket_path = socket_path
    self._handler = handler
    self._permissions = permissions
    self._max_connections = max_connections
    self._max_concurrent_requests = max_concurrent_requests

    # Runtime state: the listening server (set by start()) and the
    # per-connection tasks that stop() cancels.
    self._server: asyncio.Server | None = None
    self._connections: set[asyncio.Task[None]] = set()

    # Admission control: one semaphore for connections, one for requests.
    self._connection_semaphore = asyncio.Semaphore(max_connections)
    self._request_semaphore = asyncio.Semaphore(max_concurrent_requests)
Attributes
is_running property
is_running

Return whether the server is currently accepting connections.

Functions
start async
start()

Bind the Unix socket and start accepting connections.

Source code in src/marianne/daemon/ipc/server.py
async def start(self) -> None:
    """Bind the Unix socket and start accepting connections.

    Steps:
    1. Create the socket's parent directory if needed.
    2. Refuse to proceed if the socket path is a symlink.
    3. Remove any leftover socket file.
    4. Bind the server.
    5. Apply ``self._permissions`` to the socket file.

    ``self._server`` is only set once every step has succeeded.

    Raises:
        OSError: the socket path is a symlink, binding fails, or the
            permissions cannot be applied.  In the last case the freshly
            bound server is closed and its socket file removed before the
            error propagates.
    """
    # Ensure parent directory exists
    self._socket_path.parent.mkdir(parents=True, exist_ok=True)
    # Reject symlinks to prevent redirection attacks
    if self._socket_path.is_symlink():
        raise OSError(
            f"Socket path is a symlink (possible attack): {self._socket_path}"
        )
    # Remove stale socket from a previous unclean shutdown
    self._socket_path.unlink(missing_ok=True)

    server = await asyncio.start_unix_server(
        self._accept_connection,
        path=str(self._socket_path),
        limit=MAX_MESSAGE_BYTES,
    )
    try:
        os.chmod(self._socket_path, self._permissions)
    except OSError:
        # Don't leave a listening socket behind with permissions we did
        # not intend.  No await has run since the bind, so no client has
        # been accepted yet and wait_closed() returns promptly.
        server.close()
        await server.wait_closed()
        self._socket_path.unlink(missing_ok=True)
        raise
    self._server = server

    _logger.info(
        "ipc_server_started",
        socket_path=str(self._socket_path),
        max_connections=self._max_connections,
        max_concurrent_requests=self._max_concurrent_requests,
    )
stop async
stop()

Stop the server, cancel active connections, and remove the socket.

Source code in src/marianne/daemon/ipc/server.py
async def stop(self) -> None:
    """Stop the server, cancel active connections, and remove the socket.

    This is a no-op if the server is not running.  The socket file is
    removed and ``self._server`` is cleared even if shutdown is
    interrupted part-way.
    """
    server = self._server
    if server is None:
        return

    # Stop accepting new connections
    server.close()

    try:
        # Cancel connection tasks *before* awaiting wait_closed().  Since
        # Python 3.12, Server.wait_closed() also waits for every active
        # connection to finish, so awaiting it first could hang for as
        # long as any client (e.g. an idle pooled connection) stays open.
        # Snapshot the set, since tasks may remove themselves as they finish.
        tasks = list(self._connections)
        for task in tasks:
            task.cancel()
        if tasks:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, BaseException) and not isinstance(
                    result, asyncio.CancelledError
                ):
                    _logger.warning(
                        "ipc_server.connection_exception_during_stop",
                        error=str(result),
                        error_type=type(result).__name__,
                    )
        self._connections.clear()

        # NOTE(review): this assumes the cancelled connection tasks close
        # their transports (not visible here); otherwise wait_closed() may
        # still wait on 3.12+.
        await server.wait_closed()
    finally:
        # Clean up socket file
        self._socket_path.unlink(missing_ok=True)
        self._server = None

    _logger.info("ipc_server_stopped")