
job_service

Core job execution service — decoupled from CLI.

It was extracted from the CLI commands so that both the CLI and the daemon can use it; the CLI becomes a thin wrapper around this service.
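The following is a minimal sketch of what a "thin wrapper" means in practice. The command function only parses arguments, calls the service, and maps the result to an exit code. It uses argparse in place of the Typer CLI the project actually uses. `StubJobService` and `pause_command` are hypothetical names; only the `pause_job(job_id, workspace)` shape comes from this page.

```python
from __future__ import annotations

import argparse
import asyncio
from pathlib import Path


class StubJobService:
    """Hypothetical stand-in for JobService; records pause requests instead of acting."""

    def __init__(self) -> None:
        self.paused: list[tuple[str, Path]] = []

    async def pause_job(self, job_id: str, workspace: Path) -> bool:
        self.paused.append((job_id, workspace))
        return True


def pause_command(argv: list[str], service: StubJobService) -> int:
    """Hypothetical CLI entry point: parse, delegate, translate the result to an exit code."""
    parser = argparse.ArgumentParser(prog="pause")
    parser.add_argument("job_id")
    parser.add_argument("--workspace", type=Path, default=Path("."))
    args = parser.parse_args(argv)
    # All job logic lives in the service; the CLI layer only adapts I/O.
    ok = asyncio.run(service.pause_job(args.job_id, args.workspace))
    return 0 if ok else 1


if __name__ == "__main__":
    svc = StubJobService()
    print(pause_command(["demo-job", "--workspace", "/tmp"], svc))
```

A daemon or MCP handler would call the same `pause_job()` coroutine directly from its own event loop instead of going through `asyncio.run()`.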

The service encapsulates the run/resume/pause/status lifecycle without any dependency on Rich, Typer, or CLI-level globals. All user-facing output goes through OutputProtocol.

Classes

JobService

JobService(*, output=None, global_learning_store=None, rate_limit_callback=None, event_callback=None, state_publish_callback=None, registry=None, token_warning_threshold=None, token_error_threshold=None, pgroup_manager=None)

Core job execution service.

Encapsulates the logic of the CLI run/resume/pause commands in a reusable service that can be called from the CLI, the daemon, the dashboard, or an MCP server.

All user-facing output goes through the OutputProtocol abstraction, allowing different frontends (Rich console, structlog, SSE, null) to receive execution events without code changes.

Source code in src/marianne/daemon/job_service.py
def __init__(
    self,
    *,
    output: OutputProtocol | None = None,
    global_learning_store: GlobalLearningStore | None = None,
    rate_limit_callback: RateLimitCallback | None = None,
    event_callback: EventCallback | None = None,
    state_publish_callback: StatePublishCallback | None = None,
    registry: JobRegistry | None = None,
    token_warning_threshold: int | None = None,
    token_error_threshold: int | None = None,
    pgroup_manager: ProcessGroupManager | None = None,
) -> None:
    self._output = output or NullOutput()
    self._learning_store = global_learning_store
    self._rate_limit_callback = rate_limit_callback
    self._event_callback = event_callback
    self._state_publish_callback = state_publish_callback
    self._registry = registry
    self._token_warning_threshold = token_warning_threshold
    self._token_error_threshold = token_error_threshold
    self._pgroup_manager = pgroup_manager
    self._notification_consecutive_failures = 0
    self._notifications_degraded = False
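The `output or NullOutput()` default above is the null-object pattern. Because the service always has a sink to call, it never has to check for `None` before emitting an event. The sketch below is minimal and assumes that `OutputProtocol` exposes at least the `job_event(job_id, event, data)` method that `pause_job()` calls. The protocol's other methods are not shown on this page. The `NullOutput` here is a sketch of the real class, and `RecordingOutput` and `emit_pause` are hypothetical.

```python
from __future__ import annotations

from typing import Any, Protocol


class OutputProtocol(Protocol):
    """Assumed minimal shape: only job_event() is visible on this page."""

    def job_event(self, job_id: str, event: str, data: dict[str, Any]) -> None: ...


class NullOutput:
    """Sketch of the default sink: silently drops every event."""

    def job_event(self, job_id: str, event: str, data: dict[str, Any]) -> None:
        pass


class RecordingOutput:
    """Hypothetical frontend that buffers events (e.g. for tests or an SSE stream)."""

    def __init__(self) -> None:
        self.events: list[tuple[str, str, dict[str, Any]]] = []

    def job_event(self, job_id: str, event: str, data: dict[str, Any]) -> None:
        self.events.append((job_id, event, data))


def emit_pause(output: OutputProtocol | None, job_id: str) -> None:
    """Hypothetical helper using the same defaulting idiom as JobService.__init__."""
    sink = output or NullOutput()
    # No None check needed here: sink always has job_event().
    # (The real pause_job() sends the full signal-file path; a bare name is used here.)
    sink.job_event(job_id, "pause_signal_sent", {"signal_file": f".marianne-pause-{job_id}"})
```

Any object with a matching `job_event` method satisfies the protocol structurally, which is why a new frontend needs no changes to the service.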
Attributes
notifications_degraded property
notifications_degraded

Whether notification delivery is degraded.

Returns True after _NOTIFICATION_DEGRADED_THRESHOLD consecutive notification failures. HealthChecker.readiness() reads this property to report degraded notification capability to operators.
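The degraded flag can be read as a consecutive-failure counter with a latch. The sketch below is minimal and rests on two assumptions. First, it uses a threshold of 3, because the real value of `_NOTIFICATION_DEGRADED_THRESHOLD` is not shown here. Second, it assumes that a successful delivery resets the streak. `NotificationHealth` and its methods are hypothetical.

```python
from __future__ import annotations

# Hypothetical value: the real _NOTIFICATION_DEGRADED_THRESHOLD is not shown on this page.
_NOTIFICATION_DEGRADED_THRESHOLD = 3


class NotificationHealth:
    """Hypothetical tracker mirroring the two counters set up in JobService.__init__."""

    def __init__(self, threshold: int = _NOTIFICATION_DEGRADED_THRESHOLD) -> None:
        self._threshold = threshold
        self._consecutive_failures = 0
        self._degraded = False

    @property
    def notifications_degraded(self) -> bool:
        # Read-only view, the kind of flag a readiness probe would poll.
        return self._degraded

    def record_failure(self) -> None:
        self._consecutive_failures += 1
        if self._consecutive_failures >= self._threshold:
            self._degraded = True

    def record_success(self) -> None:
        # Assumption: one successful delivery ends the streak and clears the flag.
        self._consecutive_failures = 0
        self._degraded = False
```

With this design, a health check only needs to read the property. It never touches the notification path itself.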

Functions
pause_job async
pause_job(job_id, workspace)

Pause a running job via signal file.

Mirrors the logic in cli/commands/pause.py::_pause_job(): it creates a pause signal file that the runner polls for at sheet boundaries.

Parameters:

Name       Type  Description                                Default
job_id     str   Job identifier to pause.                   required
workspace  Path  Workspace directory containing job state.  required

Returns:

Type  Description
bool  True if the pause signal file was created successfully.

Raises:

Type                Description
JobSubmissionError  If the job is not found or is not running; only running jobs can be paused.

Source code in src/marianne/daemon/job_service.py
async def pause_job(self, job_id: str, workspace: Path) -> bool:
    """Pause a running job via signal file.

    Mirrors the logic in cli/commands/pause.py::_pause_job():
    Creates a pause signal file that the runner polls at sheet boundaries.

    Args:
        job_id: Job identifier to pause.
        workspace: Workspace directory containing job state.

    Returns:
        True if pause signal was created successfully.

    Raises:
        JobSubmissionError: If job not found or not in a pausable state.
    """
    found_state, found_backend = await self._find_job_state(job_id, workspace)
    await found_backend.close()

    if found_state.status != JobStatus.RUNNING:
        raise JobSubmissionError(
            f"Job '{job_id}' is {found_state.status.value}, not running. "
            "Only running jobs can be paused."
        )

    # Create pause signal file
    signal_file = workspace / f".marianne-pause-{job_id}"
    signal_file.touch()

    self._output.job_event(
        job_id,
        "pause_signal_sent",
        {
            "signal_file": str(signal_file),
        },
    )

    return True
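The pause handshake uses a file as the signal. `pause_job()` only creates `.marianne-pause-{job_id}` in the workspace, and the runner decides when to act on it. The sketch below shows both sides. It assumes that the runner checks for the file before starting each sheet and deletes the file when it pauses. The real runner's cleanup behavior is not shown on this page. `request_pause`, `run_sheets`, and `execute` are hypothetical.

```python
from __future__ import annotations

import tempfile
from collections.abc import Callable
from pathlib import Path


def signal_path(workspace: Path, job_id: str) -> Path:
    # Same file name that pause_job() creates.
    return workspace / f".marianne-pause-{job_id}"


def request_pause(workspace: Path, job_id: str) -> Path:
    """Writer side: what pause_job() does after its status check."""
    path = signal_path(workspace, job_id)
    path.touch()
    return path


def run_sheets(
    workspace: Path,
    job_id: str,
    sheets: list[str],
    execute: Callable[[str], None],
) -> list[str]:
    """Hypothetical runner loop: the signal is checked only between sheets."""
    completed: list[str] = []
    for sheet in sheets:
        path = signal_path(workspace, job_id)
        if path.exists():
            # Assumption: the runner consumes the signal when it pauses.
            path.unlink()
            break
        execute(sheet)
        completed.append(sheet)
    return completed


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ws = Path(tmp)

        def execute(sheet: str) -> None:
            # The pause request arrives while sheet "s2" is executing.
            if sheet == "s2":
                request_pause(ws, "demo")

        print(run_sheets(ws, "demo", ["s1", "s2", "s3"], execute))  # ['s1', 's2']
```

The signal is only checked at boundaries, so a sheet that is already executing runs to completion before the job pauses.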
get_status async
get_status(job_id, workspace, backend_type='sqlite')

Get job status from state backend.

Parameters:

Name          Type  Description                                               Default
job_id        str   Job identifier.                                           required
workspace     Path  Workspace directory containing job state.                 required
backend_type  str   State backend type; defaults to "sqlite" for the daemon.  'sqlite'

Returns:

Type                    Description
CheckpointState | None  CheckpointState if found, None if the job doesn't exist.

Source code in src/marianne/daemon/job_service.py
async def get_status(
    self,
    job_id: str,
    workspace: Path,
    backend_type: str = "sqlite",
) -> CheckpointState | None:
    """Get job status from state backend.

    Args:
        job_id: Job identifier.
        workspace: Workspace directory containing job state.
        backend_type: State backend type (default "sqlite" for daemon).

    Returns:
        CheckpointState if found, None if job doesn't exist.
    """
    state_backend = self._create_state_backend(workspace, backend_type)
    try:
        return await state_backend.load(job_id)
    finally:
        await state_backend.close()
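`get_status()` closes the backend in a `finally` block. As a result, the connection is released whether `load()` returns a state, returns `None`, or raises. The sketch below is minimal and uses an in-memory stand-in. `FakeState` and `InMemoryBackend` are hypothetical; only the `load()`/`close()` call shape is taken from the code above.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class FakeState:
    """Hypothetical stand-in for CheckpointState."""

    job_id: str
    status: str


class InMemoryBackend:
    """Hypothetical backend exposing the load()/close() pair that get_status() uses."""

    def __init__(self, states: dict[str, FakeState]) -> None:
        self._states = states
        self.closed = False

    async def load(self, job_id: str) -> FakeState | None:
        return self._states.get(job_id)

    async def close(self) -> None:
        self.closed = True


async def get_status(backend: InMemoryBackend, job_id: str) -> FakeState | None:
    # Return from inside try; finally still runs, so close() is awaited
    # on success, on a missing job (None), and if load() raises.
    try:
        return await backend.load(job_id)
    finally:
        await backend.close()


if __name__ == "__main__":
    backend = InMemoryBackend({"j1": FakeState("j1", "running")})
    print(asyncio.run(get_status(backend, "j1")), backend.closed)
```

Note the contrast with `pause_job()`: it raises `JobSubmissionError` for a missing job, whereas `get_status()` reports absence as `None`.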

Functions