Skip to content

Index

isolation

Isolation module for parallel job execution.

This module provides worktree-based isolation and process sandboxing for Marianne jobs, enabling multiple jobs to execute in parallel without interfering with each other's file modifications or processes.

Key components: - GitWorktreeManager: Manages git worktree lifecycle - BwrapSandbox: Bubblewrap process-level isolation - ResourceLimits: Optional resource caps for sandboxed processes - WorktreeInfo: Information about a created worktree - WorktreeResult: Result of worktree operations - Exception classes for error handling

Classes

BwrapSandbox

BwrapSandbox(workspace, shared_dirs, mcp_sockets, resource_limits)

Wraps subprocess execution in a bubblewrap namespace.

Given a workspace path, shared directories, MCP sockets, and optional resource limits, produces the bwrap command line that sets up isolation boundaries. The conductor uses this to wrap agent subprocess execution.

Usage::

sandbox = BwrapSandbox(
    workspace=Path("/tmp/agent-ws"),
    shared_dirs=[Path("/tmp/shared/specs")],
    mcp_sockets=[Path("/tmp/mzt/mcp/github.sock")],
    resource_limits=ResourceLimits(memory_limit_mb=512),
)
cmd = sandbox.wrap_command(["python", "agent_script.py"])
# cmd is ["bwrap", "--bind", "/tmp/agent-ws", ...]
Source code in src/marianne/isolation/sandbox.py
def __init__(
    self,
    workspace: Path,
    shared_dirs: list[Path],
    mcp_sockets: list[Path],
    resource_limits: ResourceLimits | None,
) -> None:
    if not isinstance(workspace, Path):
        raise TypeError(
            f"workspace must be a Path, got {type(workspace).__name__}"
        )
    self.workspace = workspace
    self.shared_dirs = shared_dirs
    self.mcp_sockets = mcp_sockets
    self.resource_limits = resource_limits
Functions
wrap_command
wrap_command(cmd)

Prepend bwrap args to a command.

Produces a complete bwrap invocation that isolates the inner command in a namespace with the configured bind mounts.

Parameters:

Name Type Description Default
cmd list[str]

The command to execute inside the sandbox.

required

Returns:

Type Description
list[str]

Full bwrap command line as a list of strings.

Source code in src/marianne/isolation/sandbox.py
def wrap_command(self, cmd: list[str]) -> list[str]:
    """Prepend bwrap args to a command.

    Produces a complete bwrap invocation that isolates the inner
    command in a namespace with the configured bind mounts.

    Args:
        cmd: The command to execute inside the sandbox.

    Returns:
        Full bwrap command line as a list of strings.
    """
    args: list[str] = ["bwrap"]

    # Workspace bind-mount (read-write)
    args.extend(["--bind", str(self.workspace), str(self.workspace)])

    # Standard system directories (read-only, tolerant of missing)
    for sys_dir in _SYSTEM_RO_DIRS:
        args.extend(["--ro-bind-try", sys_dir, sys_dir])

    # Proc and dev for basic functionality
    args.extend(["--proc", "/proc"])
    args.extend(["--dev", "/dev"])

    # Temporary directory
    args.extend(["--tmpfs", "/tmp"])

    # Shared directories (read-write for coordination)
    for shared_dir in self.shared_dirs:
        args.extend(["--bind", str(shared_dir), str(shared_dir)])

    # MCP socket forwarding (bind-mount each socket path)
    for socket_path in self.mcp_sockets:
        args.extend(["--bind", str(socket_path), str(socket_path)])

    # Namespace isolation
    args.append("--unshare-pid")
    args.append("--unshare-net")

    # Set working directory to workspace
    args.extend(["--chdir", str(self.workspace)])

    # Die with parent — sandbox dies if conductor dies
    args.append("--die-with-parent")

    # The inner command
    args.extend(cmd)

    _logger.debug(
        "bwrap_command_built",
        workspace=str(self.workspace),
        shared_dir_count=len(self.shared_dirs),
        mcp_socket_count=len(self.mcp_sockets),
        has_resource_limits=self.resource_limits is not None,
        inner_command=cmd[0] if cmd else "<empty>",
    )

    return args
is_available async staticmethod
is_available()

Check if bwrap is installed and runnable.

