rate_coordinator
Cross-job rate limit coordination.
Phase 3 infrastructure: the write path is active; the read path is not yet driving execution.
The write side (report_rate_limit()) is wired into the daemon via
JobManager._on_rate_limit → JobService →
RunnerContext.rate_limit_callback. Rate limit events from job
runners flow into the coordinator in real time.
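Here is a minimal sketch of that write-path plumbing. It assumes the callback receives the same four values as report_rate_limit() (backend type, wait seconds, job ID, sheet number). `FakeRunnerContext`, `InMemorySink`, `run_sheet`, and the backend name `backend-a` are hypothetical stand-ins, not Marianne's real RunnerContext or coordinator.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

# Hypothetical callback type: (backend_type, wait_seconds, job_id, sheet_num).
RateLimitCallback = Callable[[str, float, str, int], Awaitable[None]]


class InMemorySink:
    """Hypothetical stand-in for the coordinator's report_rate_limit()."""

    def __init__(self) -> None:
        self.reports: list[tuple[str, float, str, int]] = []

    async def report_rate_limit(
        self, backend_type: str, wait_seconds: float, job_id: str, sheet_num: int
    ) -> None:
        self.reports.append((backend_type, wait_seconds, job_id, sheet_num))


@dataclass
class FakeRunnerContext:
    """Hypothetical runner context that carries the callback down to a job runner."""

    job_id: str
    rate_limit_callback: RateLimitCallback | None = None


async def run_sheet(ctx: FakeRunnerContext, sheet_num: int) -> None:
    # Pretend the backend answered with a rate limit asking us to wait 30 s.
    if ctx.rate_limit_callback is not None:
        await ctx.rate_limit_callback("backend-a", 30.0, ctx.job_id, sheet_num)


sink = InMemorySink()
ctx = FakeRunnerContext(job_id="job-1", rate_limit_callback=sink.report_rate_limit)
asyncio.run(run_sheet(ctx, sheet_num=4))
print(sink.reports)  # [('backend-a', 30.0, 'job-1', 4)]
```

The runner only needs an awaitable callable. It does not need a reference to the coordinator itself, which keeps job runners decoupled from daemon internals.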
The read side (is_rate_limited()) is consumed by
GlobalSheetScheduler, which is built and tested but not yet
driving execution (Phase 3). When the scheduler is integrated into
the execution path (see scheduler.py module docstring), the
collected rate limit data will inform cross-job scheduling decisions.
When any job hits a rate limit, ALL jobs using that backend are notified
to back off. Because everything is in-process, this is much faster than the
SQLite cross-process approach in marianne.learning.store.rate_limits.
Satisfies the RateLimitChecker protocol defined in scheduler.py
so the GlobalSheetScheduler can query limits before dispatching.
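The protocol relationship can be sketched with typing.Protocol. The method signature follows is_rate_limited() as documented on this page. The tuple[bool, float] return type is an assumption read off its Returns table, which lists a bool and a float. `RateLimitCheckerSketch`, `NeverLimited`, and `can_dispatch` are hypothetical names; the real protocol is defined in scheduler.py.

```python
from __future__ import annotations

import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class RateLimitCheckerSketch(Protocol):
    """Hypothetical shape of RateLimitChecker; the real protocol lives in scheduler.py."""

    async def is_rate_limited(
        self, backend_type: str, model: str | None = None
    ) -> tuple[bool, float]:
        ...


class NeverLimited:
    """Trivial checker that reports every backend as available."""

    async def is_rate_limited(
        self, backend_type: str, model: str | None = None
    ) -> tuple[bool, float]:
        return False, 0.0


async def can_dispatch(checker: RateLimitCheckerSketch, backend_type: str) -> bool:
    # A scheduler would consult the checker like this before dispatching a sheet.
    limited, _remaining = await checker.is_rate_limited(backend_type)
    return not limited
```

Structural typing means the coordinator needs no explicit inheritance to satisfy the protocol. Any object with a matching is_rate_limited method qualifies.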
Lock ordering (daemon-wide):

1. GlobalSheetScheduler._lock
2. RateLimitCoordinator._lock ← this module
3. BackpressureController (lock-free; reads are atomic)
4. CentralLearningStore._lock (future, Stage 5)
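Under this ordering rule, a task may take a lower-numbered lock and then a higher-numbered one, but never the reverse. Presumably the scheduler holds its own lock while it consults the coordinator. This hypothetical sketch uses two plain asyncio.Lock objects as stand-ins for GlobalSheetScheduler._lock and RateLimitCoordinator._lock, and it assumes both are asyncio locks.

```python
import asyncio


class LockOrderingDemo:
    """Hypothetical stand-ins for the two daemon locks that nest."""

    def __init__(self) -> None:
        self.scheduler_lock = asyncio.Lock()    # plays GlobalSheetScheduler._lock (level 1)
        self.coordinator_lock = asyncio.Lock()  # plays RateLimitCoordinator._lock (level 2)
        self.trace: list[str] = []

    async def scheduler_path(self, name: str) -> None:
        # Level 1 first, then level 2; never the reverse.
        async with self.scheduler_lock:
            async with self.coordinator_lock:
                self.trace.append(name)

    async def coordinator_only_path(self, name: str) -> None:
        # Holding only the inner lock is safe as long as nothing
        # then tries to take the outer lock while holding it.
        async with self.coordinator_lock:
            self.trace.append(name)


async def main() -> list[str]:
    demo = LockOrderingDemo()
    # Every task respects the ordering, so this gather cannot deadlock.
    await asyncio.wait_for(
        asyncio.gather(
            demo.scheduler_path("dispatch-1"),
            demo.coordinator_only_path("report"),
            demo.scheduler_path("dispatch-2"),
        ),
        timeout=1.0,
    )
    return demo.trace
```

A deadlock would need one task holding the coordinator lock while waiting for the scheduler lock. The ordering rule forbids exactly that pattern.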
Classes
RateLimitEvent
dataclass
A single rate limit event reported by a job.
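Here is a sketch of what such an event record might hold. The field names are assumptions that mirror report_rate_limit()'s parameters. `reported_at`, `expires_at`, and the use of a monotonic clock are hypothetical additions, not documented RateLimitEvent fields.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass(frozen=True)
class RateLimitEventSketch:
    """Hypothetical event record; field names mirror report_rate_limit()'s parameters."""

    backend_type: str
    wait_seconds: float
    job_id: str
    sheet_num: int
    # Monotonic clock so expiry math is immune to wall-clock jumps (an assumption).
    reported_at: float = field(default_factory=time.monotonic)

    @property
    def expires_at(self) -> float:
        """Monotonic time after which this event no longer blocks the backend."""
        return self.reported_at + self.wait_seconds


event = RateLimitEventSketch("backend-a", 30.0, "job-1", 2, reported_at=100.0)
print(event.expires_at)  # 130.0
```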
RateLimitCoordinator
In-memory rate limit coordination across all daemon jobs.
Status: Write path active, read path partially active.
The write side (report_rate_limit()) is wired into the daemon
via JobManager._on_rate_limit → JobService →
RunnerContext.rate_limit_callback. The read side
(is_rate_limited()) is consumed by GlobalSheetScheduler
which is built and tested but not yet driving execution (Phase 3).
When any job hits a rate limit, ALL jobs using that backend are notified to back off. The coordinator tracks per-backend resume times.
The async is_rate_limited method satisfies the
RateLimitChecker protocol so the scheduler can consult
limits before dispatching sheets.
Source code in src/marianne/daemon/rate_coordinator.py
Attributes
active_limits
property
Currently active limits as {backend_type: seconds_remaining}.
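One way to derive this mapping is sketched below. It assumes, hypothetically, that the coordinator keeps a backend-to-resume-time map and omits limits that have already expired. The function name and the explicit `now` argument are illustrative.

```python
from __future__ import annotations


def active_limits(resume_at: dict[str, float], now: float) -> dict[str, float]:
    """Return {backend_type: seconds_remaining} for limits that have not expired.

    `resume_at` maps each backend to the time it may resume
    (a hypothetical internal layout).
    """
    return {
        backend: resume - now
        for backend, resume in resume_at.items()
        if resume > now  # expired limits are omitted, not reported as <= 0
    }


print(active_limits({"a": 130.0, "b": 90.0}, now=100.0))  # {'a': 30.0}
```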
Functions
report_rate_limit
async
Report a rate limit hit — all jobs using this backend back off.
If a limit is already active for the backend, the resume time is extended to whichever is later (existing or newly reported).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| backend_type | str | Backend that was rate-limited (e.g. …). | required |
| wait_seconds | float | Suggested wait duration in seconds. | required |
| job_id | str | Job that encountered the limit. | required |
| sheet_num | int | Sheet that triggered the limit. | required |
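The "whichever is later" rule can be sketched as a pure function over a hypothetical backend-to-resume-time map. The function name and the explicit `now` argument are illustrative assumptions. job_id and sheet_num are omitted because this sketch models only the resume time.

```python
from __future__ import annotations


def extend_resume_time(
    resume_at: dict[str, float], backend_type: str, wait_seconds: float, now: float
) -> float:
    """Record a rate limit and return the backend's effective resume time.

    A new report never shortens an existing limit: the stored resume time
    becomes whichever is later, the existing one or now + wait_seconds.
    """
    proposed = now + wait_seconds
    current = resume_at.get(backend_type)
    resume_at[backend_type] = proposed if current is None else max(current, proposed)
    return resume_at[backend_type]


limits: dict[str, float] = {}
extend_resume_time(limits, "a", 60.0, now=100.0)  # 160.0
extend_resume_time(limits, "a", 10.0, now=120.0)  # still 160.0: shorter report ignored
extend_resume_time(limits, "a", 90.0, now=120.0)  # 210.0: later report extends
```

Taking the maximum makes reports order-insensitive. A short wait reported by one job cannot cancel a longer back-off reported by another.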
is_rate_limited
async
Check if a backend is currently rate-limited.
Satisfies the RateLimitChecker protocol used by
GlobalSheetScheduler.next_sheet().
The model parameter is accepted for protocol compatibility
but currently unused — limits are tracked per backend type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| backend_type | str | Backend to check. | required |
| model | str \| None | Unused; accepted for protocol compatibility. | None |
Returns:
| Type | Description |
|---|---|
| bool | |
| float | |
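The following stripped-down, hypothetical coordinator shows the read path end to end. One job's report makes the backend appear limited to every other caller, `model` is ignored, and other backends are unaffected. The sketch assumes an asyncio.Lock, a monotonic clock, and the (bool, float) return shape suggested by the Returns table. For brevity, report_rate_limit() is reduced to two parameters.

```python
from __future__ import annotations

import asyncio
import time


class MiniCoordinator:
    """Hypothetical, stripped-down coordinator for illustrating the read path."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._resume_at: dict[str, float] = {}  # backend_type -> monotonic resume time

    async def report_rate_limit(self, backend_type: str, wait_seconds: float) -> None:
        async with self._lock:
            proposed = time.monotonic() + wait_seconds
            # Keep whichever resume time is later.
            self._resume_at[backend_type] = max(
                self._resume_at.get(backend_type, proposed), proposed
            )

    async def is_rate_limited(
        self, backend_type: str, model: str | None = None
    ) -> tuple[bool, float]:
        # `model` is ignored: limits are keyed by backend type only.
        async with self._lock:
            remaining = self._resume_at.get(backend_type, 0.0) - time.monotonic()
            return remaining > 0, max(remaining, 0.0)


async def demo() -> list[tuple[bool, float]]:
    coord = MiniCoordinator()
    await coord.report_rate_limit("backend-a", 60.0)  # job 1 hits a limit
    return [
        await coord.is_rate_limited("backend-a"),               # job 2, same backend
        await coord.is_rate_limited("backend-a", model="m-1"),  # model is ignored
        await coord.is_rate_limited("backend-b"),               # other backends unaffected
    ]
```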
clear_limits
async
Clear active rate limits, optionally for a specific instrument.
Removes the active limit entry so is_rate_limited() will
return False immediately. Event history is preserved for
diagnostics — only the active limit is removed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instrument | str \| None | If provided, clear only this instrument's limit. If None, clear all active limits. | None |
Returns:
| Type | Description |
|---|---|
| int | Number of limits cleared. |
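The sketch below models these semantics over a hypothetical backend-to-resume-time map. It assumes None means "clear everything", as the summary line suggests. It also counts every stored entry whether or not it has already expired; the real method may count differently.

```python
from __future__ import annotations


def clear_limits(resume_at: dict[str, float], instrument: str | None = None) -> int:
    """Drop active limits and return how many were removed.

    Only the active-limit map is touched; any event history kept
    elsewhere is left alone for diagnostics.
    """
    if instrument is None:
        count = len(resume_at)
        resume_at.clear()
        return count
    # pop() with a default makes clearing an unknown instrument a no-op.
    return 1 if resume_at.pop(instrument, None) is not None else 0


limits = {"a": 130.0, "b": 150.0}
clear_limits(limits, "a")        # 1
clear_limits(limits, "missing")  # 0
clear_limits(limits)             # 1 (only "b" was left)
```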
prune_stale
async
Remove expired events and limits.
Called periodically by ResourceMonitor._loop() to prevent
unbounded memory growth. report_rate_limit() also prunes
on each call via the active write path, but this periodic
prune ensures cleanup even during quiet periods.
Returns:
| Type | Description |
|---|---|
| int | Number of stale events removed. |
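Here is a sketch of the pruning pass. It assumes an event counts as stale once its wait has elapsed; the real criterion may differ, for example a longer retention window kept for diagnostics. `Event`, the list/dict layout, and the explicit `now` argument are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical minimal event: which backend, and when its wait ends."""

    backend_type: str
    expires_at: float


def prune_stale(events: list[Event], resume_at: dict[str, float], now: float) -> int:
    """Drop expired events and limits in place; return the number of events removed."""
    before = len(events)
    events[:] = [e for e in events if e.expires_at > now]
    # Collect keys first so the dict is not mutated while iterating over it.
    for backend in [b for b, t in resume_at.items() if t <= now]:
        del resume_at[backend]  # expired limit: backend may resume
    return before - len(events)


events = [Event("a", 90.0), Event("a", 150.0), Event("b", 50.0)]
limits = {"a": 150.0, "b": 50.0}
prune_stale(events, limits, now=100.0)  # 2; only Event("a", 150.0) and limit "a" remain
```

Mutating in place mirrors a coordinator that owns these containers. A periodic caller such as ResourceMonitor._loop() only needs the returned count.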