Skip to content

process

process

Marianne daemon process.

Long-running orchestration service that manages job execution, resources, and cross-job coordination. Provides core functions for starting, stopping, and checking conductor status.

The entry point is mzt start/stop/restart via cli/commands/conductor.py.

Attributes

Classes

DaemonProcess

DaemonProcess(config)

Long-running Marianne daemon process.

Composes DaemonServer (IPC), JobManager (job tracking), and ResourceMonitor (limits) into a single lifecycle.

Source code in src/marianne/daemon/process.py
def __init__(self, config: DaemonConfig) -> None:
    """Record config and lightweight state; components are built in run()."""
    self._config = config
    # Monotonic reference point so uptime survives wall-clock changes.
    self._start_time = time.monotonic()
    # Presumably set by the signal path to request shutdown — see run().
    self._signal_received = asyncio.Event()
    # Owns the OS process group used to reap/kill child processes.
    self._pgroup = ProcessGroupManager()
    # Signal-handler tasks are kept alive here so exceptions surface.
    self._signal_tasks: list[asyncio.Task[Any]] = []
    # Populated lazily in run() (steps 8.5 / 8.6) when profiling is on.
    self._profiler: Any = None
    self._correlation: Any = None
Functions
run async
run()

Main daemon lifecycle: boot, serve, shutdown.

