Skip to content

worktree

worktree

Git worktree management for parallel job isolation.

This module provides the WorktreeManager implementation for creating, managing, and cleaning up git worktrees used for parallel job execution. Each worktree provides an isolated working directory, index, and HEAD.

Example

manager = GitWorktreeManager(repo_path) result = await manager.create_worktree("job-123", "main") if result.success: # Execute job in result.worktree.path ... await manager.remove_worktree(result.worktree.path)

Classes

WorktreeError

Bases: Exception

Base exception for worktree operations.

WorktreeCreationError

Bases: WorktreeError

Raised when worktree cannot be created.

WorktreeRemovalError

Bases: WorktreeError

Raised when worktree cannot be removed.

WorktreeLockError

Bases: WorktreeError

Raised when worktree lock/unlock fails.

BranchExistsError

Bases: WorktreeError

Raised when target branch already exists.

NotGitRepositoryError

Bases: WorktreeError

Raised when operation attempted outside git repository.

WorktreeInfo dataclass

WorktreeInfo(path, branch, commit, locked, job_id)

Information about a created worktree.

Attributes
path instance-attribute
path

Filesystem path to the worktree directory.

branch instance-attribute
branch

Git branch name checked out in the worktree.

commit instance-attribute
commit

Commit SHA the worktree is based on.

locked instance-attribute
locked

Whether the worktree is currently locked.

job_id instance-attribute
job_id

Marianne job ID associated with this worktree.

WorktreeResult dataclass

WorktreeResult(success, worktree, error)

Result of a worktree operation.

Attributes
success instance-attribute
success

Whether the operation succeeded.

worktree instance-attribute
worktree

Worktree info if operation succeeded, None otherwise.

error instance-attribute
error

Error message if operation failed, None otherwise.

GitWorktreeManager

GitWorktreeManager(repo_path)

Manages git worktree lifecycle for isolated execution.

This class handles all git worktree operations asynchronously, providing isolation for parallel job execution. Each job gets its own worktree with a dedicated branch, preventing race conditions when multiple AI agents modify code simultaneously.

Usage

manager = GitWorktreeManager(Path("/path/to/repo")) result = await manager.create_worktree( job_id="review-abc123", source_branch="main", ) if result.success: # Execute job in result.worktree.path ... await manager.remove_worktree(result.worktree.path)

Initialize the worktree manager.

Parameters:

Name Type Description Default
repo_path Path

Path to the git repository root.

required
Source code in src/marianne/isolation/worktree.py
def __init__(self, repo_path: Path) -> None:
    """Initialize the worktree manager.

    Args:
        repo_path: Path to the git repository root.
    """
    self._repo_path = repo_path.resolve()
    self._git_verified = False
Functions
is_git_repository
is_git_repository()

Check if the manager's base path is a git repository.

Returns:

Type Description
bool

True if base path is inside a git repository.

Source code in src/marianne/isolation/worktree.py
def is_git_repository(self) -> bool:
    """Check if the manager's base path is a git repository.

    Returns:
        True if base path is inside a git repository.
    """
    git_dir = self._repo_path / ".git"
    # .git can be a directory (normal repo) or a file (worktree itself)
    return git_dir.exists()
create_worktree_detached async
create_worktree_detached(job_id, source_ref=None, worktree_base=None, lock=True)

Create isolated worktree in detached HEAD mode.

Unlike create_worktree(), this creates a worktree without a branch, allowing multiple worktrees to start from the same commit without branch locking conflicts. This is the preferred method for parallel job execution.

Parameters:

Name Type Description Default
job_id str

Unique job identifier for worktree naming.

required
source_ref str | None

Commit/branch to base worktree on (default: HEAD).

None
worktree_base Path | None

Directory for worktrees (default: repo/.worktrees).

None
lock bool

Whether to lock worktree after creation (default: True).

True

Returns:

Name Type Description
WorktreeResult

WorktreeResult with worktree info on success, error on failure.

Note WorktreeResult

WorktreeInfo.branch will be "(detached)" for detached HEAD.

Raises:

Type Description
NotGitRepositoryError

If not in a git repository.

