Skip to content

jobs

jobs

Job control API endpoints.

Attributes

Classes

StartJobRequest

Bases: BaseModel

Request to start a new job.

Functions
validate_config_source
validate_config_source()

Validate that exactly one config source is provided.

Source code in src/marianne/dashboard/routes/jobs.py
def validate_config_source(self) -> None:
    """Validate that exactly one config source is provided."""
    if not self.config_content and not self.config_path:
        raise ValueError("Must provide either config_content or config_path")
    if self.config_content and self.config_path:
        raise ValueError("Cannot provide both config_content and config_path")

JobActionResponse

Bases: BaseModel

Response from job actions (pause/resume/cancel).

Functions
from_action_result classmethod
from_action_result(result)

Create from JobActionResult.

Source code in src/marianne/dashboard/routes/jobs.py
@classmethod
def from_action_result(cls, result: JobActionResult) -> JobActionResponse:
    """Create from JobActionResult."""
    return cls(
        success=result.success,
        job_id=result.job_id,
        status=result.status,
        message=result.message,
        via_daemon=result.via_daemon,
    )

StartJobResponse

Bases: BaseModel

Response from starting a job.

Functions
from_start_result classmethod
from_start_result(result)

Create from JobStartResult.

Source code in src/marianne/dashboard/routes/jobs.py
@classmethod
def from_start_result(cls, result: JobStartResult) -> StartJobResponse:
    """Create from JobStartResult."""
    return cls(
        success=True,
        job_id=result.job_id,
        job_name=result.job_name,
        status=result.status,
        workspace=str(result.workspace),
        total_sheets=result.total_sheets,
        pid=result.pid,
        message=f"Job {result.job_name} started successfully",
        via_daemon=result.via_daemon,
    )

Functions

start_job async

start_job(request)

Start a new Marianne job execution via the conductor.

Supports both inline YAML config content or path to config file.

Source code in src/marianne/dashboard/routes/jobs.py
@router.post("", response_model=StartJobResponse)
async def start_job(
    request: StartJobRequest,
) -> StartJobResponse:
    """Start a new Marianne job execution via the conductor.

    Supports both inline YAML config content or path to config file.
    """
    try:
        request.validate_config_source()

        config_path = Path(request.config_path) if request.config_path else None
        workspace = Path(request.workspace) if request.workspace else None

        service = _get_job_control_service()
        result = await service.start_job(
            config_path=config_path,
            config_content=request.config_content,
            workspace=workspace,
            start_sheet=request.start_sheet,
            self_healing=request.self_healing,
        )

        return StartJobResponse.from_start_result(result)

    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid job configuration") from None
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Configuration file not found") from None
    except RuntimeError:
        raise HTTPException(status_code=503, detail="Conductor unavailable") from None

pause_job async

pause_job(job_id)

Pause a running job via the conductor.

Source code in src/marianne/dashboard/routes/jobs.py
@router.post("/{job_id}/pause", response_model=JobActionResponse)
async def pause_job(job_id: str) -> JobActionResponse:
    """Pause a running job via the conductor."""
    try:
        service = _get_job_control_service()
        result = await service.pause_job(job_id)
    except RuntimeError:
        raise HTTPException(status_code=503, detail="Conductor unavailable") from None

    if not result.success:
        if "not found" in result.message:
            raise HTTPException(status_code=404, detail=result.message)
        raise HTTPException(status_code=409, detail=result.message)

    return JobActionResponse.from_action_result(result)

resume_job async

resume_job(job_id)

Resume a paused job via the conductor.

Source code in src/marianne/dashboard/routes/jobs.py
@router.post("/{job_id}/resume", response_model=JobActionResponse)
async def resume_job(job_id: str) -> JobActionResponse:
    """Resume a paused job via the conductor."""
    try:
        service = _get_job_control_service()
        result = await service.resume_job(job_id)
    except RuntimeError:
        raise HTTPException(status_code=503, detail="Conductor unavailable") from None

    if not result.success:
        if "not found" in result.message:
            raise HTTPException(status_code=404, detail=result.message)
        raise HTTPException(status_code=409, detail=result.message)

    return JobActionResponse.from_action_result(result)

cancel_job async

cancel_job(job_id)

Cancel a running or paused job via the conductor.