Source code in src/marianne/daemon/process.py
async def run(self) -> None:
    """Main daemon lifecycle: boot, serve, shutdown.

    Executes the numbered startup steps in order — PID file, process
    group, core components (ResourceMonitor/JobManager), IPC server,
    signal handlers, resource/entropy/profiler monitors — then blocks
    on the manager's shutdown event and tears components down in
    roughly reverse order.

    NOTE(review): the step 10/11 teardown runs only on the normal
    shutdown path.  An exception raised during startup or serving
    skips component cleanup; only the PID file is removed by the
    ``finally`` block.  Confirm whether crash-path cleanup is handled
    elsewhere (e.g. by the process-group kill on next start).
    """
    # 1. Write PID file
    _write_pid(self._config.pid_file)

    try:
        # 2. Set up process group (fixes issue #38 — orphan prevention)
        self._pgroup.setup()

        # 2.5. Reap orphaned backend children from previous runs.
        # Claude CLI spawns LSP/MCP servers that outlive it; if a
        # previous daemon or CLI session crashed, these accumulate.
        startup_reaped = self._pgroup.reap_orphaned_backends()
        if startup_reaped:
            _logger.info(
                "daemon.startup_orphan_reap",
                count=len(startup_reaped),
                pids=startup_reaped,
            )

        # 3. Create components — single ResourceMonitor shared
        #    between periodic monitoring and backpressure checks.
        from marianne.daemon.ipc.handler import RequestHandler
        from marianne.daemon.ipc.server import DaemonServer
        from marianne.daemon.manager import JobManager
        from marianne.daemon.monitor import ResourceMonitor

        # Create monitor first (without manager ref — set after).
        self._monitor = ResourceMonitor(
            self._config.resource_limits, pgroup=self._pgroup,
        )
        # Pass the single monitor into JobManager for backpressure.
        self._manager = JobManager(
            self._config,
            start_time=self._start_time,
            monitor=self._monitor,
            pgroup=self._pgroup,
        )
        # Now wire the manager back into the monitor for job counts.
        self._monitor.set_manager(self._manager)
        await self._manager.start()

        # Warn about unenforced / reserved config fields.
        # Each entry: (field, current_value, default, event, message)
        _unenforced_fields: list[tuple[str, object, object, str, str]] = [
            (
                "max_api_calls_per_minute",
                self._config.resource_limits.max_api_calls_per_minute, 60,
                "config.unenforced_rate_limit",
                "max_api_calls_per_minute is set but NOT YET ENFORCED. "
                "Rate limiting currently works through externally-reported "
                "events via RateLimitCoordinator.",
            ),
            (
                "state_backend_type",
                self._config.state_backend_type, "sqlite",
                "config.reserved_field_ignored",
                "state_backend_type is reserved for future use and has no "
                "effect. Daemon state persistence is not yet implemented.",
            ),
            (
                "state_db_path",
                str(self._config.state_db_path), "~/.marianne/daemon-state.db",
                "config.reserved_field_ignored",
                "state_db_path is reserved for future use and has no "
                "effect. Daemon state persistence is not yet implemented.",
            ),
            (
                "max_concurrent_sheets",
                self._config.max_concurrent_sheets, 10,
                "config.reserved_field_ignored",
                "max_concurrent_sheets is reserved for Phase 3 scheduler "
                "and has no effect. Jobs currently run monolithically "
                "via JobService.",
            ),
        ]
        # Only warn when the value differs from its hard-coded default —
        # reserved fields left at their defaults are harmless.
        for field_name, current, default, event, msg in _unenforced_fields:
            if current != default:
                _logger.warning(event, field=field_name, value=current, message=msg)

        handler = RequestHandler()

        # 4. Create health checker
        from marianne.daemon.health import HealthChecker

        # NOTE(review): reaches into JobManager private state
        # (``_learning_hub``) for the learning store — consider a
        # public accessor.
        health = HealthChecker(
            self._manager,
            self._monitor,
            start_time=self._start_time,
            learning_store=(
                self._manager._learning_hub.store
                if self._manager._learning_hub
                else None
            ),
        )

        # 5. Register RPC methods (adapt JobManager to handler signature)
        self._register_methods(handler, self._manager, health)

        # 6. Start server
        server = DaemonServer(
            self._config.socket.path,
            handler,
            permissions=self._config.socket.permissions,
            max_connections=self._config.socket.backlog,
        )
        await server.start()

        # 7. Install signal handlers (tracked to surface exceptions)
        loop = asyncio.get_running_loop()

        from collections.abc import Callable

        # Factory avoids the late-binding-closure pitfall: each callback
        # captures its own ``signum`` instead of the loop variable.
        def _make_signal_callback(
            signum: signal.Signals,
        ) -> Callable[[], None]:
            """Create a signal callback that captures ``signum`` by value."""
            def _cb() -> None:
                self._track_signal_task(
                    asyncio.create_task(
                        self._handle_signal(signum, self._manager, server),
                    ),
                )
            return _cb

        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, _make_signal_callback(sig))

        def _sighup_callback() -> None:
            self._track_signal_task(
                asyncio.create_task(self._handle_sighup()),
            )

        loop.add_signal_handler(signal.SIGHUP, _sighup_callback)

        # 8. Start resource monitor
        interval = self._config.monitor_interval_seconds
        await self._monitor.start(interval_seconds=interval)

        # 8.4. Start entropy monitoring (v25 evolution)
        # Wire health checker callback into manager for job completion tracking
        self._manager._entropy_check_callback = health.on_job_completed
        await health.start_periodic_checks()

        # 8.5. Start profiler collector (after monitor, needs event bus)
        if self._config.profiler.enabled:
            from marianne.daemon.profiler.collector import ProfilerCollector

            self._profiler = ProfilerCollector(
                config=self._config.profiler,
                monitor=self._monitor,
                pgroup=self._pgroup,
                event_bus=self._manager.event_bus,
                manager=self._manager,
            )
            await self._profiler.start()

        # 8.6. Start correlation analyzer (after profiler, needs storage + learning hub)
        if self._config.profiler.enabled and self._profiler is not None:
            from marianne.daemon.profiler.correlation import CorrelationAnalyzer

            self._correlation = CorrelationAnalyzer(
                storage=self._profiler._storage,
                learning_hub=self._manager.learning_hub,
                config=self._config.profiler.correlation,
            )
            await self._correlation.start(self._manager.event_bus)

        # 9. Run until shutdown
        _logger.info(
            "daemon.started",
            pid=os.getpid(),
            socket=str(self._config.socket.path),
        )
        await self._manager.wait_for_shutdown()

        # 10. Cleanup — correlation, profiler, entropy, monitor
        # (reverse of startup order so dependents stop first).
        if self._correlation is not None:
            await self._correlation.stop()
        if self._profiler is not None:
            await self._profiler.stop()
        await health.stop_periodic_checks()
        await self._monitor.stop()
        await server.stop()

        # 11. Kill remaining children in process group (issue #38)
        self._pgroup.kill_all_children()
        orphans = self._pgroup.cleanup_orphans()
        if orphans:
            _logger.info(
                "daemon.shutdown_orphans_cleaned",
                count=len(orphans),
            )

        _logger.info("daemon.stopped")
    finally:
        # Always remove PID file, even on crash
        self._config.pid_file.unlink(missing_ok=True)