WorktreeCreationError

If worktree cannot be created.

Source code in src/marianne/isolation/worktree.py
async def create_worktree_detached(
    self,
    job_id: str,
    source_ref: str | None = None,
    worktree_base: Path | None = None,
    lock: bool = True,
) -> WorktreeResult:
    """Create isolated worktree in detached HEAD mode.

    Unlike create_worktree(), this creates a worktree without a branch,
    allowing multiple worktrees to start from the same commit without
    branch locking conflicts. This is the preferred method for parallel
    job execution.

    Args:
        job_id: Unique job identifier for worktree naming.
        source_ref: Commit/branch to base worktree on (default: HEAD).
        worktree_base: Directory for worktrees (default: repo/.worktrees).
        lock: Whether to lock worktree after creation (default: True).

    Returns:
        WorktreeResult with worktree info on success, error on failure.
        Note: WorktreeInfo.branch will be "(detached)" for detached HEAD.

    Raises:
        NotGitRepositoryError: If not in a git repository.
        WorktreeCreationError: If worktree cannot be created.
    """
    _logger.info(
        "creating_worktree_detached",
        job_id=job_id,
        source_ref=source_ref,
    )

    # Sanitize job_id to prevent path traversal
    if ".." in job_id or "/" in job_id or "\\" in job_id or "\0" in job_id:
        raise WorktreeCreationError(
            f"Invalid job_id '{job_id}': must not contain path separators or '..'"
        )

    # Verify prerequisites
    if not self.is_git_repository():
        error_msg = f"Not a git repository: {self._repo_path}"
        _logger.error("not_git_repo", path=str(self._repo_path))
        raise NotGitRepositoryError(error_msg)

    await self._verify_git_version()

    # Determine paths
    base_path = worktree_base or (self._repo_path / ".worktrees")
    worktree_path = base_path / job_id

    # Check if worktree path already exists
    if worktree_path.exists():
        error_msg = f"Worktree path already exists: {worktree_path}"
        _logger.error("worktree_path_exists", path=str(worktree_path))
        raise WorktreeCreationError(error_msg)

    # Create parent directory
    base_path.mkdir(parents=True, exist_ok=True)

    # Get the source commit SHA for tracking
    source_commit: str
    if source_ref:
        # Resolve source_ref to a commit
        try:
            _, stdout, _ = await self._run_git("rev-parse", "--short", source_ref)
            source_commit = stdout
        except WorktreeError as e:
            raise WorktreeCreationError(f"Cannot resolve ref '{source_ref}': {e}") from e
    else:
        source_commit = await self._get_current_commit()

    # Build git worktree add --detach command
    # Syntax: git worktree add --detach <path> [<commit>]
    cmd_args = ["worktree", "add", "--detach", str(worktree_path)]
    if source_ref:
        cmd_args.append(source_ref)

    try:
        await self._run_git(*cmd_args)
    except WorktreeError as e:
        raise WorktreeCreationError(str(e)) from e

    # Lock if requested
    locked = False
    if lock:
        lock_result = await self.lock_worktree(
            worktree_path,
            reason=f"Marianne job {job_id} in progress (pid={os.getpid()})",
        )
        locked = lock_result.success

    worktree_info = WorktreeInfo(
        path=worktree_path,
        branch="(detached)",  # Indicate detached HEAD mode
        commit=source_commit,
        locked=locked,
        job_id=job_id,
    )

    _logger.info(
        "worktree_created_detached",
        job_id=job_id,
        path=str(worktree_path),
        commit=source_commit,
        locked=locked,
    )

    return WorktreeResult(success=True, worktree=worktree_info, error=None)
create_worktree async
create_worktree(job_id, source_branch=None, branch_prefix='marianne', worktree_base=None, lock=True)

Create isolated worktree for job execution.

Creates a new git worktree with a dedicated branch for the job. The worktree provides an isolated working directory, index, and HEAD.

Parameters:

Name Type Description Default
job_id str

Unique job identifier for worktree and branch naming.

required
source_branch str | None

Branch to base worktree on (default: HEAD).

None
branch_prefix str