Source code in src/marianne/dashboard/routes/jobs.py
@router.post("/{job_id}/cancel", response_model=JobActionResponse)
async def cancel_job(job_id: str) -> JobActionResponse:
    """Cancel a running or paused job via the conductor."""
    try:
        service = _get_job_control_service()
        result = await service.cancel_job(job_id)
    except RuntimeError:
        raise HTTPException(status_code=503, detail="Conductor unavailable") from None

    if not result.success:
        if "not found" in result.message:
            raise HTTPException(status_code=404, detail=result.message)
        raise HTTPException(status_code=409, detail=result.message)

    return JobActionResponse.from_action_result(result)

delete_job async

delete_job(job_id)

Delete a terminal job record from the conductor registry.

Source code in src/marianne/dashboard/routes/jobs.py
@router.delete("/{job_id}")
async def delete_job(job_id: str) -> dict[str, Any]:
    """Delete a terminal job record from the conductor registry."""
    try:
        service = _get_job_control_service()
        deleted = await service.delete_job(job_id)
    except RuntimeError:
        raise HTTPException(status_code=503, detail="Conductor unavailable") from None

    if not deleted:
        raise HTTPException(status_code=404, detail=f"Score not found: {job_id}")

    return {
        "success": True,
        "job_id": job_id,
        "message": f"Job {job_id} deleted successfully",
    }

get_sheet_details async

get_sheet_details(job_id, sheet_num)

Get detailed sheet information for a specific job and sheet.

Source code in src/marianne/dashboard/routes/jobs.py
@router.get("/{job_id}/sheets/{sheet_num}")
async def get_sheet_details(
    job_id: str,
    sheet_num: int,
) -> dict[str, Any]:
    """Get detailed sheet information for a specific job and sheet."""
    backend = get_state_backend()
    state = await backend.load(job_id)
    if state is None:
        raise HTTPException(status_code=404, detail=f"Score not found: {job_id}")

    sheet_state = state.sheets.get(sheet_num)
    if sheet_state is None:
        raise HTTPException(status_code=404, detail=f"Sheet {sheet_num} not found in job {job_id}")

    return {
        SHEET_NUM_KEY: sheet_state.sheet_num,
        "status": sheet_state.status.value,
        "started_at": sheet_state.started_at.isoformat() if sheet_state.started_at else None,
        "completed_at": sheet_state.completed_at.isoformat() if sheet_state.completed_at else None,
        "attempt_count": sheet_state.attempt_count,
        "exit_code": sheet_state.exit_code,
        "error_message": sheet_state.error_message,
        "error_category": sheet_state.error_category,
        "validation_passed": sheet_state.validation_passed,
        "validation_details": sheet_state.validation_details or [],
        "execution_duration_seconds": sheet_state.execution_duration_seconds,
        "exit_signal": sheet_state.exit_signal,
        "exit_reason": sheet_state.exit_reason,
        "completion_attempts": sheet_state.completion_attempts,
        "passed_validations": sheet_state.passed_validations,
        "failed_validations": sheet_state.failed_validations,
        "last_pass_percentage": sheet_state.last_pass_percentage,
        "execution_mode": sheet_state.execution_mode,
        "confidence_score": sheet_state.confidence_score,
        "outcome_category": sheet_state.outcome_category,
        "success_without_retry": sheet_state.success_without_retry,
        "stdout_tail": sheet_state.stdout_tail,
        "stderr_tail": sheet_state.stderr_tail,
        "output_truncated": sheet_state.output_truncated,
        "preflight_warnings": sheet_state.preflight_warnings,
        "applied_pattern_descriptions": sheet_state.applied_pattern_descriptions,
        "grounding_passed": sheet_state.grounding_passed,
        "grounding_confidence": sheet_state.grounding_confidence,
        "grounding_guidance": sheet_state.grounding_guidance,
        "input_tokens": sheet_state.input_tokens,
        "output_tokens": sheet_state.output_tokens,
        "estimated_cost": sheet_state.estimated_cost,
        "cost_confidence": sheet_state.cost_confidence,
    }

daemon_status async

daemon_status()

Check if the Marianne conductor is running and get its status.

Source code in src/marianne/dashboard/routes/jobs.py
@router.get("/daemon/status", tags=["Daemon"])
async def daemon_status() -> dict[str, Any]:
    """Check if the Marianne conductor is running and get its status."""
    client = DaemonClient(_resolve_socket_path(None))
    try:
        status = await client.status()
        return {
            "connected": True,
            "pid": status.pid,
            "uptime_seconds": status.uptime_seconds,
            "running_jobs": status.running_jobs,
            "total_jobs_active": status.total_jobs_active,
            "memory_usage_mb": status.memory_usage_mb,
            "version": status.version,
        }
    except DaemonNotRunningError:
        return {
            "connected": False,
            "message": "Daemon not running",
        }