Functions

start_conductor

start_conductor(config_file=None, foreground=False, log_level='info', profile=None, clone_name=None)

Start the Marianne conductor process.

Called by mzt start via cli/commands/conductor.py.

When clone_name is provided, the conductor runs with isolated paths (socket, PID file, state DB, log) so it can coexist with the production conductor. The base config is loaded normally, then clone-specific paths are applied on top.

Source code in src/marianne/daemon/process.py
def start_conductor(
    config_file: Path | None = None,
    foreground: bool = False,
    log_level: str = "info",
    profile: str | None = None,
    clone_name: str | None = None,
) -> None:
    """Start the Marianne conductor process.

    Entry point for ``mzt start`` (via ``cli/commands/conductor.py``).

    With *clone_name* set, the conductor gets its own socket, PID
    file, state DB and log file so it can run next to the production
    conductor: the base config is loaded first, then the clone paths
    are layered on top.
    """
    config = _load_config(config_file, profile=profile)
    config.log_level = cast(Any, log_level)

    # Clone conductors: rebuild the config with isolated paths.
    if clone_name is not None:
        from marianne.daemon.clone import resolve_clone_paths

        paths = resolve_clone_paths(clone_name)
        raw = config.model_dump()
        raw["socket"] = {"path": str(paths.socket)}
        raw["pid_file"] = str(paths.pid_file)
        raw["state_db_path"] = str(paths.state_db)
        config = DaemonConfig.model_validate(raw)
        config.log_level = cast(Any, log_level)
        config.log_file = paths.log_file

    # Refuse to start twice; clean up a PID file left by a dead process.
    existing = _read_pid(config.pid_file)
    if existing is not None:
        if _pid_alive(existing):
            label = (
                "clone conductor"
                if clone_name is not None
                else "Marianne conductor"
            )
            typer.echo(f"{label} is already running (PID {existing})")
            raise typer.Exit(1)
        # Stale PID file — process is dead, clean up before starting.
        typer.echo(
            f"Cleaned up stale PID file "
            f"(PID {existing} is no longer running)"
        )
        config.pid_file.unlink(missing_ok=True)

    # A still-locked PID file means another start is mid-flight.
    if config.pid_file.exists() and not config.pid_file.is_symlink():
        try:
            fd = os.open(str(config.pid_file), os.O_RDONLY)
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                fcntl.flock(fd, fcntl.LOCK_UN)
            finally:
                os.close(fd)
        except OSError:
            typer.echo("Marianne conductor is starting up (PID file locked)")
            raise typer.Exit(1) from None

    from marianne.core.logging import configure_logging

    # Human-readable logs in the foreground, JSON when daemonized.
    configure_logging(
        level=log_level.upper(),  # type: ignore[arg-type]
        format="console" if foreground else "json",  # type: ignore[arg-type]
        file_path=config.log_file,
        include_timestamps=True,
    )

    if foreground:
        _logger.info(
            "daemon.starting",
            pid=os.getpid(),
            foreground=True,
            clone=clone_name,
        )
    else:
        _daemonize(config)

    asyncio.run(DaemonProcess(config).run())

stop_conductor

stop_conductor(pid_file=None, force=False, socket_path=None)

Stop the running Marianne conductor (daemon) process.

Called by mzt stop via cli/commands/conductor.py.