Prefix for branch name (default: "marianne").

'marianne'
worktree_base Path | None

Directory for worktrees (default: repo/.worktrees).

None
lock bool

Whether to lock worktree after creation (default: True).

True

Returns:

Type Description
WorktreeResult

WorktreeResult with worktree info on success, error on failure.

Source code in src/marianne/isolation/worktree.py
async def create_worktree(
    self,
    job_id: str,
    source_branch: str | None = None,
    branch_prefix: str = "marianne",
    worktree_base: Path | None = None,
    lock: bool = True,
) -> WorktreeResult:
    """Create isolated worktree for job execution.

    Creates a new git worktree with a dedicated branch for the job.
    The worktree provides an isolated working directory, index, and HEAD.

    Args:
        job_id: Unique job identifier for worktree and branch naming.
        source_branch: Branch to base worktree on (default: HEAD).
        branch_prefix: Prefix for branch name (default: "marianne").
        worktree_base: Directory for worktrees (default: repo/.worktrees).
        lock: Whether to lock worktree after creation (default: True).

    Returns:
        WorktreeResult with worktree info on success, error on failure.
    """
    _logger.info(
        "creating_worktree",
        job_id=job_id,
        source_branch=source_branch,
        branch_prefix=branch_prefix,
    )

    # Sanitize job_id to prevent path traversal
    if ".." in job_id or "/" in job_id or "\\" in job_id or "\0" in job_id:
        raise WorktreeCreationError(
            f"Invalid job_id '{job_id}': must not contain path separators or '..'"
        )

    # Verify prerequisites
    if not self.is_git_repository():
        error_msg = f"Not a git repository: {self._repo_path}"
        _logger.error("not_git_repo", path=str(self._repo_path))
        raise NotGitRepositoryError(error_msg)

    await self._verify_git_version()

    # Determine paths and branch name
    base_path = worktree_base or (self._repo_path / ".worktrees")
    branch_name = f"{branch_prefix}/{job_id}"
    worktree_path = base_path / job_id

    # Check if branch already exists
    if await self._branch_exists(branch_name):
        error_msg = f"Branch '{branch_name}' already exists"
        _logger.error("branch_exists", branch=branch_name)
        raise BranchExistsError(error_msg)

    # Check if worktree path already exists
    if worktree_path.exists():
        error_msg = f"Worktree path already exists: {worktree_path}"
        _logger.error("worktree_path_exists", path=str(worktree_path))
        raise WorktreeCreationError(error_msg)

    # Create parent directory
    base_path.mkdir(parents=True, exist_ok=True)

    # Build git worktree add command
    cmd_args = ["worktree", "add", "-b", branch_name, str(worktree_path)]
    if source_branch:
        cmd_args.append(source_branch)

    try:
        await self._run_git(*cmd_args)
    except WorktreeError as e:
        raise WorktreeCreationError(str(e)) from e

    # Get commit SHA
    commit = await self._get_current_commit()

    # Lock if requested
    locked = False
    if lock:
        lock_result = await self.lock_worktree(
            worktree_path,
            reason=f"Marianne job {job_id} in progress (pid={os.getpid()})",
        )
        locked = lock_result.success

    worktree_info = WorktreeInfo(
        path=worktree_path,
        branch=branch_name,
        commit=commit,
        locked=locked,
        job_id=job_id,
    )

    _logger.info(
        "worktree_created",
        job_id=job_id,
        path=str(worktree_path),
        branch=branch_name,
        commit=commit,
        locked=locked,
    )

    return WorktreeResult(success=True, worktree=worktree_info, error=None)
remove_worktree async
remove_worktree(worktree_path, force=True, delete_branch=False)

Remove worktree and optionally its branch.

Removes the worktree directory and cleans up git metadata. The associated branch is preserved by default for review/merge.

Parameters:

Name Type Description Default
worktree_path Path

Path to the worktree to remove.

required
force bool

Force removal even if worktree is dirty (default: True).

True
delete_branch bool

Also delete the associated branch (default: False).

False

Returns:

Type Description
WorktreeResult

WorktreeResult indicating success or failure.