Returns:

Type Description
bool

True if bwrap --version exits successfully.

Source code in src/marianne/isolation/sandbox.py
@staticmethod
async def is_available() -> bool:
    """Check if bwrap is installed and runnable.

    Returns:
        True if ``bwrap --version`` exits successfully.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "bwrap", "--version",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        await proc.wait()
        available = proc.returncode == 0
        _logger.debug("bwrap_availability_check", available=available)
        return available
    except FileNotFoundError:
        _logger.debug("bwrap_availability_check", available=False)
        return False

ResourceLimits dataclass

ResourceLimits(memory_limit_mb=None, cpu_quota_percent=None, pid_limit=None)

Optional resource caps for sandbox processes.

These are NOT enforced by bwrap itself — they are metadata consumed by the conductor's resource governance layer (systemd-run, prlimit). BwrapSandbox stores them for the caller to apply separately.

Attributes
memory_limit_mb class-attribute instance-attribute
memory_limit_mb = None

Maximum memory in MB. None means no cap.

cpu_quota_percent class-attribute instance-attribute
cpu_quota_percent = None

CPU quota as a percentage (e.g. 50 = 50%%). None means no cap.

pid_limit class-attribute instance-attribute
pid_limit = None

Maximum number of PIDs. None means no cap.

BranchExistsError

Bases: WorktreeError

Raised when target branch already exists.

GitWorktreeManager

GitWorktreeManager(repo_path)

Manages git worktree lifecycle for isolated execution.

This class handles all git worktree operations asynchronously, providing isolation for parallel job execution. Each job gets its own worktree with a dedicated branch, preventing race conditions when multiple AI agents modify code simultaneously.

Usage

manager = GitWorktreeManager(Path("/path/to/repo")) result = await manager.create_worktree( job_id="review-abc123", source_branch="main", ) if result.success: # Execute job in result.worktree.path ... await manager.remove_worktree(result.worktree.path)

Initialize the worktree manager.

Parameters:

Name Type Description Default
repo_path Path

Path to the git repository root.

required
Source code in src/marianne/isolation/worktree.py
def __init__(self, repo_path: Path) -> None:
    """Initialize the worktree manager.

    Args:
        repo_path: Path to the git repository root.
    """
    self._repo_path = repo_path.resolve()
    self._git_verified = False
Functions
is_git_repository
is_git_repository()

Check if the manager's base path is a git repository.

Returns:

Type Description
bool

True if base path is inside a git repository.

Source code in src/marianne/isolation/worktree.py
def is_git_repository(self) -> bool:
    """Check if the manager's base path is a git repository.

    Returns:
        True if base path is inside a git repository.
    """
    git_dir = self._repo_path / ".git"
    # .git can be a directory (normal repo) or a file (worktree itself)
    return git_dir.exists()
create_worktree_detached async
create_worktree_detached(job_id, source_ref=None, worktree_base=None, lock=True)

Create isolated worktree in detached HEAD mode.

Unlike create_worktree(), this creates a worktree without a branch, allowing multiple worktrees to start from the same commit without branch locking conflicts. This is the preferred method for parallel job execution.

Parameters:

Name Type Description Default
job_id str

Unique job identifier for worktree naming.

required
source_ref str | None

Commit/branch to base worktree on (default: HEAD).

None
worktree_base Path | None

Directory for worktrees (default: repo/.worktrees).

None
lock bool

Whether to lock worktree after creation (default: True).

True

Returns:

Name Type Description
WorktreeResult

WorktreeResult with worktree info on success, error on failure.

Note WorktreeResult

WorktreeInfo.branch will be "(detached)" for detached HEAD.

Raises:

Type Description
NotGitRepositoryError

If not in a git repository.

WorktreeCreationError

If worktree cannot be created.

Source code in src/marianne/isolation/worktree.py
async def create_worktree_detached(
    self,
    job_id: str,
    source_ref: str | None = None,
    worktree_base: Path | None = None,
    lock: bool = True,
) -> WorktreeResult:
    """Create isolated worktree in detached HEAD mode.

    Unlike create_worktree(), this creates a worktree without a branch,
    allowing multiple worktrees to start from the same commit without
    branch locking conflicts. This is the preferred method for parallel
    job execution.

    Args:
        job_id: Unique job identifier for worktree naming.
        source_ref: Commit/branch to base worktree on (default: HEAD).
        worktree_base: Directory for worktrees (default: repo/.worktrees).
        lock: Whether to lock worktree after creation (default: True).

    Returns:
        WorktreeResult with worktree info on success, error on failure.
        Note: WorktreeInfo.branch will be "(detached)" for detached HEAD.

    Raises:
        NotGitRepositoryError: If not in a git repository.
        WorktreeCreationError: If worktree cannot be created.
    """
    _logger.info(
        "creating_worktree_detached",
        job_id=job_id,
        source_ref=source_ref,
    )

    # Sanitize job_id to prevent path traversal
    if ".." in job_id or "/" in job_id or "\\" in job_id or "\0" in job_id:
        raise WorktreeCreationError(
            f"Invalid job_id '{job_id}': must not contain path separators or '..'"
        )

    # Verify prerequisites
    if not self.is_git_repository():
        error_msg = f"Not a git repository: {self._repo_path}"
        _logger.error("not_git_repo", path=str(self._repo_path))
        raise NotGitRepositoryError(error_msg)

    await self._verify_git_version()

    # Determine paths
    base_path = worktree_base or (self._repo_path / ".worktrees")
    worktree_path = base_path / job_id

    # Check if worktree path already exists
    if worktree_path.exists():
        error_msg = f"Worktree path already exists: {worktree_path}"
        _logger.error("worktree_path_exists", path=str(worktree_path))
        raise WorktreeCreationError(error_msg)

    # Create parent directory
    base_path.mkdir(parents=True, exist_ok=True)

    # Get the source commit SHA for tracking
    source_commit: str
    if source_ref:
        # Resolve source_ref to a commit
        try:
            _, stdout, _ = await self._run_git("rev-parse", "--short", source_ref)
            source_commit = stdout
        except WorktreeError as e:
            raise WorktreeCreationError(f"Cannot resolve ref '{source_ref}': {e}") from e
    else:
        source_commit = await self._get_current_commit()

    # Build git worktree add --detach command
    # Syntax: git worktree add --detach <path> [<commit>]
    cmd_args = ["worktree", "add", "--detach", str(worktree_path)]
    if source_ref:
        cmd_args.append(source_ref)

    try:
        await self._run_git(*cmd_args)
    except WorktreeError as e:
        raise WorktreeCreationError(str(e)) from e

    # Lock if requested
    locked = False
    if lock:
        lock_result = await self.lock_worktree(
            worktree_path,
            reason=f"Marianne job {job_id} in progress (pid={os.getpid()})",
        )
        locked = lock_result.success

    worktree_info = WorktreeInfo(
        path=worktree_path,
        branch="(detached)",  # Indicate detached HEAD mode
        commit=source_commit,
        locked=locked,
        job_id=job_id,
    )

    _logger.info(
        "worktree_created_detached",
        job_id=job_id,
        path=str(worktree_path),
        commit=source_commit,
        locked=locked,
    )

    return WorktreeResult(success=True, worktree=worktree_info, error=None)
create_worktree async
create_worktree(job_id, source_branch=None, branch_prefix='marianne', worktree_base=None, lock=True)

Create isolated worktree for job execution.

Creates a new git worktree with a dedicated branch for the job. The worktree provides an isolated working directory, index, and HEAD.

Parameters:

Name Type Description Default
job_id str

Unique job identifier for worktree and branch naming.

required
source_branch str | None

Branch to base worktree on (default: HEAD).

None
branch_prefix str

Prefix for branch name (default: "marianne").

'marianne'
worktree_base Path | None

Directory for worktrees (default: repo/.worktrees).

None
lock bool

Whether to lock worktree after creation (default: True).

True

Returns:

Type Description
WorktreeResult

WorktreeResult with worktree info on success, error on failure.

Source code in src/marianne/isolation/worktree.py
async def create_worktree(
    self,
    job_id: str,
    source_branch: str | None = None,
    branch_prefix: str = "marianne",
    worktree_base: Path | None = None,
    lock: bool = True,
) -> WorktreeResult:
    """Create isolated worktree for job execution.

    Creates a new git worktree with a dedicated branch for the job.
    The worktree provides an isolated working directory, index, and HEAD.

    Args:
        job_id: Unique job identifier for worktree and branch naming.
        source_branch: Branch to base worktree on (default: HEAD).
        branch_prefix: Prefix for branch name (default: "marianne").
        worktree_base: Directory for worktrees (default: repo/.worktrees).
        lock: Whether to lock worktree after creation (default: True).

    Returns:
        WorktreeResult with worktree info on success, error on failure.
    """
    _logger.info(
        "creating_worktree",
        job_id=job_id,
        source_branch=source_branch,
        branch_prefix=branch_prefix,
    )

    # Sanitize job_id to prevent path traversal
    if ".." in job_id or "/" in job_id or "\\" in job_id or "\0" in job_id:
        raise WorktreeCreationError(
            f"Invalid job_id '{job_id}': must not contain path separators or '..'"
        )

    # Verify prerequisites
    if not self.is_git_repository():
        error_msg = f"Not a git repository: {self._repo_path}"
        _logger.error("not_git_repo", path=str(self._repo_path))
        raise NotGitRepositoryError(error_msg)

    await self._verify_git_version()

    # Determine paths and branch name
    base_path = worktree_base or (self._repo_path / ".worktrees")
    branch_name = f"{branch_prefix}/{job_id}"
    worktree_path = base_path / job_id

    # Check if branch already exists
    if await self._branch_exists(branch_name):
        error_msg = f"Branch '{branch_name}' already exists"
        _logger.error("branch_exists", branch=branch_name)
        raise BranchExistsError(error_msg)

    # Check if worktree path already exists
    if worktree_path.exists():
        error_msg = f"Worktree path already exists: {worktree_path}"
        _logger.error("worktree_path_exists", path=str(worktree_path))
        raise WorktreeCreationError(error_msg)

    # Create parent directory
    base_path.mkdir(parents=True, exist_ok=True)

    # Build git worktree add command
    cmd_args = ["worktree", "add", "-b", branch_name, str(worktree_path)]
    if source_branch:
        cmd_args.append(source_branch)

    try:
        await self._run_git(*cmd_args)
    except WorktreeError as e:
        raise WorktreeCreationError(str(e)) from e

    # Get commit SHA
    commit = await self._get_current_commit()

    # Lock if requested
    locked = False
    if lock:
        lock_result = await self.lock_worktree(
            worktree_path,
            reason=f"Marianne job {job_id} in progress (pid={os.getpid()})",
        )
        locked = lock_result.success

    worktree_info = WorktreeInfo(
        path=worktree_path,
        branch=branch_name,
        commit=commit,
        locked=locked,
        job_id=job_id,
    )

    _logger.info(
        "worktree_created",
        job_id=job_id,
        path=str(worktree_path),
        branch=branch_name,
        commit=commit,
        locked=locked,
    )

    return WorktreeResult(success=True, worktree=worktree_info, error=None)
remove_worktree async
remove_worktree(worktree_path, force=True, delete_branch=False)

Remove worktree and optionally its branch.

Removes the worktree directory and cleans up git metadata. The associated branch is preserved by default for review/merge.

Parameters:

Name Type Description Default
worktree_path Path

Path to the worktree to remove.

required
force bool

Force removal even if worktree is dirty (default: True).

True
delete_branch bool

Also delete the associated branch (default: False).

False

Returns:

Type Description
WorktreeResult

WorktreeResult indicating success or failure.

Source code in src/marianne/isolation/worktree.py
async def remove_worktree(
    self,
    worktree_path: Path,
    force: bool = True,
    delete_branch: bool = False,
) -> WorktreeResult:
    """Remove worktree and optionally its branch.

    Removes the worktree directory and cleans up git metadata.
    The associated branch is preserved by default for review/merge.

    Args:
        worktree_path: Path to the worktree to remove.
        force: Force removal even if worktree is dirty (default: True).
        delete_branch: Also delete the associated branch (default: False).

    Returns:
        WorktreeResult indicating success or failure.
    """
    worktree_path = worktree_path.resolve()
    _logger.info(
        "removing_worktree",
        path=str(worktree_path),
        force=force,
        delete_branch=delete_branch,
    )

    # If worktree doesn't exist, consider it a success (idempotent)
    if not worktree_path.exists():
        _logger.debug("worktree_already_removed", path=str(worktree_path))
        return WorktreeResult(success=True, worktree=None, error=None)

    # Get branch name before removal (for optional deletion)
    branch_name: str | None = None
    if delete_branch:
        try:
            _, stdout, _ = await self._run_git(
                "rev-parse", "--abbrev-ref", "HEAD",
                cwd=worktree_path,
                check=False,
            )
            branch_name = stdout
        except Exception as e:
            _logger.warning(
                "worktree.branch_lookup_failed",
                worktree_path=str(worktree_path),
                error=str(e),
            )

    # Check if locking PID is still alive — detached hooks may outlive the job
    try:
        _, lock_reason, _ = await self._run_git(
            "worktree", "list", "--porcelain", check=False,
        )
        # Parse lock reason to extract PID
        for block in lock_reason.split("\n\n"):
            if str(worktree_path) in block and "locked" in block:
                pid_match = re.search(r"pid=(\d+)", block)
                if pid_match:
                    lock_pid = int(pid_match.group(1))
                    try:
                        os.kill(lock_pid, 0)  # Check if process exists
                        _logger.warning(
                            "worktree.lock_process_still_alive",
                            path=str(worktree_path),
                            pid=lock_pid,
                        )
                    except OSError:
                        _logger.debug(
                            "worktree.lock_process_dead",
                            path=str(worktree_path),
                            pid=lock_pid,
                        )
    except (WorktreeError, ValueError):
        pass  # Can't check — proceed with cleanup

    # Unlock first (ignore errors - may already be unlocked)
    await self.unlock_worktree(worktree_path)

    # Remove worktree via git
    cmd_args = ["worktree", "remove"]
    if force:
        cmd_args.append("--force")
    cmd_args.append(str(worktree_path))

    try:
        await self._run_git(*cmd_args)
    except WorktreeError:
        # If git worktree remove fails, try manual cleanup
        _logger.warning(
            "git_worktree_remove_failed_trying_manual",
            path=str(worktree_path),
        )
        try:
            shutil.rmtree(worktree_path)
            # Run git worktree prune to clean metadata
            await self._run_git("worktree", "prune", check=False)
        except Exception as e:
            error_msg = f"Failed to remove worktree: {e}"
            _logger.error("worktree_removal_failed", path=str(worktree_path), error=str(e))
            raise WorktreeRemovalError(error_msg) from e

    # Optionally delete the branch
    if delete_branch and branch_name:
        try:
            await self._run_git("branch", "-D", branch_name, check=False)
            _logger.info("branch_deleted", branch=branch_name)
        except Exception as e:
            _logger.warning("branch_deletion_failed", branch=branch_name, error=str(e))

    _logger.info("worktree_removed", path=str(worktree_path))
    return WorktreeResult(success=True, worktree=None, error=None)
lock_worktree async
lock_worktree(worktree_path, reason=None)

Lock worktree to prevent accidental removal.

Locking prevents 'git worktree remove' and 'git worktree prune' from affecting this worktree.

Parameters:

Name Type Description Default
worktree_path Path

Path to the worktree to lock.

required
reason str | None

Human-readable reason for the lock.

None

Returns:

Type Description
WorktreeResult

WorktreeResult indicating success or failure.

Source code in src/marianne/isolation/worktree.py
async def lock_worktree(
    self,
    worktree_path: Path,
    reason: str | None = None,
) -> WorktreeResult:
    """Lock worktree to prevent accidental removal.

    Locking prevents 'git worktree remove' and 'git worktree prune'
    from affecting this worktree.

    Args:
        worktree_path: Path to the worktree to lock.
        reason: Human-readable reason for the lock.

    Returns:
        WorktreeResult indicating success or failure.
    """
    worktree_path = worktree_path.resolve()
    _logger.debug("locking_worktree", path=str(worktree_path), reason=reason)

    cmd_args = ["worktree", "lock"]
    if reason:
        cmd_args.extend(["--reason", reason])
    cmd_args.append(str(worktree_path))

    try:
        await self._run_git(*cmd_args)
        _logger.info("worktree_locked", path=str(worktree_path))
        return WorktreeResult(success=True, worktree=None, error=None)
    except WorktreeError as e:
        if "already locked" in str(e).lower():
            # Check if the existing lock is stale (owner process dead)
            if await self._is_lock_stale(worktree_path):
                _logger.warning(
                    "worktree_stale_lock_detected",
                    path=str(worktree_path),
                    message="Lock owner process is dead, force-unlocking",
                )
                await self.unlock_worktree(worktree_path)
                # Retry the lock
                try:
                    await self._run_git(*cmd_args)
                    _logger.info("worktree_locked_after_stale_cleanup", path=str(worktree_path))
                    return WorktreeResult(success=True, worktree=None, error=None)
                except WorktreeError as retry_err:
                    raise WorktreeLockError(str(retry_err)) from retry_err
            _logger.debug("worktree_already_locked", path=str(worktree_path))
            return WorktreeResult(success=True, worktree=None, error=None)
        raise WorktreeLockError(str(e)) from e
unlock_worktree async
unlock_worktree(worktree_path)

Unlock a previously locked worktree.

Parameters:

Name Type Description Default
worktree_path Path

Path to the worktree to unlock.

required

Returns:

Type Description
WorktreeResult

WorktreeResult indicating success or failure.

Note

Returns success if worktree is already unlocked (idempotent).

Source code in src/marianne/isolation/worktree.py
async def unlock_worktree(
    self,
    worktree_path: Path,
) -> WorktreeResult:
    """Unlock a previously locked worktree.

    Args:
        worktree_path: Path to the worktree to unlock.

    Returns:
        WorktreeResult indicating success or failure.

    Note:
        Returns success if worktree is already unlocked (idempotent).
    """
    worktree_path = worktree_path.resolve()
    _logger.debug("unlocking_worktree", path=str(worktree_path))

    try:
        await self._run_git("worktree", "unlock", str(worktree_path))
        _logger.info("worktree_unlocked", path=str(worktree_path))
        return WorktreeResult(success=True, worktree=None, error=None)
    except WorktreeError as e:
        # May already be unlocked or not exist
        error_str = str(e).lower()
        if "not locked" in error_str or "is not a working tree" in error_str:
            _logger.debug("worktree_not_locked_or_missing", path=str(worktree_path))
            return WorktreeResult(success=True, worktree=None, error=None)
        raise WorktreeLockError(str(e)) from e
list_worktrees async
list_worktrees(prefix_filter=None)

List all worktrees, optionally filtered by branch prefix.

Parameters:

Name Type Description Default
prefix_filter str | None

Only return worktrees with branches matching prefix. For example, "marianne" returns all marianne/* branches.

None

Returns:

Type Description
list[WorktreeInfo]

List of WorktreeInfo for matching worktrees.

Source code in src/marianne/isolation/worktree.py
async def list_worktrees(
    self,
    prefix_filter: str | None = None,
) -> list[WorktreeInfo]:
    """List all worktrees, optionally filtered by branch prefix.

    Args:
        prefix_filter: Only return worktrees with branches matching prefix.
                      For example, "marianne" returns all marianne/* branches.

    Returns:
        List of WorktreeInfo for matching worktrees.
    """
    _logger.debug("listing_worktrees", prefix_filter=prefix_filter)

    _, stdout, _ = await self._run_git("worktree", "list", "--porcelain")

    worktrees: list[WorktreeInfo] = []
    current_path: Path | None = None
    current_branch: str | None = None
    current_commit: str | None = None
    current_locked = False

    for line in stdout.split("\n"):
        line = line.strip()

        if line.startswith("worktree "):
            # New worktree entry
            if current_path and current_branch:
                # Save previous entry
                worktrees.append(WorktreeInfo(
                    path=current_path,
                    branch=current_branch,
                    commit=current_commit or "",
                    locked=current_locked,
                    job_id=self._extract_job_id(current_branch),
                ))
            current_path = Path(line[9:])
            current_branch = None
            current_commit = None
            current_locked = False

        elif line.startswith("HEAD "):
            current_commit = line[5:]

        elif line.startswith("branch refs/heads/"):
            current_branch = line[18:]

        elif line.startswith("locked"):
            current_locked = True

        elif line == "":
            # Entry separator
            pass

    # Don't forget the last entry
    if current_path and current_branch:
        worktrees.append(WorktreeInfo(
            path=current_path,
            branch=current_branch,
            commit=current_commit or "",
            locked=current_locked,
            job_id=self._extract_job_id(current_branch),
        ))

    # Apply prefix filter
    if prefix_filter:
        prefix = f"{prefix_filter}/"
        worktrees = [wt for wt in worktrees if wt.branch.startswith(prefix)]

    _logger.debug("worktrees_found", count=len(worktrees))
    return worktrees
prune_orphaned async
prune_orphaned(prefix_filter='marianne', dry_run=False)

Clean up orphaned worktree metadata.

Removes metadata for worktrees whose directories no longer exist. Only affects worktrees with branches matching the prefix filter.

Parameters:

Name Type Description Default
prefix_filter str

Only prune worktrees with matching branch prefix.

'marianne'
dry_run bool

If True, return what would be pruned without pruning.

False

Returns:

Type Description
list[str]

List of pruned worktree names (or would-be-pruned if dry_run).

Source code in src/marianne/isolation/worktree.py
async def prune_orphaned(
    self,
    prefix_filter: str = "marianne",
    dry_run: bool = False,
) -> list[str]:
    """Clean up orphaned worktree metadata.

    Removes metadata for worktrees whose directories no longer exist.
    Only affects worktrees with branches matching the prefix filter.

    Args:
        prefix_filter: Only prune worktrees with matching branch prefix.
        dry_run: If True, return what would be pruned without pruning.

    Returns:
        List of pruned worktree names (or would-be-pruned if dry_run).
    """
    _logger.info(
        "pruning_orphaned_worktrees",
        prefix_filter=prefix_filter,
        dry_run=dry_run,
    )

    # First, list worktrees to identify marianne-prefixed ones
    worktrees = await self.list_worktrees(prefix_filter=prefix_filter)

    # Find orphaned (path doesn't exist)
    orphaned: list[str] = []
    for wt in worktrees:
        if not wt.path.exists():
            orphaned.append(str(wt.path))

    if not dry_run and orphaned:
        # Run git worktree prune to clean metadata
        await self._run_git("worktree", "prune")
        _logger.info("orphaned_worktrees_pruned", count=len(orphaned))

    return orphaned
get_worktree_info async
get_worktree_info(worktree_path)

Get information about a specific worktree.

Parameters:

Name Type Description Default
worktree_path Path

Path to the worktree.

required

Returns:

Type Description
WorktreeInfo | None

WorktreeInfo if worktree exists, None otherwise.

Source code in src/marianne/isolation/worktree.py
async def get_worktree_info(
    self,
    worktree_path: Path,
) -> WorktreeInfo | None:
    """Get information about a specific worktree.

    Args:
        worktree_path: Path to the worktree.

    Returns:
        WorktreeInfo if worktree exists, None otherwise.
    """
    worktree_path = worktree_path.resolve()
    worktrees = await self.list_worktrees()

    for wt in worktrees:
        if wt.path.resolve() == worktree_path:
            return wt

    return None

NotGitRepositoryError

Bases: WorktreeError

Raised when operation attempted outside git repository.

WorktreeCreationError

Bases: WorktreeError

Raised when worktree cannot be created.

WorktreeError

Bases: Exception

Base exception for worktree operations.

WorktreeInfo dataclass

WorktreeInfo(path, branch, commit, locked, job_id)

Information about a created worktree.

Attributes
path instance-attribute
path

Filesystem path to the worktree directory.

branch instance-attribute
branch

Git branch name checked out in the worktree.

commit instance-attribute
commit

Commit SHA the worktree is based on.

locked instance-attribute
locked

Whether the worktree is currently locked.

job_id instance-attribute
job_id

Marianne job ID associated with this worktree.

WorktreeLockError

Bases: WorktreeError

Raised when worktree lock/unlock fails.

WorktreeRemovalError

Bases: WorktreeError

Raised when worktree cannot be removed.

WorktreeResult dataclass

WorktreeResult(success, worktree, error)

Result of a worktree operation.

Attributes
success instance-attribute
success

Whether the operation succeeded.

worktree instance-attribute
worktree

Worktree info if operation succeeded, None otherwise.

error instance-attribute
error

Error message if operation failed, None otherwise.