Skip to content

fleet

fleet

Fleet management for the Marianne conductor.

A fleet is a concert-of-concerts: multiple agent scores launched and managed as a unit. The FleetManager handles fleet-level lifecycle operations: detection, group dependency resolution, concurrent score launch, and fleet-level pause/resume/cancel.

Fleets are one level of nesting: fleet → score → sheet. No fleet-of-fleets.

Classes

FleetRecord

FleetRecord(fleet_id, config, config_path, member_jobs, group_order)

Tracks a running fleet's state.

Stores the mapping from fleet_id → member job_ids, group assignments, and dependency ordering for fleet-level operations.

Source code in src/marianne/daemon/fleet.py
def __init__(
    self,
    fleet_id: str,
    config: FleetConfig,
    config_path: Path,
    member_jobs: dict[str, str],
    group_order: list[set[str]],
) -> None:
    self.fleet_id = fleet_id
    self.config = config
    self.config_path = config_path
    # Maps score path → job_id for submitted scores
    self.member_jobs = member_jobs
    # Topologically sorted groups: each set runs concurrently
    self.group_order = group_order
Attributes
all_job_ids property
all_job_ids

All member job IDs in this fleet.

Functions

is_fleet_config

is_fleet_config(config_path)

Check if a YAML file is a fleet config (type: fleet).

Quick check that reads the YAML without full validation. Returns False on any error rather than raising.

Source code in src/marianne/daemon/fleet.py
def is_fleet_config(config_path: Path) -> bool:
    """Check if a YAML file is a fleet config (type: fleet).

    Quick check that reads the YAML without full validation. Returns False
    on any error rather than raising.
    """
    import yaml

    try:
        with open(config_path) as f:
            raw = yaml.safe_load(f)
        return isinstance(raw, dict) and raw.get("type") == "fleet"
    except Exception:
        return False

topological_sort_groups

topological_sort_groups(groups)

Sort fleet groups into dependency layers.

Returns a list of sets, where each set contains group names that can run concurrently. Earlier sets must complete before later sets start.

Groups not declared in the groups dict are treated as having no dependencies and are placed in the first layer.

Source code in src/marianne/daemon/fleet.py
def topological_sort_groups(
    groups: dict[str, FleetGroupConfig],
) -> list[set[str]]:
    """Sort fleet groups into dependency layers.

    Returns a list of sets, where each set contains group names that
    can run concurrently. Earlier sets must complete before later sets start.

    Groups not declared in the groups dict are treated as having no dependencies
    and are placed in the first layer.
    """
    if not groups:
        return [set()]

    # Build adjacency and in-degree
    in_degree: dict[str, int] = {}
    dependents: dict[str, list[str]] = defaultdict(list)

    for name, cfg in groups.items():
        in_degree.setdefault(name, 0)
        for dep in cfg.depends_on:
            dependents[dep].append(name)
            in_degree[name] = in_degree.get(name, 0) + 1
            in_degree.setdefault(dep, 0)

    # Kahn's algorithm with layer tracking
    layers: list[set[str]] = []
    queue = {name for name, deg in in_degree.items() if deg == 0}

    while queue:
        layers.append(queue)
        next_queue: set[str] = set()
        for name in queue:
            for dep in dependents.get(name, []):
                in_degree[dep] -= 1
                if in_degree[dep] == 0:
                    next_queue.add(dep)
        queue = next_queue

    return layers

submit_fleet async

submit_fleet(manager, config_path, fleet_config)

Submit a fleet config — launch scores in group-dependency order.

Launches all scores in each group layer concurrently, waiting for each layer to be submitted before proceeding to the next.

Parameters:

Name Type Description Default
manager JobManager

The JobManager to submit individual scores through.

required
config_path Path

Path to the fleet YAML file (for relative path resolution).

required
fleet_config FleetConfig

Parsed fleet configuration.

required

Returns:

Type Description
JobResponse

JobResponse with fleet_id and status.