Source code in src/marianne/isolation/worktree.py
async def remove_worktree(
    self,
    worktree_path: Path,
    force: bool = True,
    delete_branch: bool = False,
) -> WorktreeResult:
    """Remove worktree and optionally its branch.

    Removes the worktree directory and cleans up git metadata.
    The associated branch is preserved by default for review/merge.

    Args:
        worktree_path: Path to the worktree to remove.
        force: Force removal even if worktree is dirty (default: True).
        delete_branch: Also delete the associated branch (default: False).

    Returns:
        WorktreeResult indicating success or failure.
    """
    worktree_path = worktree_path.resolve()
    _logger.info(
        "removing_worktree",
        path=str(worktree_path),
        force=force,
        delete_branch=delete_branch,
    )

    # If worktree doesn't exist, consider it a success (idempotent)
    if not worktree_path.exists():
        _logger.debug("worktree_already_removed", path=str(worktree_path))
        return WorktreeResult(success=True, worktree=None, error=None)

    # Get branch name before removal (for optional deletion)
    branch_name: str | None = None
    if delete_branch:
        try:
            _, stdout, _ = await self._run_git(
                "rev-parse", "--abbrev-ref", "HEAD",
                cwd=worktree_path,
                check=False,
            )
            branch_name = stdout
        except Exception as e:
            _logger.warning(
                "worktree.branch_lookup_failed",
                worktree_path=str(worktree_path),
                error=str(e),
            )

    # Check if locking PID is still alive — detached hooks may outlive the job
    try:
        _, lock_reason, _ = await self._run_git(
            "worktree", "list", "--porcelain", check=False,
        )
        # Parse lock reason to extract PID
        for block in lock_reason.split("\n\n"):
            if str(worktree_path) in block and "locked" in block:
                pid_match = re.search(r"pid=(\d+)", block)
                if pid_match:
                    lock_pid = int(pid_match.group(1))
                    try:
                        os.kill(lock_pid, 0)  # Check if process exists
                        _logger.warning(
                            "worktree.lock_process_still_alive",
                            path=str(worktree_path),
                            pid=lock_pid,
                        )
                    except OSError:
                        _logger.debug(
                            "worktree.lock_process_dead",
                            path=str(worktree_path),
                            pid=lock_pid,
                        )
    except (WorktreeError, ValueError):
        pass  # Can't check — proceed with cleanup

    # Unlock first (ignore errors - may already be unlocked)
    await self.unlock_worktree(worktree_path)

    # Remove worktree via git
    cmd_args = ["worktree", "remove"]
    if force:
        cmd_args.append("--force")
    cmd_args.append(str(worktree_path))

    try:
        await self._run_git(*cmd_args)
    except WorktreeError:
        # If git worktree remove fails, try manual cleanup
        _logger.warning(
            "git_worktree_remove_failed_trying_manual",
            path=str(worktree_path),
        )
        try:
            shutil.rmtree(worktree_path)
            # Run git worktree prune to clean metadata
            await self._run_git("worktree", "prune", check=False)
        except Exception as e:
            error_msg = f"Failed to remove worktree: {e}"
            _logger.error("worktree_removal_failed", path=str(worktree_path), error=str(e))
            raise WorktreeRemovalError(error_msg) from e

    # Optionally delete the branch
    if delete_branch and branch_name:
        try:
            await self._run_git("branch", "-D", branch_name, check=False)
            _logger.info("branch_deleted", branch=branch_name)
        except Exception as e:
            _logger.warning("branch_deletion_failed", branch=branch_name, error=str(e))

    _logger.info("worktree_removed", path=str(worktree_path))
    return WorktreeResult(success=True, worktree=None, error=None)
lock_worktree async
lock_worktree(worktree_path, reason=None)

Lock worktree to prevent accidental removal.

Locking prevents 'git worktree remove' and 'git worktree prune' from affecting this worktree.

Parameters:

Name Type Description Default
worktree_path Path

Path to the worktree to lock.

required
reason str | None

Human-readable reason for the lock.

None

Returns:

Type Description
WorktreeResult

WorktreeResult indicating success or failure.

