
dispatch


Baton dispatch logic — ready sheet resolution and dispatch.

Called after every event in the baton's main loop. Finds sheets that are ready to execute and dispatches them via a callback, respecting:

  • Global concurrency limit (max_concurrent_sheets)
  • Per-instrument concurrency limits
  • Instrument rate limit state
  • Circuit breaker state
  • Cost limits (future: not yet checked or enforced by this function)
  • Backpressure (checked via baton._shutting_down)
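Each ready sheet is tested against these constraints in a fixed order, and the first one that fails decides the skip. The sketch below expresses that ordering as a pure predicate. It assumes plain Python values in place of the baton's real state and leaves out the instrument fallback chain, which the real code tries before giving up on an open circuit breaker. `first_blocking_reason` and its parameters are hypothetical names. The returned strings follow the skip-reason format that dispatch_ready() records in the source below.

```python
from typing import Optional


def first_blocking_reason(
    *,
    global_running: int,
    max_concurrent: int,
    model_key: str,
    model_running: dict[str, int],
    model_limit: Optional[int],
    instrument: str,
    rate_limited: set[str],
    open_breakers: set[str],
) -> Optional[str]:
    """Return the first constraint that blocks a sheet, or None if it may run.

    Hypothetical helper: mirrors the order of checks in dispatch_ready()
    but ignores the fallback chain (an open breaker is simply a skip here).
    """
    if global_running >= max_concurrent:
        return "global_concurrency"
    if model_limit is not None and model_running.get(model_key, 0) >= model_limit:
        return f"model_concurrency:{model_key}"
    if instrument in rate_limited:
        return f"rate_limited:{instrument}"
    if instrument in open_breakers:
        return f"circuit_breaker:{instrument}"
    return None
```

Putting the global ceiling first matches the real loop. When the global ceiling is reached, no other sheet can run either, so dispatch_ready() stops the whole cycle instead of checking the remaining constraints.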

The dispatch function is stateless — it reads baton state, makes decisions, and calls the dispatch callback. It does not own any state of its own.

Design notes:

  • dispatch_ready() is a free function, not a method on BatonCore. This keeps the core focused on state management and event handling. The conductor calls dispatch_ready() after the baton processes each event.
  • The dispatch callback is async to allow backend acquisition and task creation.
  • A sheet is marked as 'dispatched' only after its callback returns without raising. At that point dispatch_ready() routes a SheetDispatched event through baton._handle_sheet_dispatched(). This ensures that only actually-dispatched sheets consume concurrency slots.
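dispatch_ready() awaits each callback in turn. A callback should therefore probably start the sheet's work and return, rather than run the sheet to completion. Otherwise one slow sheet would hold up the rest of the dispatch cycle. The sketch below shows one such callback, built on asyncio. `run_sheet` is a hypothetical stand-in for backend acquisition and execution, and `make_callback`, the `tasks` registry and the `DispatchCallback` alias are illustrative names, not the module's own definitions.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical alias matching the documented callback shape:
# (job_id, sheet_num, sheet_state) -> awaitable None.
DispatchCallback = Callable[[str, int, Any], Awaitable[None]]


async def run_sheet(job_id: str, sheet_num: int, sheet_state: Any) -> str:
    """Hypothetical stand-in for backend acquisition plus sheet execution."""
    await asyncio.sleep(0)  # yield once, as real I/O would
    return f"{job_id}#{sheet_num} done"


def make_callback(tasks: dict[tuple[str, int], asyncio.Task]) -> DispatchCallback:
    """Build a callback that spawns one task per sheet and returns at once."""

    async def callback(job_id: str, sheet_num: int, sheet_state: Any) -> None:
        # Start the sheet in the background. dispatch_ready() awaits this
        # callback, so returning quickly keeps the dispatch cycle short.
        tasks[(job_id, sheet_num)] = asyncio.create_task(
            run_sheet(job_id, sheet_num, sheet_state)
        )

    return callback


async def demo() -> list[str]:
    tasks: dict[tuple[str, int], asyncio.Task] = {}
    callback = make_callback(tasks)
    await callback("job-a", 1, None)
    await callback("job-a", 2, None)
    # Results come back in the order the tasks were registered.
    return list(await asyncio.gather(*tasks.values()))
```

If the callback raises, for example because no backend could be acquired, dispatch_ready() logs the error and does not mark the sheet as dispatched.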

See: docs/plans/2026-03-26-baton-design.md — Dispatch Logic section

Attributes

Classes

DispatchConfig dataclass

DispatchConfig(max_concurrent_sheets=10, instrument_concurrency=dict(), model_concurrency=dict(), rate_limited_instruments=set(), open_circuit_breakers=set())

Configuration for the dispatch logic.

Attributes:

  • max_concurrent_sheets (int): Global ceiling on concurrent sheet tasks.
  • instrument_concurrency (dict[str, int]): Per-instrument concurrency limits (fallback when no model-specific limit exists).
  • model_concurrency (dict[str, int]): Per-(instrument, model) concurrency limits. Keys are "instrument:model" strings. Takes priority over instrument_concurrency when the sheet has a known model.
  • rate_limited_instruments (set[str]): Set of instrument names currently rate-limited.
  • open_circuit_breakers (set[str]): Set of instrument names with open circuit breakers.
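The sketch below shows the model-first, instrument-second limit lookup. The stand-in dataclass copies the field names and defaults from the DispatchConfig signature above; it is not the real import. `resolve_limit` is a hypothetical helper that reproduces the key construction used in dispatch_ready(), and the model names are made up for illustration. Note the `is None` test: an explicit limit of 0 is honored rather than treated as "no limit".

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DispatchConfig:
    """Stand-in mirroring the documented fields (not the real import)."""

    max_concurrent_sheets: int = 10
    instrument_concurrency: dict[str, int] = field(default_factory=dict)
    model_concurrency: dict[str, int] = field(default_factory=dict)
    rate_limited_instruments: set[str] = field(default_factory=set)
    open_circuit_breakers: set[str] = field(default_factory=set)


def resolve_limit(
    config: DispatchConfig, instrument: str, model: Optional[str]
) -> tuple[str, Optional[int]]:
    """Return (model_key, limit); a limit of None means no per-model cap."""
    # "instrument:model" when the model is known, else the bare instrument.
    model_key = f"{instrument}:{model}" if model else instrument
    limit = config.model_concurrency.get(model_key)
    if limit is None:
        # Fall back to the per-instrument limit.
        limit = config.instrument_concurrency.get(instrument)
    return model_key, limit


# Hypothetical configuration: one model capped tighter than its instrument.
config = DispatchConfig(
    max_concurrent_sheets=4,
    instrument_concurrency={"claude-code": 3, "ollama": 1},
    model_concurrency={"claude-code:opus": 1},
)
```

In dispatch_ready(), the running count is looked up under the same model_key. As written, an instrument-level fallback limit therefore appears to be counted separately for each distinct model of that instrument, rather than across the instrument as a whole.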

DispatchResult dataclass

DispatchResult(dispatched_count=0, dispatched_sheets=list(), skipped_reasons=dict())

Result of a dispatch_ready() call.

Provides feedback on what happened: how many sheets were dispatched, which ones, and why others were skipped.

Functions
record_dispatch
record_dispatch(job_id, sheet_num)

Record a successful dispatch.

Source code in src/marianne/daemon/baton/dispatch.py
def record_dispatch(self, job_id: str, sheet_num: int) -> None:
    """Record a successful dispatch."""
    self.dispatched_count += 1
    self.dispatched_sheets.append((job_id, sheet_num))
record_skip
record_skip(reason)

Record a skipped sheet with reason.

Source code in src/marianne/daemon/baton/dispatch.py
def record_skip(self, reason: str) -> None:
    """Record a skipped sheet with reason."""
    self.skipped_reasons[reason] = self.skipped_reasons.get(reason, 0) + 1
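Over one cycle, the two recorders accumulate a summary. The stand-in copy of the dataclass below takes its fields from the DispatchResult signature and its method bodies from the source above, and shows that bookkeeping. `skips_by_category` is a hypothetical helper that groups reasons by the prefix before the first colon. It splits only once because model-concurrency reasons can themselves contain a colon (for example `model_concurrency:claude-code:opus`).

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class DispatchResult:
    """Stand-in with the documented fields and recorder methods."""

    dispatched_count: int = 0
    dispatched_sheets: list[tuple[str, int]] = field(default_factory=list)
    skipped_reasons: dict[str, int] = field(default_factory=dict)

    def record_dispatch(self, job_id: str, sheet_num: int) -> None:
        self.dispatched_count += 1
        self.dispatched_sheets.append((job_id, sheet_num))

    def record_skip(self, reason: str) -> None:
        self.skipped_reasons[reason] = self.skipped_reasons.get(reason, 0) + 1


def skips_by_category(result: DispatchResult) -> dict[str, int]:
    """Hypothetical helper: sum skip counts by reason prefix."""
    totals: Counter = Counter()
    for reason, count in result.skipped_reasons.items():
        # Split once: "model_concurrency:claude-code:opus" -> "model_concurrency".
        totals[reason.split(":", 1)[0]] += count
    return dict(totals)


result = DispatchResult()
result.record_dispatch("job-a", 1)
result.record_skip("rate_limited:gemini-cli")
result.record_skip("model_concurrency:claude-code:opus")
result.record_skip("model_concurrency:claude-code:opus")
```

Each skip is recorded once per sheet per cycle, so the counts measure how many sheets were held back, not how many distinct constraints were hit.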

Functions

dispatch_ready async

dispatch_ready(baton, config, callback)

Find and dispatch all sheets that are ready to execute.

Called after every event in the baton's main loop. This is the only place where sheets move from 'pending'/'ready' to 'dispatched'.

Parameters:

  • baton (BatonCore, required): The baton core (provides sheet state and job registry).
  • config (DispatchConfig, required): Dispatch configuration (concurrency limits, etc.).
  • callback (DispatchCallback, required): Async function called for each sheet to dispatch. Receives (job_id, sheet_num, sheet_state).

Returns:

  • DispatchResult: DispatchResult with counts of dispatched and skipped sheets.
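The toy below is a self-contained version of the main loop under simplifying assumptions. Jobs are a dict of ready-sheet lists, a single `model_limits` dict replaces the model/instrument lookup, and the fallback chain and logging are omitted. `Sheet`, `ToyResult`, `toy_dispatch` and `record_call` are hypothetical names. The toy reproduces three behaviors of the source below: reaching the global ceiling ends the whole cycle, a per-model cap only skips that sheet, and a sheet is counted only after its callback succeeds.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Optional


@dataclass
class Sheet:
    """Hypothetical ready sheet carrying only the fields the checks read."""

    sheet_num: int
    instrument_name: str
    model: Optional[str] = None


@dataclass
class ToyResult:
    dispatched: list[tuple[str, int]] = field(default_factory=list)
    skipped: dict[str, int] = field(default_factory=dict)

    def skip(self, reason: str) -> None:
        self.skipped[reason] = self.skipped.get(reason, 0) + 1


Callback = Callable[[str, int, Sheet], Awaitable[None]]


async def toy_dispatch(
    ready: dict[str, list[Sheet]],
    max_concurrent: int,
    model_limits: dict[str, int],
    rate_limited: set[str],
    callback: Callback,
    already_running: int = 0,
) -> ToyResult:
    """Simplified dispatch_ready(): no fallback chain, no logging."""
    result = ToyResult()
    global_running = already_running
    model_running: dict[str, int] = {}
    for job_id, sheets in ready.items():
        for sheet in sheets:
            if global_running >= max_concurrent:
                result.skip("global_concurrency")
                return result  # hard stop: later jobs are not considered
            instrument = sheet.instrument_name
            key = f"{instrument}:{sheet.model}" if sheet.model else instrument
            limit = model_limits.get(key)
            if limit is not None and model_running.get(key, 0) >= limit:
                result.skip(f"model_concurrency:{key}")
                continue  # only this sheet waits
            if instrument in rate_limited:
                result.skip(f"rate_limited:{instrument}")
                continue
            try:
                await callback(job_id, sheet.sheet_num, sheet)
            except Exception:
                continue  # the real loop logs; the sheet is not counted
            # Count the sheet only after its callback succeeded.
            result.dispatched.append((job_id, sheet.sheet_num))
            global_running += 1
            model_running[key] = model_running.get(key, 0) + 1
            if global_running >= max_concurrent:
                return result
    return result


calls: list[tuple[str, int]] = []


async def record_call(job_id: str, sheet_num: int, sheet: Sheet) -> None:
    calls.append((job_id, sheet_num))


ready = {
    "job-a": [Sheet(1, "claude-code", "opus"), Sheet(2, "claude-code", "opus"), Sheet(3, "ollama")],
    "job-b": [Sheet(1, "gemini-cli"), Sheet(2, "ollama")],
}
```

Run with max_concurrent=3, a cap of 1 on "claude-code:opus" and "gemini-cli" rate-limited, the toy dispatches job-a sheets 1 and 3 and job-b sheet 2. Job-a sheet 2 waits behind the model cap and job-b sheet 1 waits on the rate limit.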

Source code in src/marianne/daemon/baton/dispatch.py
async def dispatch_ready(
    baton: BatonCore,
    config: DispatchConfig,
    callback: DispatchCallback,
) -> DispatchResult:
    """Find and dispatch all sheets that are ready to execute.

    Called after every event in the baton's main loop. This is the
    only place where sheets move from 'pending'/'ready' to 'dispatched'.

    Args:
        baton: The baton core (provides sheet state and job registry).
        config: Dispatch configuration (concurrency limits, etc.).
        callback: Async function called for each sheet to dispatch.
            Receives (job_id, sheet_num, sheet_state).

    Returns:
        DispatchResult with counts of dispatched and skipped sheets.
    """
    result = DispatchResult()

    # Don't dispatch during shutdown
    if baton._shutting_down:
        return result

    # Track running counts per model key (instrument:model or just instrument)
    model_running: dict[str, int] = _count_dispatched_per_model(baton)
    global_running = baton.running_sheet_count

    for job_id in list(baton._jobs.keys()):
        job = baton._jobs.get(job_id)
        if job is None:
            continue

        if job.paused:
            continue

        ready = baton.get_ready_sheets(job_id)
        if ready:
            _logger.info(
                "dispatch.ready_sheets",
                extra={
                    "job_id": job_id,
                    "ready_count": len(ready),
                    "global_running": global_running,
                    "max_concurrent": config.max_concurrent_sheets,
                    "rate_limited": list(config.rate_limited_instruments),
                },
            )
        for sheet in ready:
            # Check global concurrency
            if global_running >= config.max_concurrent_sheets:
                _logger.debug(
                    "dispatch.skip.global_concurrency",
                    extra={
                        "job_id": job_id,
                        "sheet_num": sheet.sheet_num,
                        "global_running": global_running,
                        "limit": config.max_concurrent_sheets,
                    },
                )
                result.record_skip("global_concurrency")
                return result  # Hard stop — can't dispatch more

            # Check per-model concurrency (falls back to per-instrument)
            instrument = sheet.instrument_name or ""
            model_key = f"{instrument}:{sheet.model}" if sheet.model else instrument
            model_limit = config.model_concurrency.get(model_key)
            if model_limit is None:
                model_limit = config.instrument_concurrency.get(instrument)
            model_count = model_running.get(model_key, 0)
            if model_limit is not None and model_count >= model_limit:
                _logger.info(
                    "dispatch.skip.model_concurrency",
                    extra={
                        "job_id": job_id,
                        "sheet_num": sheet.sheet_num,
                        "model_key": model_key,
                        "model_count": model_count,
                        "model_limit": model_limit,
                    },
                )
                result.record_skip(f"model_concurrency:{model_key}")
                continue

            # Check instrument rate limit (transient — don't fallback)
            if instrument in config.rate_limited_instruments:
                _logger.info(
                    "dispatch.skip.rate_limited",
                    extra={
                        "job_id": job_id,
                        "sheet_num": sheet.sheet_num,
                        "instrument": instrument,
                    },
                )
                result.record_skip(f"rate_limited:{instrument}")
                continue

            # Check instrument availability — try fallback chain when
            # circuit breaker is OPEN or instrument is unregistered.
            # Loop to handle chains where multiple fallbacks are also
            # unavailable (e.g., claude-code→gemini-cli→ollama when
            # both claude-code and gemini-cli are OPEN).
            _skipped = False
            while (
                instrument in config.open_circuit_breakers
                or instrument not in baton._instruments
            ):
                if baton._check_and_fallback_unavailable(sheet, job_id):
                    instrument = sheet.instrument_name or ""
                    # Check if the new instrument is rate-limited
                    if instrument in config.rate_limited_instruments:
                        result.record_skip(f"rate_limited:{instrument}")
                        _skipped = True
                        break
                    # Loop continues to check the new instrument
                else:
                    result.record_skip(f"circuit_breaker:{instrument}")
                    _skipped = True
                    _logger.warning(
                        "baton.dispatch.all_instruments_circuit_broken",
                        extra={
                            "job_id": job_id,
                            SHEET_NUM_KEY: sheet.sheet_num,
                            "instrument": instrument,
                        },
                    )
                    break
            if _skipped:
                continue

            # Dispatch!
            try:
                await callback(job_id, sheet.sheet_num, sheet)
                # Status set through event handler for traceability.
                # Called synchronously so concurrency counting works
                # within this dispatch cycle.
                baton._handle_sheet_dispatched(SheetDispatched(
                    job_id=job_id,
                    sheet_num=sheet.sheet_num,
                    instrument=instrument,
                ))
                result.record_dispatch(job_id, sheet.sheet_num)
                global_running += 1
                model_running[model_key] = model_count + 1
            except Exception:
                _logger.error(
                    "baton.dispatch.callback_failed",
                    extra={
                        "job_id": job_id,
                        SHEET_NUM_KEY: sheet.sheet_num,
                        "instrument": instrument,
                    },
                    exc_info=True,
                )

            # Recheck global limit after each dispatch
            if global_running >= config.max_concurrent_sheets:
                return result

    if result.dispatched_count > 0:
        _logger.info(
            "baton.dispatch.cycle_complete",
            extra={
                "dispatched": result.dispatched_count,
                "skipped": sum(result.skipped_reasons.values()),
            },
        )

    return result
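Near the end of dispatch_ready(), the availability loop walks a fallback chain until it reaches a usable instrument. The chain itself is resolved by baton._check_and_fallback_unavailable(), whose body is not shown here. The sketch below therefore assumes a simple ordered list of instruments per sheet, using the claude-code, gemini-cli, ollama chain mentioned in the source comment. `pick_instrument` is a hypothetical helper. It returns either the chosen instrument or the skip reason the real loop would record.

```python
def pick_instrument(
    chain: list[str],
    registered: set[str],
    open_breakers: set[str],
    rate_limited: set[str],
) -> tuple[bool, str]:
    """Walk a fallback chain the way dispatch_ready()'s availability loop does.

    Assumption: chain[0] is the sheet's own instrument and the rest is an
    ordered list of fallbacks. Returns (True, instrument) when a usable
    instrument is found, or (False, skip_reason) otherwise.
    """
    instrument = chain[0]
    # The primary's rate limit is checked before the loop, with no fallback.
    if instrument in rate_limited:
        return False, f"rate_limited:{instrument}"
    remaining = iter(chain[1:])
    while instrument in open_breakers or instrument not in registered:
        nxt = next(remaining, None)
        if nxt is None:
            # Chain exhausted: recorded as a circuit-breaker skip even when
            # the last instrument was merely unregistered.
            return False, f"circuit_breaker:{instrument}"
        instrument = nxt
        # A rate-limited fallback ends the search instead of trying further.
        if instrument in rate_limited:
            return False, f"rate_limited:{instrument}"
    return True, instrument


chain = ["claude-code", "gemini-cli", "ollama"]
registered = {"claude-code", "gemini-cli", "ollama"}
```

The source shows one consequence of this ordering. model_key is computed before this loop and is not recomputed after a fallback. As written, a sheet that falls back therefore appears to be limit-checked and counted under its original instrument's key, even though its SheetDispatched event carries the new instrument.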