Source code in src/marianne/daemon/fleet.py
async def submit_fleet(
    manager: JobManager,
    config_path: Path,
    fleet_config: FleetConfig,
) -> JobResponse:
    """Submit a fleet config — launch scores in group-dependency order.

    Launches all scores in each group layer concurrently, waiting for
    each layer to be submitted before proceeding to the next.

    Args:
        manager: The JobManager to submit individual scores through.
        config_path: Path to the fleet YAML file (for relative path resolution).
        fleet_config: Parsed fleet configuration.

    Returns:
        JobResponse with fleet_id and status.
    """
    fleet_id = fleet_config.name
    config_dir = config_path.parent

    # Resolve group ordering
    group_order = topological_sort_groups(fleet_config.groups)

    # Map scores to their groups
    scores_by_group: dict[str, list[str]] = defaultdict(list)
    ungrouped: list[str] = []
    for entry in fleet_config.scores:
        if entry.group:
            scores_by_group[entry.group].append(entry.path)
        else:
            ungrouped.append(entry.path)

    # All group names that appear in the order
    ordered_group_names: set[str] = set()
    for layer in group_order:
        ordered_group_names.update(layer)

    member_jobs: dict[str, str] = {}

    _logger.info(
        "fleet.submitting",
        fleet_id=fleet_id,
        total_scores=len(fleet_config.scores),
        groups=len(fleet_config.groups),
        layers=len(group_order),
    )

    # Submit ungrouped scores first (no dependencies)
    if ungrouped:
        results = await _submit_score_batch(
            manager, config_dir, ungrouped, member_jobs,
        )
        if not all(r.status == "accepted" for r in results):
            return JobResponse(
                job_id=fleet_id,
                status="rejected",
                message="One or more ungrouped scores failed to submit",
            )

    # Submit grouped scores layer by layer
    for layer in group_order:
        layer_scores: list[str] = []
        for group_name in layer:
            layer_scores.extend(scores_by_group.get(group_name, []))

        if not layer_scores:
            continue

        results = await _submit_score_batch(
            manager, config_dir, layer_scores, member_jobs,
        )
        if not all(r.status == "accepted" for r in results):
            _logger.warning(
                "fleet.layer_partial_failure",
                fleet_id=fleet_id,
                layer=sorted(layer),
                failed=[
                    s for s, r in zip(layer_scores, results, strict=False)
                    if r.status != "accepted"
                ],
            )

    # Store fleet record in manager for fleet-level operations
    record = FleetRecord(
        fleet_id=fleet_id,
        config=fleet_config,
        config_path=config_path,
        member_jobs=member_jobs,
        group_order=group_order,
    )
    manager._fleet_records[fleet_id] = record

    _logger.info(
        "fleet.submitted",
        fleet_id=fleet_id,
        member_count=len(member_jobs),
        job_ids=list(member_jobs.values()),
    )

    return JobResponse(
        job_id=fleet_id,
        status="accepted",
        message=f"Fleet '{fleet_id}' launched with {len(member_jobs)} scores",
    )

pause_fleet async

pause_fleet(manager, fleet_id)

Pause all member scores in a fleet.

Source code in src/marianne/daemon/fleet.py
async def pause_fleet(manager: JobManager, fleet_id: str) -> dict[str, Any]:
    """Pause all member scores in a fleet."""
    record = manager._fleet_records.get(fleet_id)
    if record is None:
        return {"error": f"Fleet '{fleet_id}' not found"}

    results: dict[str, bool] = {}
    for job_id in record.all_job_ids:
        try:
            ok = await manager.pause_job(job_id)
            results[job_id] = ok
        except Exception as exc:
            _logger.warning("fleet.pause_member_failed", job_id=job_id, error=str(exc))
            results[job_id] = False

    return {"fleet_id": fleet_id, "paused": results}

resume_fleet async

resume_fleet(manager, fleet_id)

Resume all paused member scores in a fleet.

Source code in src/marianne/daemon/fleet.py
async def resume_fleet(manager: JobManager, fleet_id: str) -> dict[str, Any]:
    """Resume all paused member scores in a fleet."""
    record = manager._fleet_records.get(fleet_id)
    if record is None:
        return {"error": f"Fleet '{fleet_id}' not found"}

    results: dict[str, str] = {}
    for job_id in record.all_job_ids:
        try:
            response = await manager.resume_job(job_id)
            results[job_id] = response.status
        except Exception as exc:
            _logger.warning("fleet.resume_member_failed", job_id=job_id, error=str(exc))
            results[job_id] = f"error: {exc}"

    return {"fleet_id": fleet_id, "resumed": results}

cancel_fleet async

cancel_fleet(manager, fleet_id)

Cancel all member scores in a fleet.

Source code in src/marianne/daemon/fleet.py
async def cancel_fleet(manager: JobManager, fleet_id: str) -> dict[str, Any]:
    """Cancel all member scores in a fleet."""
    record = manager._fleet_records.get(fleet_id)
    if record is None:
        return {"error": f"Fleet '{fleet_id}' not found"}

    results: dict[str, bool] = {}
    for job_id in record.all_job_ids:
        try:
            ok = await manager.cancel_job(job_id)
            results[job_id] = ok
        except Exception as exc:
            _logger.warning("fleet.cancel_member_failed", job_id=job_id, error=str(exc))
            results[job_id] = False

    return {"fleet_id": fleet_id, "cancelled": results}

get_fleet_status

get_fleet_status(manager, fleet_id)

Get status of a fleet and all its member scores.

Source code in src/marianne/daemon/fleet.py
def get_fleet_status(manager: JobManager, fleet_id: str) -> dict[str, Any]:
    """Get status of a fleet and all its member scores."""
    record = manager._fleet_records.get(fleet_id)
    if record is None:
        return {"error": f"Fleet '{fleet_id}' not found"}

    members: list[dict[str, Any]] = []
    for score_path, job_id in record.member_jobs.items():
        meta = manager._job_meta.get(job_id)
        members.append({
            "score_path": score_path,
            "job_id": job_id,
            "status": meta.status.value if meta else "unknown",
            "group": next(
                (e.group for e in record.config.scores if e.path == score_path),
                None,
            ),
        })

    return {
        "fleet_id": fleet_id,
        "name": record.config.name,
        "total_scores": len(record.config.scores),
        "members": members,
        "groups": {
            name: {"depends_on": cfg.depends_on}
            for name, cfg in record.config.groups.items()
        },
    }