Source code in src/marianne/isolation/worktree.py
async def lock_worktree(
    self,
    worktree_path: Path,
    reason: str | None = None,
) -> WorktreeResult:
    """Lock worktree to prevent accidental removal.

    Locking prevents 'git worktree remove' and 'git worktree prune'
    from affecting this worktree.

    Args:
        worktree_path: Path to the worktree to lock.
        reason: Human-readable reason for the lock.

    Returns:
        WorktreeResult indicating success or failure.
    """
    worktree_path = worktree_path.resolve()
    _logger.debug("locking_worktree", path=str(worktree_path), reason=reason)

    cmd_args = ["worktree", "lock"]
    if reason:
        cmd_args.extend(["--reason", reason])
    cmd_args.append(str(worktree_path))

    try:
        await self._run_git(*cmd_args)
        _logger.info("worktree_locked", path=str(worktree_path))
        return WorktreeResult(success=True, worktree=None, error=None)
    except WorktreeError as e:
        if "already locked" in str(e).lower():
            # Check if the existing lock is stale (owner process dead)
            if await self._is_lock_stale(worktree_path):
                _logger.warning(
                    "worktree_stale_lock_detected",
                    path=str(worktree_path),
                    message="Lock owner process is dead, force-unlocking",
                )
                await self.unlock_worktree(worktree_path)
                # Retry the lock
                try:
                    await self._run_git(*cmd_args)
                    _logger.info("worktree_locked_after_stale_cleanup", path=str(worktree_path))
                    return WorktreeResult(success=True, worktree=None, error=None)
                except WorktreeError as retry_err:
                    raise WorktreeLockError(str(retry_err)) from retry_err
            _logger.debug("worktree_already_locked", path=str(worktree_path))
            return WorktreeResult(success=True, worktree=None, error=None)
        raise WorktreeLockError(str(e)) from e
unlock_worktree async
unlock_worktree(worktree_path)

Unlock a previously locked worktree.

Parameters:

Name Type Description Default
worktree_path Path

Path to the worktree to unlock.

required

Returns:

Type Description
WorktreeResult

WorktreeResult indicating success or failure.

Note

Returns success if worktree is already unlocked (idempotent).

Source code in src/marianne/isolation/worktree.py
async def unlock_worktree(
    self,
    worktree_path: Path,
) -> WorktreeResult:
    """Unlock a previously locked worktree.

    Args:
        worktree_path: Path to the worktree to unlock.

    Returns:
        WorktreeResult indicating success or failure.

    Note:
        Returns success if worktree is already unlocked (idempotent).
    """
    worktree_path = worktree_path.resolve()
    _logger.debug("unlocking_worktree", path=str(worktree_path))

    try:
        await self._run_git("worktree", "unlock", str(worktree_path))
        _logger.info("worktree_unlocked", path=str(worktree_path))
        return WorktreeResult(success=True, worktree=None, error=None)
    except WorktreeError as e:
        # May already be unlocked or not exist
        error_str = str(e).lower()
        if "not locked" in error_str or "is not a working tree" in error_str:
            _logger.debug("worktree_not_locked_or_missing", path=str(worktree_path))
            return WorktreeResult(success=True, worktree=None, error=None)
        raise WorktreeLockError(str(e)) from e
list_worktrees async
list_worktrees(prefix_filter=None)

List all worktrees, optionally filtered by branch prefix.

Parameters:

Name Type Description Default
prefix_filter str | None