When jobs are actively running and --force is not set, warns the user and asks for confirmation before proceeding (#94). Force mode (--force) skips the safety check entirely.

Source code in src/marianne/daemon/process.py
def stop_conductor(
    pid_file: Path | None = None,
    force: bool = False,
    socket_path: Path | None = None,
) -> None:
    """Stop the running Marianne conductor (daemon) process.

    Entry point for ``mzt stop`` (via ``cli/commands/conductor.py``).

    Without ``--force``, a conductor with active jobs triggers a
    warning plus interactive confirmation before anything is sent
    (#94).  With ``--force`` the check is skipped and SIGKILL is used
    instead of SIGTERM.
    """
    target = pid_file if pid_file is not None else DaemonConfig().pid_file
    pid = _read_pid(target)
    if pid is None or not _pid_alive(pid):
        typer.echo("Marianne conductor is not running")
        # Drop any leftover PID file from a dead process.
        target.unlink(missing_ok=True)
        raise typer.Exit(1)

    # Safety guard (#94): confirm before stopping a busy conductor.
    if not force:
        info = _check_running_jobs(socket_path)
        active = 0 if info is None else info["running_jobs"]
        if active > 0:
            typer.echo(
                f"⚠ Warning: {active} job(s) currently running. "
                "Stopping the conductor will orphan active agents and "
                "may corrupt job state.",
            )
            if not typer.confirm("Proceed with stop?", default=False):
                typer.echo("Aborted.")
                raise typer.Exit(1)

    signame = "SIGKILL" if force else "SIGTERM"
    os.kill(pid, signal.SIGKILL if force else signal.SIGTERM)
    typer.echo(f"Sent {signame} to Marianne conductor (PID {pid})")

get_conductor_status

get_conductor_status(pid_file=None, socket_path=None)

Check Marianne conductor (daemon) status via health probes.

Called by mzt conductor-status via cli/commands/conductor.py.

Source code in src/marianne/daemon/process.py
def get_conductor_status(
    pid_file: Path | None = None,
    socket_path: Path | None = None,
) -> None:
    """Check Marianne conductor (daemon) status via health probes.

    Called by ``mzt conductor-status`` via ``cli/commands/conductor.py``.

    First checks liveness via the PID file; if alive, probes the IPC
    socket (``daemon.health`` / ``daemon.ready`` / ``daemon.status``)
    best-effort and renders a rich status panel.  Probe failures
    degrade gracefully — each probe independently returns ``None``.
    """
    _defaults = DaemonConfig()
    resolved_pid_file = pid_file or _defaults.pid_file
    resolved_socket = socket_path or _defaults.socket.path

    pid = _read_pid(resolved_pid_file)
    if pid is None or not _pid_alive(pid):
        typer.echo("Marianne conductor is not running")
        # Remove the stale PID file left by a dead process.
        resolved_pid_file.unlink(missing_ok=True)
        raise typer.Exit(1)

    typer.echo(f"Marianne conductor is running (PID {pid})")

    from marianne.daemon.ipc.client import DaemonClient

    # NOTE(review): the client is never explicitly closed here —
    # presumably it connects per call; confirm against DaemonClient.
    client = DaemonClient(resolved_socket)

    from marianne.daemon.exceptions import DaemonError

    async def _probe(method: str) -> dict[str, Any] | None:
        """Call *method* on the daemon; return None on any IPC failure."""
        try:
            result: dict[str, Any] = await client.call(method)
            return result
        except (OSError, DaemonError) as e:
            _logger.info(f"probe.{method.split('.')[-1]}_failed", error=str(e))
            return None

    async def _get_health() -> tuple[
        dict[str, Any] | None,
        dict[str, Any] | None,
        dict[str, Any] | None,
    ]:
        """Run the three status probes sequentially; each may be None."""
        health = await _probe("daemon.health")
        ready = await _probe("daemon.ready")
        daemon_info = await _probe("daemon.status")
        return health, ready, daemon_info

    try:
        health, ready, daemon_info = asyncio.run(_get_health())
    except (OSError, DaemonError):
        # Connection-level failure outside the per-probe handlers.
        typer.echo("  (Could not connect to conductor socket for details)")
        return

    from rich.console import Console
    from rich.panel import Panel

    con = Console()

    # Build uptime string; seconds are dropped once uptime exceeds a day.
    uptime_str = ""
    if health:
        uptime = health.get("uptime_seconds", 0)
        hours, remainder = divmod(int(uptime), 3600)
        minutes, seconds = divmod(remainder, 60)
        if hours >= 24:
            days = hours // 24
            remaining_hours = hours % 24
            uptime_str = f"{days}d {remaining_hours}h {minutes}m"
        else:
            uptime_str = f"{hours}h {minutes}m {seconds}s"

    # Build panel content (\u00b7 is the middle-dot separator).
    lines: list[str] = []
    lines.append(f"PID {pid} [dim]\u00b7[/dim] Up {uptime_str}" if uptime_str else f"PID {pid}")

    if ready:
        status_str = ready.get("status", "unknown")
        accepting = ready.get("accepting_work", False)
        if status_str == "ready" and accepting:
            lines.append("[green]Ready[/green] [dim]\u00b7[/dim] Accepting work")
        elif status_str == "ready":
            # Ready but back-pressured: not taking new work.
            lines.append("[yellow]Ready[/yellow] [dim]\u00b7[/dim] Not accepting work")
        else:
            lines.append(f"Status: {status_str}")

        running_jobs = ready.get("running_jobs", 0)
        lines.append("")
        if running_jobs > 0:
            lines.append(f"Jobs: [bold]{running_jobs}[/bold] running")
        else:
            lines.append("[dim]No active jobs[/dim]")

        # Resources with context
        lines.append("")
        lines.append("[bold]Resources[/bold]")
        memory_mb = ready.get("memory_mb", 0)
        memory_limit = ready.get("memory_limit_mb", 0)
        child_procs = ready.get("child_processes", 0)
        process_limit = ready.get("process_limit", 0)

        # Show usage as a percentage only when a positive limit is set.
        if memory_limit and memory_limit > 0:
            pct = int(memory_mb / memory_limit * 100)
            lines.append(f"  Memory: {memory_mb} MB ({pct}% of {memory_limit} MB)")
        else:
            lines.append(f"  Memory: {memory_mb} MB")

        if process_limit and process_limit > 0:
            lines.append(f"  Processes: {child_procs} / {process_limit}")
        else:
            lines.append(f"  Processes: {child_procs}")

        # Pressure indicator (anything other than NONE is highlighted).
        pressure = ready.get("pressure", "NONE")
        if isinstance(pressure, str) and pressure.upper() != "NONE":
            lines.append(f"  Pressure: [yellow]{pressure}[/yellow]")
        else:
            lines.append("  Pressure: [green]NONE[/green]")

    border_style = "green" if ready and ready.get("status") == "ready" else "yellow"
    con.print(Panel("\n".join(lines), title="Conductor", border_style=border_style))

    if daemon_info:
        version = daemon_info.get("version", "?")
        con.print(f"[dim]Version: {version}[/dim]")

wait_for_conductor_exit

wait_for_conductor_exit(pid_file=None, timeout=30.0)

Wait for the conductor process to exit.

Polls _pid_alive until the process is gone or timeout elapses. Cleans up the stale PID file once the process is confirmed dead.

Returns True if the process exited within the timeout.

Source code in src/marianne/daemon/process.py
def wait_for_conductor_exit(
    pid_file: Path | None = None,
    timeout: float = 30.0,
) -> bool:
    """Wait for the conductor process to exit.

    Polls ``_pid_alive`` until the process is gone or *timeout*
    elapses, then cleans up the stale PID file once the process is
    confirmed dead.  The liveness check always runs at least once, so
    a zero (or already-elapsed) timeout still detects a process that
    is already dead.

    Args:
        pid_file: PID file to read; defaults to ``DaemonConfig().pid_file``.
        timeout: Maximum number of seconds to wait.

    Returns:
        ``True`` if the process exited within the timeout (or there was
        no recorded PID), ``False`` otherwise.
    """
    resolved = pid_file or DaemonConfig().pid_file
    pid = _read_pid(resolved)
    if pid is None:
        # No PID recorded — nothing to wait for.
        return True

    deadline = time.monotonic() + timeout
    while True:
        # Check first, then test the deadline: guarantees one liveness
        # check even when timeout <= 0 (the old `while now < deadline`
        # loop returned False without ever checking).
        if not _pid_alive(pid):
            resolved.unlink(missing_ok=True)
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.1)