Skip to content

server

server

Async Unix domain socket server for Marianne daemon IPC.

Binds a Unix socket, accepts concurrent client connections, reads newline-delimited JSON-RPC 2.0 requests, dispatches them through RequestHandler, and writes responses back. Handles connection lifecycle and socket cleanup on shutdown.

Classes

DaemonServer

DaemonServer(socket_path, handler, *, permissions=432, max_connections=DEFAULT_MAX_CONNECTIONS, max_concurrent_requests=DEFAULT_MAX_CONCURRENT_REQUESTS)

Async Unix domain socket server with JSON-RPC 2.0 routing.

Parameters

socket_path: Filesystem path for the Unix domain socket.
handler: RequestHandler that dispatches JSON-RPC methods.
permissions: Octal file permissions applied to the socket after creation. Defaults to 0o660 (owner and group read/write).
max_connections: Maximum number of simultaneously connected clients. This guards against file-descriptor exhaustion; idle connections are cheap, so the default is high (~500).
max_concurrent_requests: Maximum number of requests being processed at once across all connections. This is the real concurrency control (~50).

Source code in src/marianne/daemon/ipc/server.py
def __init__(
    self,
    socket_path: Path,
    handler: RequestHandler,
    *,
    permissions: int = 0o660,
    max_connections: int = DEFAULT_MAX_CONNECTIONS,
    max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUESTS,
) -> None:
    """Validate the limits and set up the server's initial, unstarted state.

    Args:
        socket_path: Filesystem path of the Unix domain socket.
        handler: Object the server hands incoming requests to.
        permissions: File mode stored for later application to the socket.
        max_connections: Size of the connection semaphore; must be >= 1.
        max_concurrent_requests: Size of the request semaphore; must be >= 1.

    Raises:
        ValueError: If either limit is smaller than 1.
    """
    # Both limits share the same lower bound; check them in declaration
    # order so the first offending argument is the one reported.
    limits = (
        ("max_connections", max_connections),
        ("max_concurrent_requests", max_concurrent_requests),
    )
    for label, value in limits:
        if value < 1:
            raise ValueError(f"{label} must be >= 1, got {value}")

    # Static configuration.
    self._socket_path = socket_path
    self._handler = handler
    self._permissions = permissions
    self._max_connections = max_connections
    self._max_concurrent_requests = max_concurrent_requests

    # Concurrency gates sized from the validated limits.
    self._connection_semaphore = asyncio.Semaphore(max_connections)
    self._request_semaphore = asyncio.Semaphore(max_concurrent_requests)

    # Runtime state: no listening server and no tracked connection tasks yet.
    self._server: asyncio.Server | None = None
    self._connections: set[asyncio.Task[None]] = set()
Attributes
is_running property
is_running

Return whether the server is currently accepting connections.

Functions
start async
start()

Bind the Unix socket and start accepting connections.

Source code in src/marianne/daemon/ipc/server.py
async def start(self) -> None:
    """Bind the Unix socket and start accepting connections.

    Creates the socket's parent directory if needed, refuses to bind over a
    symlink, removes any stale socket file, starts the asyncio Unix server,
    and then applies the configured file permissions to the socket.

    If applying the permissions fails, the freshly bound listener is closed
    and the socket file removed before the error is re-raised, so a failed
    start never leaves a reachable socket with unintended permissions.

    Raises:
        OSError: If the socket path is a symlink, or if binding the socket
            or changing its permissions fails.
    """
    # Ensure parent directory exists
    self._socket_path.parent.mkdir(parents=True, exist_ok=True)
    # Reject symlinks to prevent redirection attacks
    if self._socket_path.is_symlink():
        raise OSError(
            f"Socket path is a symlink (possible attack): {self._socket_path}"
        )
    # Remove stale socket from a previous unclean shutdown
    self._socket_path.unlink(missing_ok=True)

    server = await asyncio.start_unix_server(
        self._accept_connection,
        path=str(self._socket_path),
        limit=MAX_MESSAGE_BYTES,
    )
    self._server = server
    try:
        os.chmod(self._socket_path, self._permissions)
    except Exception:
        # The socket is already listening with whatever mode the process
        # umask produced. Tear it down rather than keep serving with
        # permissions the caller did not ask for, then surface the error.
        server.close()
        self._server = None
        self._socket_path.unlink(missing_ok=True)
        raise

    _logger.info(
        "ipc_server_started",
        socket_path=str(self._socket_path),
        max_connections=self._max_connections,
        max_concurrent_requests=self._max_concurrent_requests,
    )
stop async
stop()

Stop the server, cancel active connections, and remove the socket.

Source code in src/marianne/daemon/ipc/server.py
async def stop(self) -> None:
    """Stop the server, cancel active connections, and remove the socket.

    Does nothing if the server is not currently started. Otherwise the
    listening socket is closed, every tracked connection task is cancelled
    and awaited, the server's shutdown is awaited, and the socket file is
    removed. Exceptions (other than cancellation) raised by connection
    tasks while they are being torn down are logged as warnings rather
    than propagated.
    """
    server = self._server
    if server is None:
        return

    # Stop accepting new connections. close() only shuts the listening
    # socket; connections that were already accepted stay open.
    server.close()

    # Cancel all active connection tasks. This must happen BEFORE awaiting
    # wait_closed(): on Python 3.12+ wait_closed() also waits for active
    # connections to finish, so awaiting it first can block forever while
    # any client remains connected.
    pending = list(self._connections)
    for task in pending:
        task.cancel()
    if pending:
        results = await asyncio.gather(*pending, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException) and not isinstance(
                result, asyncio.CancelledError
            ):
                _logger.warning(
                    "ipc_server.connection_exception_during_stop",
                    error=str(result),
                    error_type=type(result).__name__,
                )
    self._connections.clear()

    # With the connection tasks drained, wait for the server to finish
    # shutting down.
    await server.wait_closed()

    # Clean up socket file
    self._socket_path.unlink(missing_ok=True)
    self._server = None

    _logger.info("ipc_server_stopped")

Functions