Only return worktrees with branches matching prefix. For example, "marianne" returns all marianne/* branches.

None

Returns:

Type Description
list[WorktreeInfo]

List of WorktreeInfo for matching worktrees.

Source code in src/marianne/isolation/worktree.py
async def list_worktrees(
    self,
    prefix_filter: str | None = None,
) -> list[WorktreeInfo]:
    """List all worktrees, optionally filtered by branch prefix.

    Args:
        prefix_filter: Only return worktrees with branches matching prefix.
                      For example, "marianne" returns all marianne/* branches.

    Returns:
        List of WorktreeInfo for matching worktrees.
    """
    _logger.debug("listing_worktrees", prefix_filter=prefix_filter)

    _, stdout, _ = await self._run_git("worktree", "list", "--porcelain")

    worktrees: list[WorktreeInfo] = []
    current_path: Path | None = None
    current_branch: str | None = None
    current_commit: str | None = None
    current_locked = False

    for line in stdout.split("\n"):
        line = line.strip()

        if line.startswith("worktree "):
            # New worktree entry
            if current_path and current_branch:
                # Save previous entry
                worktrees.append(WorktreeInfo(
                    path=current_path,
                    branch=current_branch,
                    commit=current_commit or "",
                    locked=current_locked,
                    job_id=self._extract_job_id(current_branch),
                ))
            current_path = Path(line[9:])
            current_branch = None
            current_commit = None
            current_locked = False

        elif line.startswith("HEAD "):
            current_commit = line[5:]

        elif line.startswith("branch refs/heads/"):
            current_branch = line[18:]

        elif line.startswith("locked"):
            current_locked = True

        elif line == "":
            # Entry separator
            pass

    # Don't forget the last entry
    if current_path and current_branch:
        worktrees.append(WorktreeInfo(
            path=current_path,
            branch=current_branch,
            commit=current_commit or "",
            locked=current_locked,
            job_id=self._extract_job_id(current_branch),
        ))

    # Apply prefix filter
    if prefix_filter:
        prefix = f"{prefix_filter}/"
        worktrees = [wt for wt in worktrees if wt.branch.startswith(prefix)]

    _logger.debug("worktrees_found", count=len(worktrees))
    return worktrees
prune_orphaned async
prune_orphaned(prefix_filter='marianne', dry_run=False)

Clean up orphaned worktree metadata.

Removes metadata for worktrees whose directories no longer exist. Only affects worktrees with branches matching the prefix filter.

Parameters:

Name Type Description Default
prefix_filter str

Only prune worktrees with matching branch prefix.

'marianne'
dry_run bool

If True, return what would be pruned without pruning.

False

Returns:

Type Description
list[str]

List of pruned worktree names (or would-be-pruned if dry_run).

Source code in src/marianne/isolation/worktree.py
async def prune_orphaned(
    self,
    prefix_filter: str = "marianne",
    dry_run: bool = False,
) -> list[str]:
    """Clean up orphaned worktree metadata.

    Removes metadata for worktrees whose directories no longer exist.
    Only affects worktrees with branches matching the prefix filter.

    Args:
        prefix_filter: Only prune worktrees with matching branch prefix.
        dry_run: If True, return what would be pruned without pruning.

    Returns:
        List of pruned worktree names (or would-be-pruned if dry_run).
    """
    _logger.info(
        "pruning_orphaned_worktrees",
        prefix_filter=prefix_filter,
        dry_run=dry_run,
    )

    # First, list worktrees to identify marianne-prefixed ones
    worktrees = await self.list_worktrees(prefix_filter=prefix_filter)

    # Find orphaned (path doesn't exist)
    orphaned: list[str] = []
    for wt in worktrees:
        if not wt.path.exists():
            orphaned.append(str(wt.path))

    if not dry_run and orphaned:
        # Run git worktree prune to clean metadata
        await self._run_git("worktree", "prune")
        _logger.info("orphaned_worktrees_pruned", count=len(orphaned))

    return orphaned
get_worktree_info async
get_worktree_info(worktree_path)

Get information about a specific worktree.

Parameters:

Name Type Description Default
worktree_path Path

Path to the worktree.

required

Returns:

Type Description
WorktreeInfo | None

WorktreeInfo if worktree exists, None otherwise.

Source code in src/marianne/isolation/worktree.py
async def get_worktree_info(
    self,
    worktree_path: Path,
) -> WorktreeInfo | None:
    """Get information about a specific worktree.

    Args:
        worktree_path: Path to the worktree.

    Returns:
        WorktreeInfo if worktree exists, None otherwise.
    """
    worktree_path = worktree_path.resolve()
    worktrees = await self.list_worktrees()

    for wt in worktrees:
        if wt.path.resolve() == worktree_path:
            return wt

    return None

Functions