scheduler
Global sheet scheduler — cross-job concurrency control.
Phase 3 infrastructure — fully built and tested, not yet wired into the execution path.
Currently, JobManager._run_job_task() delegates to
JobService.start_job(), which runs jobs monolithically (all sheets
sequentially within one task). When this scheduler is wired in, the
manager will instead decompose jobs into individual sheets, register
them via register_job(), and use next_sheet() /
mark_complete() to drive per-sheet dispatch with cross-job
fair-share, DAG ordering, and rate-limit awareness.
Manages a priority min-heap of sheets from all active daemon jobs. Enforces global concurrency limits, per-job fair-share scheduling, and DAG dependency ordering, and integrates with the rate-limit and backpressure controllers.
Lock ordering (daemon-wide):
1. GlobalSheetScheduler._lock
2. RateLimitCoordinator._lock
3. BackpressureController (lock-free; reads are atomic)
4. CentralLearningStore._lock (future, Stage 5)
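A fixed lock order prevents deadlock only if every code path acquires the locks in that order. The sketch below illustrates the first two levels with plain asyncio.Lock objects; the lock variables and the update_both helper are hypothetical stand-ins, not the daemon's actual code.

```python
import asyncio


async def update_both(
    scheduler_lock: asyncio.Lock,
    rate_limit_lock: asyncio.Lock,
    log: list,
) -> None:
    # Level 1 first (stand-in for GlobalSheetScheduler._lock) ...
    async with scheduler_lock:
        log.append("scheduler")
        # ... then level 2 (stand-in for RateLimitCoordinator._lock).
        async with rate_limit_lock:
            log.append("rate_limit")


async def main() -> list:
    scheduler_lock = asyncio.Lock()
    rate_limit_lock = asyncio.Lock()
    log: list = []
    # Both tasks use the same acquisition order, so neither can hold
    # level 2 while waiting for level 1.
    await asyncio.gather(
        update_both(scheduler_lock, rate_limit_lock, log),
        update_both(scheduler_lock, rate_limit_lock, log),
    )
    return log


order_log = asyncio.run(main())
```

Code that needs only the rate-limit lock may take it alone; what the ordering forbids is acquiring a lower-numbered lock while already holding a higher-numbered one.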
Classes
RateLimitChecker
Bases: Protocol
Protocol for rate limit checking (satisfied by RateLimitCoordinator).
BackpressureChecker
Bases: Protocol
Protocol for backpressure checking (satisfied by BackpressureController).
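Because both checkers are typing.Protocol classes, any object with matching methods satisfies them without inheriting from them, which makes test doubles easy to write. The sketch below shows the pattern for the rate-limit side; the method name is_rate_limited and its signature are assumptions made for illustration, since this page does not list the protocol members.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class RateLimitChecker(Protocol):
    # Hypothetical member: the real protocol's methods are not shown here.
    def is_rate_limited(self, backend_type: str) -> bool: ...


class FakeCoordinator:
    """Test double that never inherits from RateLimitChecker."""

    def __init__(self, limited: set) -> None:
        self._limited = limited

    def is_rate_limited(self, backend_type: str) -> bool:
        return backend_type in self._limited


def can_dispatch(checker: RateLimitChecker, backend_type: str) -> bool:
    # Depends only on the protocol, not on a concrete coordinator class.
    return not checker.is_rate_limited(backend_type)


fake = FakeCoordinator({"claude_cli"})
blocked = can_dispatch(fake, "claude_cli")
allowed = can_dispatch(fake, "other_backend")
```

The same structural approach would apply to BackpressureChecker with its own (likewise unlisted) members.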
SheetInfo (dataclass)
SheetInfo(job_id, sheet_num, backend_type='claude_cli', model=None, estimated_cost=0.0, dag_depth=0, job_priority=5, retries_so_far=0)
Metadata about a sheet waiting to be scheduled.
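As a quick illustration of the defaults in the signature above, the following stand-in dataclass mirrors the documented field names and default values. The field types are assumptions inferred from those defaults; the real class lives in src/marianne/daemon/scheduler.py.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SheetInfo:
    # Stand-in mirroring the documented signature; types are assumed.
    job_id: str
    sheet_num: int
    backend_type: str = "claude_cli"
    model: Optional[str] = None
    estimated_cost: float = 0.0
    dag_depth: int = 0
    job_priority: int = 5
    retries_so_far: int = 0


# Only job_id and sheet_num are required; everything else has a default.
sheets = [SheetInfo(job_id="job-a", sheet_num=n) for n in (1, 2, 3)]

# A retried sheet overrides just the field that changed.
retry = SheetInfo("job-a", 2, retries_so_far=1)
```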
SheetEntry (dataclass)
Priority queue entry for a schedulable sheet.
Ordered by (priority, submitted_at) — lower priority value = higher
urgency. The info field is excluded from comparison so that
heapq ordering is based solely on the numeric sort keys.
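The ordering described above can be reproduced with dataclass(order=True) and field(compare=False). This is a minimal sketch of that pattern, not the actual class definition; the field types and the dict payload are assumptions.

```python
import heapq
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class SheetEntry:
    priority: float
    submitted_at: float
    # compare=False keeps the payload out of the generated comparison
    # methods, so payloads never need to be orderable themselves.
    info: Any = field(compare=False)


heap: list = []
heapq.heappush(heap, SheetEntry(5.0, 100.0, {"sheet": "late, normal"}))
heapq.heappush(heap, SheetEntry(1.0, 200.0, {"sheet": "urgent"}))
heapq.heappush(heap, SheetEntry(5.0, 50.0, {"sheet": "early, normal"}))

# Lowest priority value first; ties are broken by earlier submission time.
popped = [heapq.heappop(heap).info["sheet"] for _ in range(3)]
```

Without compare=False, two entries with equal priority and submitted_at would fall through to comparing their payloads, which raises TypeError for dicts.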
SchedulerStats (dataclass)
Statistics snapshot from the scheduler.
GlobalSheetScheduler
Cross-job sheet-level scheduler using a priority min-heap.
Status: Phase 3 infrastructure — built and tested, not yet wired into the execution path.
Manages a global priority queue of sheets from all active jobs.
Enforces:
- max_concurrent_sheets from DaemonConfig
- Per-job fair-share limits (penalty-based, not hard-block)
- Per-job hard cap at fair_share * max_per_job_multiplier
- DAG dependency awareness via completed-set tracking
The scheduler does NOT own sheet execution — it decides which
sheet should run next and the JobManager performs the actual
execution.
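To make the difference between the soft fair-share penalty and the hard per-job cap concrete, here is a rough sketch. The formulas, the function names, and the penalty_per_sheet value are all assumptions for illustration; this page does not state how the scheduler actually computes them.

```python
def fair_share(max_concurrent_sheets: int, active_jobs: int) -> int:
    # Assumed formula: split the global limit evenly, at least one slot each.
    return max(1, max_concurrent_sheets // max(1, active_jobs))


def effective_priority(
    base: float, running: int, share: int, penalty_per_sheet: float = 10.0
) -> float:
    # Soft limit: a job over its share sorts later but is still schedulable.
    over = max(0, running - share)
    return base + over * penalty_per_sheet


def at_hard_cap(running: int, share: int, max_per_job_multiplier: float) -> bool:
    # Hard limit: at or beyond this count the job gets no further slots.
    return running >= share * max_per_job_multiplier


share = fair_share(max_concurrent_sheets=8, active_jobs=4)
within_share = effective_priority(5.0, running=2, share=share)
over_share = effective_priority(5.0, running=4, share=share)
capped = at_hard_cap(running=4, share=share, max_per_job_multiplier=2.0)
```

Under these assumptions a job can borrow idle capacity beyond its share at a priority cost, while the multiplier bounds how far any single job can go.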
Integration plan (future):
1. JobManager._run_job_task() calls register_job() with
all sheets parsed from the JobConfig.
2. A dispatch loop calls next_sheet() and spawns per-sheet
tasks instead of calling JobService.start_job() monolithically.
3. Each per-sheet task calls mark_complete() on finish.
4. cancel_job() / shutdown calls deregister_job() for cleanup.
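The four integration steps could look roughly like the loop below. StubScheduler is a hypothetical in-memory stand-in that exposes only the documented method names; it takes plain sheet numbers instead of SheetInfo objects and applies no priorities or limits.

```python
import asyncio
from typing import Optional


class StubScheduler:
    """Hypothetical stand-in with the documented method names only."""

    def __init__(self) -> None:
        self._queue: list = []
        self.completed: list = []

    async def register_job(self, job_id: str, sheets: list) -> None:
        self._queue.extend((job_id, n) for n in sheets)

    async def next_sheet(self) -> Optional[tuple]:
        return self._queue.pop(0) if self._queue else None

    async def mark_complete(self, job_id: str, sheet_num: int, success: bool) -> None:
        self.completed.append((job_id, sheet_num))

    async def deregister_job(self, job_id: str) -> None:
        self._queue = [e for e in self._queue if e[0] != job_id]


async def run_sheet(scheduler: StubScheduler, job_id: str, sheet_num: int) -> None:
    await asyncio.sleep(0)  # placeholder for real sheet execution
    await scheduler.mark_complete(job_id, sheet_num, success=True)  # step 3


async def dispatch(scheduler: StubScheduler) -> None:
    tasks = []
    # Step 2: a single loop calls next_sheet() sequentially and spawns tasks.
    while (entry := await scheduler.next_sheet()) is not None:
        tasks.append(asyncio.create_task(run_sheet(scheduler, *entry)))
    await asyncio.gather(*tasks)


async def main() -> StubScheduler:
    scheduler = StubScheduler()
    await scheduler.register_job("job-a", [1, 2, 3])  # step 1
    await dispatch(scheduler)
    await scheduler.deregister_job("job-a")  # step 4
    return scheduler


stub = asyncio.run(main())
```

The stub loop stops at the first None for brevity. A real dispatch loop would presumably wait and retry, since None can also mean that concurrency is full or that sheets are rate-limited.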
Source code in src/marianne/daemon/scheduler.py
Attributes
Functions
set_rate_limiter
set_backpressure
register_job (async)
Register a job's sheets with the scheduler.
Enqueues sheets whose dependencies are already satisfied (or have
no dependencies). Remaining sheets are enqueued as their deps
complete via mark_complete().
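The initial split between immediately enqueued sheets and waiting sheets can be sketched as follows. It assumes that dependencies maps each sheet number to the set of sheet numbers it depends on; the split_ready helper and its completed argument are hypothetical.

```python
from typing import Optional


def split_ready(
    sheet_nums: list,
    dependencies: Optional[dict],
    completed: set,
) -> tuple:
    # Assumed shape: {sheet_num: {prerequisite sheet_nums}}.
    deps = dependencies or {}
    ready, waiting = [], []
    for n in sheet_nums:
        # A sheet is ready when every prerequisite is already completed;
        # a sheet with no entry has no prerequisites.
        if deps.get(n, set()) <= completed:
            ready.append(n)
        else:
            waiting.append(n)
    return ready, waiting


diamond = {2: {1}, 3: {1}, 4: {2, 3}}
ready, waiting = split_ready([1, 2, 3, 4], diamond, completed=set())
all_ready, none_waiting = split_ready([1, 2, 3], None, completed=set())
```

With the diamond-shaped DAG only sheet 1 is enqueued at registration; with no dependencies every sheet is enqueued at once.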
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Unique job identifier. | required |
| sheets | list[SheetInfo] | All sheets for this job. | required |
| dependencies | dict[int, set[int]] \| None | Optional DAG … | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If dependencies contain a cycle (would deadlock). |
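A cycle would deadlock because none of the sheets on it could ever become ready. One common way to detect this up front is Kahn's algorithm, sketched below; whether register_job() uses this particular algorithm is not stated here, and check_acyclic is a hypothetical helper that assumes dependencies maps each sheet to its prerequisites.

```python
from collections import deque


def check_acyclic(dependencies: dict) -> None:
    """Raise ValueError if the dependency graph contains a cycle."""
    nodes = set(dependencies) | {d for ds in dependencies.values() for d in ds}
    remaining = {n: set(dependencies.get(n, ())) for n in nodes}

    # Reverse index: prerequisite -> sheets that depend on it.
    dependents = {n: set() for n in nodes}
    for n, ds in remaining.items():
        for d in ds:
            dependents[d].add(n)

    # Start from sheets with no prerequisites and peel the graph layer by layer.
    queue = deque(n for n, ds in remaining.items() if not ds)
    resolved = 0
    while queue:
        done = queue.popleft()
        resolved += 1
        for n in dependents[done]:
            remaining[n].discard(done)
            if not remaining[n]:
                queue.append(n)

    # Any sheet never resolved sits on, or behind, a cycle.
    if resolved != len(nodes):
        raise ValueError("dependencies contain a cycle")


check_acyclic({2: {1}, 3: {2}})  # a simple chain passes silently

try:
    check_acyclic({1: {2}, 2: {1}})
    cycle_detected = False
except ValueError:
    cycle_detected = True
```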
deregister_job (async)
Remove all pending sheets for a cancelled/completed job.
next_sheet (async)
Pop the highest-priority ready sheet, respecting limits.
Returns None if no sheet can run (concurrency full, backpressure active, queue empty, or all queued sheets are rate-limited).
Single-caller constraint: The backpressure delay (asyncio.sleep)
runs outside the scheduler lock. If multiple coroutines call
next_sheet() concurrently, they may all pass the backpressure
check before any acquires the lock, effectively bypassing the delay
for concurrent callers. The intended usage is a single dispatch
loop calling next_sheet() sequentially — the JobManager
dispatch loop satisfies this constraint.
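The single-caller constraint can be seen in a toy model. The DelayedQueue class below is a hypothetical simplification that sleeps outside its lock in the way described above; it is not the scheduler's code, and the 0.05 s delay is arbitrary.

```python
import asyncio
import time


class DelayedQueue:
    """Toy model: the delay runs outside the lock."""

    def __init__(self, items, delay: float) -> None:
        self._items = list(items)
        self._delay = delay
        self._lock = asyncio.Lock()

    async def next_item(self):
        await asyncio.sleep(self._delay)  # backpressure delay, no lock held
        async with self._lock:  # the lock only guards the pop
            return self._items.pop(0) if self._items else None


async def timed(make_coro) -> float:
    start = time.monotonic()
    await make_coro()
    return time.monotonic() - start


async def main() -> tuple:
    delay = 0.05
    seq = DelayedQueue(range(3), delay)
    con = DelayedQueue(range(3), delay)

    async def sequential() -> None:
        # Single dispatch loop: each call pays the full delay.
        for _ in range(3):
            await seq.next_item()

    async def concurrent() -> None:
        # Concurrent callers sleep in parallel, so the delays overlap.
        await asyncio.gather(*(con.next_item() for _ in range(3)))

    return await timed(sequential), await timed(concurrent)


sequential_s, concurrent_s = asyncio.run(main())
```

Three sequential calls take roughly three delays in total, while three concurrent calls finish in roughly one, which is the bypass the constraint warns about.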
mark_complete (async)
Mark a sheet as done and enqueue newly-ready dependents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job that owns this sheet. | required |
| sheet_num | int | Which sheet completed. | required |
| success | bool | Whether execution succeeded. | required |
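The "enqueue newly-ready dependents" step amounts to re-checking waiting sheets against the completed set. The sketch below shows that bookkeeping with a hypothetical newly_ready helper; it assumes waiting maps each blocked sheet to its prerequisites and deliberately leaves out the success flag, since this page does not say how failed sheets affect their dependents.

```python
def newly_ready(sheet_num: int, completed: set, waiting: dict) -> list:
    """Record a completion and return the sheets it unblocks."""
    completed.add(sheet_num)
    # A waiting sheet becomes ready once all of its prerequisites are done.
    ready = sorted(n for n, deps in waiting.items() if deps <= completed)
    for n in ready:
        del waiting[n]
    return ready


completed: set = set()
waiting = {2: {1}, 3: {1}, 4: {2, 3}}

after_1 = newly_ready(1, completed, waiting)  # unblocks 2 and 3
after_2 = newly_ready(2, completed, waiting)  # 4 still waits on 3
after_3 = newly_ready(3, completed, waiting)  # now 4 is unblocked
```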
get_stats (async)
Return a snapshot of scheduler state.