Skip to content

stream

stream

Server-Sent Events (SSE) streaming API endpoints.

Job status streams use the DaemonEventBridge when a conductor is available (real-time via daemon.monitor.stream), falling back to state-backend polling otherwise.

Classes

LogDownloadInfo

Bases: BaseModel

Information about log file for download.

Functions

stream_job_status async

stream_job_status(job_id, poll_interval=2.0, backend=Depends(get_state_backend))

Stream real-time job status updates via Server-Sent Events.

Uses the conductor event bridge for real-time updates when available, falling back to state-backend polling otherwise.

Source code in src/marianne/dashboard/routes/stream.py
@router.get("/{job_id}/stream")
async def stream_job_status(
    job_id: str,
    poll_interval: float = 2.0,
    backend: StateBackend = Depends(get_state_backend),
) -> StreamingResponse:
    """Stream real-time job status updates via Server-Sent Events.

    Uses the conductor event bridge for real-time updates when available,
    falling back to state-backend polling otherwise.
    """
    if poll_interval < 0.1 or poll_interval > 30.0:
        raise HTTPException(
            status_code=400,
            detail="Poll interval must be between 0.1 and 30.0 seconds",
        )

    state = await backend.load(job_id)
    if state is None:
        raise HTTPException(status_code=404, detail=f"Score not found: {job_id}")

    bridge = _get_event_bridge_safe()

    return StreamingResponse(
        _job_status_stream(job_id, backend, bridge, poll_interval),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

stream_logs async

stream_logs(job_id, follow=True, tail_lines=DEFAULT_TAIL_LINES, backend=Depends(get_state_backend))

Stream job logs via Server-Sent Events.

Parameters:

Name Type Description Default
job_id str

Job identifier

required
follow bool

If true, tail the log file for new content

True
tail_lines int

Number of recent lines to send initially (max MAX_TAIL_LINES)

DEFAULT_TAIL_LINES
backend StateBackend

State backend (injected)

Depends(get_state_backend)

Returns:

Type Description
StreamingResponse

SSE stream of log lines

Raises:

Type Description
HTTPException

404 if job/logs not found, 400 if invalid parameters

Source code in src/marianne/dashboard/routes/stream.py
@router.get("/{job_id}/logs")
async def stream_logs(
    job_id: str,
    follow: bool = True,
    tail_lines: int = DEFAULT_TAIL_LINES,
    backend: StateBackend = Depends(get_state_backend),
) -> StreamingResponse:
    """Stream job logs via Server-Sent Events.

    Args:
        job_id: Job identifier
        follow: If true, tail the log file for new content
        tail_lines: Number of recent lines to send initially (max MAX_TAIL_LINES)
        backend: State backend (injected)

    Returns:
        SSE stream of log lines

    Raises:
        HTTPException: 404 if job/logs not found, 400 if invalid parameters
    """
    if tail_lines < 0 or tail_lines > MAX_TAIL_LINES:
        raise HTTPException(
            status_code=400, detail=f"tail_lines must be between 0 and {MAX_TAIL_LINES}"
        )

    log_file = await _get_job_log_file(job_id, backend)

    return StreamingResponse(
        _log_stream(log_file, follow, tail_lines),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable proxy buffering
        },
    )

download_logs async

download_logs(job_id, backend=Depends(get_state_backend))

Download complete log file as plain text.

Parameters:

Name Type Description Default
job_id str

Job identifier

required
backend StateBackend

State backend (injected)

Depends(get_state_backend)

Returns:

Type Description
Response

Complete log file content as text/plain

Raises:

Type Description
HTTPException

404 if job/logs not found

Source code in src/marianne/dashboard/routes/stream.py
@router.get("/{job_id}/logs/static")
async def download_logs(
    job_id: str,
    backend: StateBackend = Depends(get_state_backend),
) -> Response:
    """Download complete log file as plain text.

    Args:
        job_id: Job identifier
        backend: State backend (injected)

    Returns:
        Complete log file content as text/plain

    Raises:
        HTTPException: 404 if job/logs not found
    """
    log_file = await _get_job_log_file(job_id, backend)

    # Guard against unbounded memory usage on large log files (50 MB limit)
    max_download_bytes = 50 * 1024 * 1024
    try:
        file_size = log_file.stat().st_size
    except OSError as e:
        raise HTTPException(status_code=403, detail=f"Cannot access log file: {e}") from e

    if file_size > max_download_bytes:
        raise HTTPException(
            status_code=413,
            detail=(
                f"Log file too large for download: "
                f"{file_size / (1024 * 1024):.1f} MB "
                f"(limit {max_download_bytes // (1024 * 1024)} MB). "
                f"Use the streaming endpoint instead."
            ),
        )

    try:
        content = log_file.read_text(encoding="utf-8", errors="replace")

        # Add informational header
        lines = content.count("\n")
        size_kb = len(content.encode("utf-8")) / 1024

        header = f"# Marianne Job Logs - Job ID: {job_id}\n"
        header += f"# Generated: {datetime.now().isoformat()}\n"
        header += f"# File: {log_file.name}\n"
        header += f"# Size: {size_kb:.1f} KB, {lines} lines\n"
        header += "#" + "=" * 60 + "\n\n"

        return PlainTextResponse(
            content=header + content,
            headers={
                "Content-Disposition": f"attachment; filename=marianne-{job_id}-logs.txt",
                "Content-Type": "text/plain; charset=utf-8",
            },
        )

    except (OSError, PermissionError) as e:
        raise HTTPException(status_code=403, detail=f"Cannot read log file: {e}") from e

get_log_info async

get_log_info(job_id, backend=Depends(get_state_backend))

Get information about job log file.

Parameters:

Name Type Description Default
job_id str

Job identifier

required
backend StateBackend

State backend (injected)

Depends(get_state_backend)

Returns:

Type Description
LogDownloadInfo

Log file information

Raises:

Type Description
HTTPException

404 if job/logs not found

Source code in src/marianne/dashboard/routes/stream.py
@router.get("/{job_id}/logs/info", response_model=LogDownloadInfo)
async def get_log_info(
    job_id: str,
    backend: StateBackend = Depends(get_state_backend),
) -> LogDownloadInfo:
    """Get information about job log file.

    Args:
        job_id: Job identifier
        backend: State backend (injected)

    Returns:
        Log file information

    Raises:
        HTTPException: 404 if job/logs not found
    """
    log_file = await _get_job_log_file(job_id, backend)

    try:
        stat = log_file.stat()

        # Count lines efficiently
        with open(log_file, "rb") as f:
            lines = sum(1 for _ in f)

        return LogDownloadInfo(
            job_id=job_id,
            log_file=log_file.name,
            size_bytes=stat.st_size,
            lines=lines,
            last_modified=datetime.fromtimestamp(stat.st_mtime),
        )

    except (OSError, PermissionError) as e:
        raise HTTPException(status_code=403, detail=f"Cannot access log file: {e}") from e