hooks

Post-success hook execution for concert orchestration.

Executes hooks after successful job completion, enabling job chaining and concert orchestration where jobs can spawn other jobs.

Security Note:

This module intentionally supports shell command execution via run_command hooks. This is a DESIGN DECISION because:

1. Commands come from user-authored YAML config files (not user input at runtime)
2. Users explicitly opt in by adding on_success hooks to their config
3. Shell features (pipes, redirects, env vars) are needed for real-world hooks
4. Marianne runs with the same permissions as the user who invokes it

The run_script hook type uses subprocess_exec (no shell) for cases where shell features aren't needed.
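The difference between the two execution modes can be sketched with the standard asyncio subprocess API. This is an illustration only, not Marianne's implementation: it assumes a POSIX shell, and the helper names run_with_shell and run_without_shell are hypothetical.

```python
import asyncio
import sys


async def run_with_shell(command: str) -> tuple[int, str]:
    """Run a command string through the shell, so pipes and redirects work."""
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    out, _ = await proc.communicate()
    return proc.returncode, out.decode().strip()


async def run_without_shell(*argv: str) -> tuple[int, str]:
    """Run an argument vector directly; shell metacharacters stay literal."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    out, _ = await proc.communicate()
    return proc.returncode, out.decode().strip()


async def main() -> tuple[tuple[int, str], tuple[int, str]]:
    # The shell interprets "|" as a pipe between two programs.
    with_shell = await run_with_shell("echo hello | tr a-z A-Z")
    # Without a shell, the same characters are just part of one argument.
    without_shell = await run_without_shell(
        sys.executable, "-c", "import sys; print(sys.argv[1])", "hello | tr a-z A-Z"
    )
    return with_shell, without_shell


shell_result, exec_result = asyncio.run(main())
print(shell_result)
print(exec_result)
```

In the second call the pipe character never reaches a shell, which is why an exec-style hook is the safer choice when no shell features are required.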

Classes

HookResult dataclass

HookResult(hook_type, description, success, exit_code=None, error_message=None, duration_seconds=0.0, output=None, chained_job_path=None, chained_job_workspace=None, log_path=None, chained_job_info=None)

Result of executing a single hook.

ConcertContext dataclass

ConcertContext(concert_id, chain_depth=0, parent_job_id=None, root_workspace=None, started_at=(lambda: now(UTC))(), total_jobs_run=0, total_sheets_completed=0, jobs_in_chain=list())

Context passed through concert job chains.

Tracks the concert's progress across multiple jobs to enforce safety limits and enable coordinated logging.

HookExecutor

HookExecutor(config, workspace, concert_context=None)

Executes post-success hooks and manages concert orchestration.

Responsible for:

- Executing hooks after successful job completion
- Managing job chaining (run_job hooks)
- Enforcing concert safety limits
- Logging hook execution

Hook execution runs in Marianne's Python process, not inside Claude CLI. This allows hooks to trigger new Marianne runs without recursion issues.

Source code in src/marianne/execution/hooks.py
def __init__(
    self,
    config: JobConfig,
    workspace: Path,
    concert_context: ConcertContext | None = None,
):
    self.config = config
    self.workspace = workspace
    self.concert = config.concert
    self.concert_context = concert_context

    # Track results
    self.hook_results: list[HookResult] = []
Functions
execute_hooks async
execute_hooks()

Execute all configured on_success hooks.

Returns list of HookResults for each hook executed. Stops early if a hook fails and on_failure="abort".
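The early-stop behaviour can be illustrated with a simplified stand-in for the loop. FakeHook and run_hooks are hypothetical names, the "continue" value for on_failure is an assumption (only "abort" appears on this page), and exception handling and logging are left out.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeHook:
    """Hypothetical stand-in for a configured on_success hook."""

    type: str
    on_failure: str  # "abort" stops the loop; "continue" is an assumed alternative
    will_succeed: bool


async def run_hooks(
    hooks: list[FakeHook],
    abort_concert_on_hook_failure: bool = False,
) -> list[tuple[str, bool]]:
    """Run hooks in order and stop early on an aborting failure."""
    results: list[tuple[str, bool]] = []
    for hook in hooks:
        await asyncio.sleep(0)  # placeholder for the real hook execution
        results.append((hook.type, hook.will_succeed))
        if hook.will_succeed:
            continue
        # A failed hook ends the loop if either the hook or the concert says so.
        if hook.on_failure == "abort" or abort_concert_on_hook_failure:
            break
    return results


hooks = [
    FakeHook("run_command", on_failure="continue", will_succeed=True),
    FakeHook("run_script", on_failure="continue", will_succeed=False),
    FakeHook("run_command", on_failure="abort", will_succeed=False),
    FakeHook("run_job", on_failure="continue", will_succeed=True),
]
per_hook = asyncio.run(run_hooks(hooks))
concert_level = asyncio.run(run_hooks(hooks, abort_concert_on_hook_failure=True))
print(len(per_hook), len(concert_level))
```

With per-hook settings only, the third hook's failure ends the run and the final run_job hook never executes. With the concert-level flag set, the first failure of any hook ends it.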

Source code in src/marianne/execution/hooks.py
async def execute_hooks(self) -> list[HookResult]:
    """Execute all configured on_success hooks.

    Returns list of HookResults for each hook executed.
    Stops early if a hook fails and on_failure="abort".
    """
    if not self.config.on_success:
        _logger.debug("no_hooks_configured", job_id=self.config.name)
        return []

    _logger.info(
        "hooks.starting",
        job_id=self.config.name,
        hook_count=len(self.config.on_success),
    )

    for i, hook in enumerate(self.config.on_success):
        _logger.info(
            "hook.executing",
            hook_index=i + 1,
            hook_type=hook.type,
            description=hook.description or "(no description)",
        )

        try:
            result = await self._execute_hook(hook)
            self.hook_results.append(result)

            if result.success:
                _logger.info(
                    "hook.succeeded",
                    hook_type=hook.type,
                    duration_seconds=round(result.duration_seconds, 2),
                )
            else:
                _logger.warning(
                    "hook.failed",
                    hook_type=hook.type,
                    error=result.error_message,
                    exit_code=result.exit_code,
                )

                # Check if we should abort remaining hooks
                if hook.on_failure == "abort":
                    _logger.warning(
                        "hooks.aborted",
                        reason="hook failure with on_failure=abort",
                        remaining_hooks=len(self.config.on_success) - i - 1,
                    )
                    break

                # Check concert-level abort
                if self.concert.abort_concert_on_hook_failure:
                    _logger.warning(
                        "concert.aborted",
                        reason="hook failure with abort_concert_on_hook_failure=true",
                    )
                    break

        except (OSError, TimeoutError) as e:
            result = HookResult(
                hook_type=hook.type,
                description=hook.description,
                success=False,
                error_message=f"Exception: {e!s}",
            )
            self.hook_results.append(result)
            _logger.error(
                "hook.exception",
                hook_type=hook.type,
                error=str(e),
            )

            if hook.on_failure == "abort":
                break

    _logger.info(
        "hooks.completed",
        job_id=self.config.name,
        hooks_run=len(self.hook_results),
        hooks_succeeded=sum(1 for r in self.hook_results if r.success),
        hooks_failed=sum(1 for r in self.hook_results if not r.success),
    )

    return self.hook_results
get_next_job_to_chain
get_next_job_to_chain()

Get the next job to chain from successful run_job hooks.

Returns (job_path, workspace) for the first successful run_job hook, or None if no chaining should occur.

Note: This is for the synchronous chaining mode where Marianne itself manages the concert. For async/background chaining, hooks execute the jobs directly.

Source code in src/marianne/execution/hooks.py
def get_next_job_to_chain(self) -> tuple[Path, Path | None] | None:
    """Get the next job to chain from successful run_job hooks.

    Returns (job_path, workspace) for the first successful run_job hook,
    or None if no chaining should occur.

    Note: This is for the synchronous chaining mode where Marianne
    itself manages the concert. For async/background chaining,
    hooks execute the jobs directly.
    """
    for result in self.hook_results:
        if (
            result.hook_type == "run_job"
            and result.success
            and result.chained_job_path
        ):
            return (result.chained_job_path, result.chained_job_workspace)
    return None

Functions

get_hook_log_path

get_hook_log_path(workspace, hook_type)

Construct log path for a hook execution.

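Builds a timestamped log file path under {workspace}/hooks/, creating that directory if needed, for capturing detached hook output that would otherwise go to /dev/null. The log file itself is not created; the caller is expected to open it.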

Parameters:

workspace (str | Path | None, required): Job workspace directory. Returns None if not set.

hook_type (str, required): Hook type identifier used in filename (e.g., "chain", "command").

Returns:

Path | None: Path to the log file, or None if workspace is not available.

Source code in src/marianne/execution/hooks.py
def get_hook_log_path(workspace: str | Path | None, hook_type: str) -> Path | None:
    """Construct log path for a hook execution.

    Creates a timestamped log file in {workspace}/hooks/ for capturing
    detached hook output that would otherwise go to /dev/null.

    Args:
        workspace: Job workspace directory. Returns None if not set.
        hook_type: Hook type identifier used in filename (e.g., "chain", "command").

    Returns:
        Path to the log file, or None if workspace is not available.
    """
    if workspace is None:
        return None
    hook_log_dir = Path(workspace) / "hooks"
    hook_log_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
    return hook_log_dir / f"{hook_type}-{timestamp}.log"

expand_hook_variables

expand_hook_variables(template, *, workspace, job_id, sheet_count=None, for_shell=False)

Expand template variables in hook paths/commands.

Shared utility used by both HookExecutor (runner-side) and the daemon's _execute_hooks_task (daemon-side).

Known variables: {workspace}, {job_id}, {sheet_count}. Warns on unrecognized {var} patterns that remain after expansion.

Parameters:

template (str, required): Template string with {variable} placeholders.

workspace (str | Path, required): Workspace path to substitute.

job_id (str, required): Job identifier to substitute.

sheet_count (int | None, default None): Optional sheet count to substitute.

for_shell (bool, default False): When True, apply shlex.quote() to variable values before substitution. Use this when the expanded result will be passed to create_subprocess_shell. Do NOT use when the result will be used as a filesystem path (e.g., run_job hook's job_path).
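The effect of for_shell is easiest to see with a workspace path that contains a space. The expand function below is a reduced, hypothetical stand-in that keeps only the quoting and substitution steps for {workspace} and {job_id}; the example paths and job ID are made up.

```python
import shlex


def expand(template: str, *, workspace: str, job_id: str, for_shell: bool = False) -> str:
    """Reduced sketch of the substitution step, without sheet_count or warnings."""
    ws_str = str(workspace)
    jid_str = job_id
    if for_shell:
        # Quote each value so the shell sees it as a single word.
        ws_str = shlex.quote(ws_str)
        jid_str = shlex.quote(jid_str)
    return template.replace("{workspace}", ws_str).replace("{job_id}", jid_str)


workspace = "/tmp/my workspace"

# For a shell command, quoting keeps the path together as one argument.
as_command = expand("ls -la {workspace}", workspace=workspace, job_id="job-1", for_shell=True)

# For a filesystem path, the value must be substituted verbatim.
as_path = expand("{workspace}/next-job.yaml", workspace=workspace, job_id="job-1")

# Quoting a path template embeds literal quote characters in the result.
bad_path = expand("{workspace}/next-job.yaml", workspace=workspace, job_id="job-1", for_shell=True)

print(as_command)
print(as_path)
print(bad_path)
```

The last result contains quote characters that are meaningful only to a shell, which is why for_shell should stay False whenever the expanded string is used directly as a path.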
Source code in src/marianne/execution/hooks.py
def expand_hook_variables(
    template: str,
    *,
    workspace: str | Path,
    job_id: str,
    sheet_count: int | None = None,
    for_shell: bool = False,
) -> str:
    """Expand template variables in hook paths/commands.

    Shared utility used by both HookExecutor (runner-side) and
    the daemon's _execute_hooks_task (daemon-side).

    Known variables: {workspace}, {job_id}, {sheet_count}.
    Warns on unrecognized {var} patterns that remain after expansion.

    Args:
        template: Template string with {variable} placeholders.
        workspace: Workspace path to substitute.
        job_id: Job identifier to substitute.
        sheet_count: Optional sheet count to substitute.
        for_shell: When True, apply shlex.quote() to variable values
            before substitution. Use this when the expanded result will
            be passed to create_subprocess_shell. Do NOT use when the
            result will be used as a filesystem path (e.g., run_job
            hook's job_path).
    """
    ws_str = str(workspace)
    jid_str = job_id
    if for_shell:
        ws_str = shlex.quote(ws_str)
        jid_str = shlex.quote(jid_str)

    result = template.replace("{workspace}", ws_str).replace("{job_id}", jid_str)
    if sheet_count is not None:
        sc_str = str(sheet_count)
        if for_shell:
            sc_str = shlex.quote(sc_str)
        result = result.replace("{sheet_count}", sc_str)
    # Warn about unrecognized template variables
    for match in re.finditer(r"\{(\w+)\}", result):
        var_name = match.group(1)
        if var_name not in _KNOWN_HOOK_VARS:
            _logger.warning(
                "unknown_template_variable",
                variable=var_name,
                template=template,
                known_vars=sorted(_KNOWN_HOOK_VARS),
            )
    return result