bridge

Marianne Ollama Bridge components.

This package provides integration between Marianne and local Ollama models with MCP tool support. The main components are:

  • MCPProxyService: Manages MCP server subprocesses for tool execution
  • ContextOptimizer: Optimizes tool context for limited context windows (Sheet 5)
  • HybridRouter: Routes between Ollama and Claude based on complexity (Sheet 6)
  • BridgeCoordinator: Top-level orchestrator for bridge components (Sheet 7)

Classes

ContentBlock dataclass

ContentBlock(type, text=None, data=None, mime_type=None, uri=None)

Content block in tool results.

MCPConnection dataclass

MCPConnection(config, process, stdin, stdout, capabilities=None, tools=list(), last_tool_refresh=0.0)

Active connection to an MCP server.

MCPProxyService

MCPProxyService(servers, tool_cache_ttl=300, request_timeout=30.0)

MCP client that manages server subprocesses and executes tools.

This service acts as an MCP client: it spawns MCP servers as subprocesses and communicates with them. It is separate from Marianne's existing MCP server, which is used for external integration.

Lifecycle
  1. start() - Spawn server processes and perform initialize handshake
  2. list_tools() - Get available tools (cached with TTL)
  3. execute_tool() - Call a specific tool
  4. stop() - Clean shutdown of all servers

Initialize MCP proxy service.

Parameters:

Name             Type                   Description                                       Default
servers          list[MCPServerConfig]  List of MCP server configurations to connect to   required
tool_cache_ttl   int                    Seconds before refreshing tool list from servers  300
request_timeout  float                  Default timeout for JSON-RPC requests             30.0
Source code in src/marianne/bridge/mcp_proxy.py
def __init__(
    self,
    servers: list[MCPServerConfig],
    tool_cache_ttl: int = 300,
    request_timeout: float = 30.0,
) -> None:
    """Initialize MCP proxy service.

    Args:
        servers: List of MCP server configurations to connect to
        tool_cache_ttl: Seconds before refreshing tool list from servers
        request_timeout: Default timeout for JSON-RPC requests
    """
    self.servers = servers
    self.tool_cache_ttl = tool_cache_ttl
    self.request_timeout = request_timeout

    # Active connections keyed by server name
    self._connections: dict[str, MCPConnection] = {}

    # Tool name to server name mapping for routing
    self._tool_routing: dict[str, str] = {}

    # Request ID counter
    self._request_id = 0
Functions
start async
start()

Start all configured MCP servers.

For each server:
  1. Spawn subprocess with asyncio.create_subprocess_exec
  2. Send initialize request (JSON-RPC)
  3. Wait for initialize response
  4. Send initialized notification
  5. Call tools/list to cache available tools
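The messages exchanged in steps 2 to 5 can be sketched as follows. This page does not show the private helpers that build and send them, so the sketch is an illustration rather than Marianne's code: the helper names (`request`, `notification`, `frame`), the client name, and the protocol version string are assumptions, and the `notifications/initialized` method name and newline-delimited framing follow the public MCP specification for stdio servers as understood here.

```python
from __future__ import annotations

import json
from itertools import count

_ids = count(1)  # monotonically increasing JSON-RPC request IDs


def request(method: str, params: dict | None = None) -> dict:
    """Build a JSON-RPC 2.0 request. It carries an id, so a response is expected."""
    msg: dict = {"jsonrpc": "2.0", "id": next(_ids), "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def notification(method: str) -> dict:
    """Build a JSON-RPC 2.0 notification. It has no id, so no response is sent."""
    return {"jsonrpc": "2.0", "method": method}


def frame(msg: dict) -> bytes:
    """Serialize one message as a single newline-terminated JSON line."""
    return (json.dumps(msg) + "\n").encode("utf-8")


# Order of messages the client writes to the server's stdin. Between the
# first and second message the client waits for the initialize response.
handshake = [
    request("initialize", {
        "protocolVersion": "2024-11-05",  # assumed version string
        "capabilities": {},
        "clientInfo": {"name": "example-client", "version": "0.0.0"},  # hypothetical
    }),
    notification("notifications/initialized"),
    request("tools/list"),
]

for message in handshake:
    print(frame(message).decode("utf-8"), end="")
```

Only the two requests carry an `id`; the notification in between does not, which is how a JSON-RPC peer knows not to reply to it.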

Raises:

Type          Description
RuntimeError  If no servers could be started (total failure).
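The partial-failure policy (keep going when some servers fail, raise only when every configured server fails) can be sketched in isolation. The names `start_all` and `fake_start` are hypothetical stand-ins, and the sketch leaves out the subprocess cleanup that the real method performs.

```python
import asyncio


async def start_all(names, start_one):
    """Start each server in turn; tolerate individual failures.

    Returns (started, failed). Raises RuntimeError only when servers were
    configured and none of them started.
    """
    started, failed = {}, []
    for name in names:
        try:
            started[name] = await start_one(name)
        except Exception:
            failed.append(name)  # remember the failure, continue with the rest
    if names and not started:
        raise RuntimeError(
            f"All {len(names)} servers failed to start: {', '.join(failed)}"
        )
    return started, failed


async def fake_start(name):
    """Hypothetical starter: names beginning with 'bad' fail to spawn."""
    if name.startswith("bad"):
        raise OSError("spawn failed")
    return f"conn:{name}"


if __name__ == "__main__":
    print(asyncio.run(start_all(["good", "bad-1"], fake_start)))
```

An empty server list is not treated as an error, which mirrors the `if self.servers and not self._connections` guard in the source.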

Source code in src/marianne/bridge/mcp_proxy.py
async def start(self) -> None:
    """Start all configured MCP servers.

    For each server:
    1. Spawn subprocess with asyncio.create_subprocess_exec
    2. Send initialize request (JSON-RPC)
    3. Wait for initialize response
    4. Send initialized notification
    5. Call tools/list to cache available tools

    Raises:
        RuntimeError: If no servers could be started (total failure).
    """
    _logger.info("mcp_proxy_starting", server_count=len(self.servers))

    failed_servers: list[str] = []

    for config in self.servers:
        try:
            conn = await self._start_server(config)
            self._connections[config.name] = conn

            # Initialize handshake
            await self._initialize_server(conn)

            # Fetch initial tool list
            await self._refresh_tools(conn)

            _logger.info(
                "mcp_server_started",
                server=config.name,
                tool_count=len(conn.tools),
            )

        except Exception as e:
            _logger.error(
                "mcp_server_start_failed",
                server=config.name,
                error=str(e),
            )
            failed_servers.append(config.name)
            # Terminate orphaned subprocess before removing connection
            partial_conn = self._connections.pop(config.name, None)
            if partial_conn is not None and partial_conn.process.returncode is None:
                try:
                    partial_conn.process.kill()
                    await partial_conn.process.wait()
                except (ProcessLookupError, OSError):
                    pass
            # Continue with other servers - don't fail entirely

    if failed_servers:
        _logger.warning(
            "mcp_proxy_partial_startup",
            failed_servers=failed_servers,
            connected_servers=len(self._connections),
            total_configured=len(self.servers),
        )

    if self.servers and not self._connections:
        raise RuntimeError(
            f"All {len(self.servers)} MCP servers failed to start: "
            f"{', '.join(failed_servers)}"
        )

    _logger.info(
        "mcp_proxy_started",
        connected_servers=len(self._connections),
        total_tools=len(self._tool_routing),
    )
stop async
stop()

Stop all MCP server subprocesses.

Sends SIGTERM to each server and waits up to 5 seconds for it to exit, then sends SIGKILL if it is still running.
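The terminate-then-kill pattern can be tried on a throwaway child process. This is a minimal sketch: `stop_process` and `demo` are hypothetical names, the early return for an already-exited process is an addition that the source does not make, and the child is a plain sleeping Python interpreter rather than an MCP server.

```python
import asyncio
import sys


async def stop_process(proc, grace=5.0):
    """Ask a subprocess to exit, then force it if it ignores the request."""
    if proc.returncode is not None:
        return proc.returncode  # already exited, nothing to do
    proc.terminate()  # SIGTERM on POSIX
    try:
        await asyncio.wait_for(proc.wait(), timeout=grace)
    except asyncio.TimeoutError:
        proc.kill()  # SIGKILL on POSIX
        await proc.wait()
    return proc.returncode


async def demo():
    # Stand-in for a server: a child that would otherwise sleep for a minute.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    return await stop_process(proc)


if __name__ == "__main__":
    print("exit code:", asyncio.run(demo()))
```

Awaiting `proc.wait()` after `kill()` reaps the child, so no zombie process is left behind.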

Source code in src/marianne/bridge/mcp_proxy.py
async def stop(self) -> None:
    """Stop all MCP server subprocesses.

    Sends SIGTERM and waits for graceful shutdown, then SIGKILL if needed.
    """
    _logger.info("mcp_proxy_stopping", server_count=len(self._connections))

    for name, conn in self._connections.items():
        try:
            # Try graceful termination
            conn.process.terminate()
            try:
                await asyncio.wait_for(conn.process.wait(), timeout=5.0)
            except TimeoutError:
                # Force kill
                conn.process.kill()
                await conn.process.wait()

            _logger.debug("mcp_server_stopped", server=name)

        except Exception as e:
            _logger.warning(
                "mcp_server_stop_error",
                server=name,
                error=str(e),
            )

    self._connections.clear()
    self._tool_routing.clear()
    _logger.info("mcp_proxy_stopped")
__aenter__ async
__aenter__()

Async context manager entry.

Source code in src/marianne/bridge/mcp_proxy.py
async def __aenter__(self) -> MCPProxyService:
    """Async context manager entry."""
    await self.start()
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in src/marianne/bridge/mcp_proxy.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: types.TracebackType | None,
) -> None:
    """Async context manager exit."""
    await self.stop()
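Because `__aenter__` calls `start()` and `__aexit__` calls `stop()`, the service can be used with `async with` so that servers are stopped even when the body raises. The sketch below demonstrates that ordering with `StubProxy`, a hypothetical stand-in that only records calls; it is not `MCPProxyService` and spawns nothing.

```python
import asyncio


class StubProxy:
    """Hypothetical stand-in that mirrors the start/stop wiring shown above."""

    def __init__(self):
        self.events = []

    async def start(self):
        self.events.append("start")

    async def stop(self):
        self.events.append("stop")

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Returning None lets any exception from the body propagate.
        await self.stop()


async def run_with_failure():
    proxy = StubProxy()
    try:
        async with proxy:
            proxy.events.append("work")
            raise ValueError("boom")
    except ValueError:
        proxy.events.append("caught")
    return proxy.events


if __name__ == "__main__":
    print(asyncio.run(run_with_failure()))
```

Note that Python does not call `__aexit__` when `__aenter__` itself raises, so a failure inside `start()` is not followed by `stop()`.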
list_tools async
list_tools()

Get all available tools from all servers.

Uses cached tool list if within TTL, otherwise refreshes.
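The TTL check can be sketched with a small cache whose clock is injectable, which makes the expiry behaviour easy to exercise. `CachedTools` and its fields are hypothetical names, tools are plain strings rather than `MCPTool` objects, and the sketch assumes the refresh step records the time of the last successful fetch.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class CachedTools:
    """Hypothetical TTL cache around a fetch function."""

    fetch: Callable[[], list[str]]
    ttl: float = 300.0
    clock: Callable[[], float] = time.monotonic
    tools: list[str] = field(default_factory=list)
    last_refresh: float = 0.0

    def get(self) -> list[str]:
        # Refresh only when the cached list is older than the TTL.
        if self.clock() - self.last_refresh > self.ttl:
            try:
                self.tools = self.fetch()
                self.last_refresh = self.clock()
            except Exception:
                pass  # on failure, keep serving the stale list
        return self.tools


if __name__ == "__main__":
    cache = CachedTools(fetch=lambda: ["read_file", "search"], ttl=300.0)
    print(cache.get())
```

A monotonic clock is used instead of wall-clock time so that system clock adjustments cannot make the cache look fresher or staler than it is.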

Returns:

Type           Description
list[MCPTool]  List of MCPTool objects from all connected servers

Source code in src/marianne/bridge/mcp_proxy.py
async def list_tools(self) -> list[MCPTool]:
    """Get all available tools from all servers.

    Uses cached tool list if within TTL, otherwise refreshes.

    Returns:
        List of MCPTool objects from all connected servers
    """
    current_time = time.monotonic()
    all_tools: list[MCPTool] = []

    for name, conn in self._connections.items():
        # Check if cache is stale
        if current_time - conn.last_tool_refresh > self.tool_cache_ttl:
            try:
                await self._refresh_tools(conn)
            except Exception as e:
                _logger.warning(
                    "tool_refresh_failed",
                    server=name,
                    error=str(e),
                )

        all_tools.extend(conn.tools)

    return all_tools
execute_tool async
execute_tool(tool_name, arguments)

Execute a tool by name.

Parameters:

Name       Type            Description                    Default
tool_name  str             Name of the tool to execute    required
arguments  dict[str, Any]  Arguments to pass to the tool  required

Returns:

Type        Description
ToolResult  ToolResult with content and error status

Raises:

Type                  Description
ToolNotFoundError     If tool doesn't exist
ToolExecutionTimeout  If execution times out
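The two failure modes can be reproduced with a simplified dispatcher: a routing table maps tool names to servers, and a timeout from the underlying call is re-raised as a domain-specific exception. Everything here is a hypothetical stand-in. The exception classes are local copies with the same names, and `execute`, `echo`, `slow`, `ROUTING`, and `SERVERS` replace the real connection objects and JSON-RPC transport.

```python
import asyncio


class ToolNotFoundError(Exception):
    """Local stand-in for the exception of the same name."""


class ToolExecutionTimeout(Exception):
    """Local stand-in for the exception of the same name."""


async def execute(tool_name, arguments, routing, servers, default_timeout=30.0):
    """Look up the owning server, call the tool, and translate timeouts."""
    server_name = routing.get(tool_name)
    if not server_name:
        raise ToolNotFoundError(f"Tool not found: {tool_name}")
    server = servers.get(server_name)
    if server is None:
        raise ToolNotFoundError(f"Server not connected: {server_name}")
    call, server_timeout = server
    # A per-server timeout wins; None or 0 falls back to the default.
    timeout = server_timeout or default_timeout
    try:
        return await asyncio.wait_for(call(tool_name, arguments), timeout=timeout)
    except asyncio.TimeoutError as e:
        raise ToolExecutionTimeout(f"Tool {tool_name} timed out") from e


async def echo(name, args):
    return {"tool": name, "args": args}


async def slow(name, args):
    await asyncio.sleep(10)


ROUTING = {"echo": "fast", "sleep": "slow"}
SERVERS = {"fast": (echo, None), "slow": (slow, 0.05)}

if __name__ == "__main__":
    print(asyncio.run(execute("echo", {"x": 1}, ROUTING, SERVERS)))
```

Because the fallback uses `or`, a per-server timeout of `0` is treated the same as an unset one, which matches the `conn.config.timeout_seconds or self.request_timeout` expression in the source.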

Source code in src/marianne/bridge/mcp_proxy.py
async def execute_tool(
    self,
    tool_name: str,
    arguments: dict[str, Any],
) -> ToolResult:
    """Execute a tool by name.

    Args:
        tool_name: Name of the tool to execute
        arguments: Arguments to pass to the tool

    Returns:
        ToolResult with content and error status

    Raises:
        ToolNotFoundError: If tool doesn't exist
        ToolExecutionTimeout: If execution times out
    """
    # Find which server owns this tool
    server_name = self._tool_routing.get(tool_name)
    if not server_name:
        raise ToolNotFoundError(f"Tool not found: {tool_name}")

    conn = self._connections.get(server_name)
    if not conn:
        raise ToolNotFoundError(f"Server not connected: {server_name}")

    _logger.debug(
        "tool_execute_start",
        tool=tool_name,
        server=server_name,
    )

    try:
        # Send tools/call request
        result = await self._send_jsonrpc(
            conn,
            "tools/call",
            {
                "name": tool_name,
                "arguments": arguments,
            },
            timeout=conn.config.timeout_seconds or self.request_timeout,
        )

        # Parse result
        return self._parse_tool_result(result)

    except TimeoutError as e:
        raise ToolExecutionTimeout(f"Tool {tool_name} timed out") from e

MCPTool dataclass

MCPTool(name, description, input_schema, server_name, annotations=None)

Represents an MCP tool definition.

ServerCapabilities dataclass

ServerCapabilities(tools=False, resources=False, prompts=False, logging=False)

Capabilities reported by MCP server.

ToolExecutionTimeout

Bases: Exception

Raised when tool execution times out.

ToolNotFoundError

Bases: Exception

Raised when requested tool doesn't exist.

ToolResult dataclass

ToolResult(content, is_error=False)

Result from tool execution.