Async Unix domain socket server for Marianne daemon IPC.
Binds a Unix socket, accepts concurrent client connections, reads
newline-delimited JSON-RPC 2.0 requests, dispatches them through
RequestHandler, and writes responses back. Handles connection
lifecycle and socket cleanup on shutdown.
Classes
DaemonServer
DaemonServer(socket_path, handler, *, permissions=0o660, max_connections=DEFAULT_MAX_CONNECTIONS, max_concurrent_requests=DEFAULT_MAX_CONCURRENT_REQUESTS)
Async Unix domain socket server with JSON-RPC 2.0 routing.
Parameters
socket_path:
Filesystem path for the Unix domain socket.
handler:
RequestHandler that dispatches JSON-RPC methods.
permissions:
Octal file permissions applied to the socket after creation.
Defaults to 0o660 (owner + group read/write).
max_connections:
Maximum number of simultaneously connected clients. This cap protects
against file-descriptor exhaustion; idle connections are cheap, so the
default is high (~500).
max_concurrent_requests:
Maximum requests being processed at once across all connections.
This is the real concurrency control (~50).
Source code in src/marianne/daemon/ipc/server.py
def __init__(
    self,
    socket_path: Path,
    handler: RequestHandler,
    *,
    permissions: int = 0o660,
    max_connections: int = DEFAULT_MAX_CONNECTIONS,
    max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUESTS,
) -> None:
    """Validate the limits and record the server configuration.

    Nothing is bound here: the arguments are stored on the instance and
    two semaphores are created, sized by ``max_connections`` and
    ``max_concurrent_requests`` respectively.

    Raises:
        ValueError: If ``max_connections`` or ``max_concurrent_requests``
            is less than 1.
    """
    # Checked in declaration order so the first offending limit is the
    # one reported.
    for limit_name, limit_value in (
        ("max_connections", max_connections),
        ("max_concurrent_requests", max_concurrent_requests),
    ):
        if limit_value < 1:
            raise ValueError(f"{limit_name} must be >= 1, got {limit_value}")

    # Configuration supplied by the caller.
    self._socket_path = socket_path
    self._handler = handler
    self._permissions = permissions
    self._max_connections = max_connections
    self._max_concurrent_requests = max_concurrent_requests

    # Runtime state: no server yet and no tracked connection tasks.
    self._server: asyncio.Server | None = None
    self._connections: set[asyncio.Task[None]] = set()

    # One semaphore per limit.
    self._connection_semaphore = asyncio.Semaphore(max_connections)
    self._request_semaphore = asyncio.Semaphore(max_concurrent_requests)
Attributes
is_running
property
Return whether the server is currently accepting connections.
Functions
start
async
Bind the Unix socket and start accepting connections.
Source code in src/marianne/daemon/ipc/server.py
async def start(self) -> None:
    """Bind the Unix socket and start accepting connections.

    Creates the parent directory if needed, refuses a symlinked socket
    path, removes any existing file at the socket path, binds the server
    and then applies the configured permissions to the socket file.

    Raises:
        OSError: If the socket path is a symlink, or if applying the
            permissions fails. In the latter case the freshly bound
            server is closed and the socket file removed before the
            error propagates, so no half-initialised listener is left
            behind. Errors raised while binding propagate unchanged.
    """
    # Ensure parent directory exists
    self._socket_path.parent.mkdir(parents=True, exist_ok=True)
    # Reject symlinks to prevent redirection attacks
    if self._socket_path.is_symlink():
        raise OSError(
            f"Socket path is a symlink (possible attack): {self._socket_path}"
        )
    # Remove stale socket from a previous unclean shutdown
    self._socket_path.unlink(missing_ok=True)
    server = await asyncio.start_unix_server(
        self._accept_connection,
        path=str(self._socket_path),
        limit=MAX_MESSAGE_BYTES,
    )
    try:
        os.chmod(self._socket_path, self._permissions)
    except OSError:
        # Without the requested permissions the socket must not stay
        # reachable: tear the listener down and remove the file, then
        # let the caller see the original error.
        server.close()
        await server.wait_closed()
        self._socket_path.unlink(missing_ok=True)
        raise
    # Publish the server only once setup has fully succeeded, so a failed
    # start() never leaves the instance holding a live server.
    self._server = server
    _logger.info(
        "ipc_server_started",
        socket_path=str(self._socket_path),
        max_connections=self._max_connections,
        max_concurrent_requests=self._max_concurrent_requests,
    )
stop
async
Stop the server, cancel active connections, and remove the socket.
Source code in src/marianne/daemon/ipc/server.py
async def stop(self) -> None:
    """Stop the server, cancel active connections, and remove the socket.

    Returns immediately when no server is set (never started, or already
    stopped). Otherwise the listener is closed, every tracked connection
    task is cancelled and awaited, the server is awaited until closed,
    and the socket file is removed.

    Exceptions raised by connection tasks during shutdown (other than
    cancellation) are logged as warnings and not re-raised.
    """
    if self._server is None:
        return
    # Stop accepting new connections
    self._server.close()
    # Cancel the connection tasks *before* waiting on the server. On
    # Python 3.12+ Server.wait_closed() also waits for active connections
    # to finish, so waiting first would block for as long as any idle
    # client stays connected.
    # NOTE(review): this relies on connection tasks closing their streams
    # when cancelled — confirm in the connection handler.
    pending = list(self._connections)
    for task in pending:
        task.cancel()
    if pending:
        results = await asyncio.gather(*pending, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException) and not isinstance(
                result, asyncio.CancelledError
            ):
                _logger.warning(
                    "ipc_server.connection_exception_during_stop",
                    error=str(result),
                    error_type=type(result).__name__,
                )
        self._connections.clear()
    await self._server.wait_closed()
    # Clean up socket file
    self._socket_path.unlink(missing_ok=True)
    self._server = None
    _logger.info("ipc_server_stopped")
Functions