progress
progress
¶
Execution progress tracking for long-running sheet executions.
Provides real-time visibility into sheet execution progress through: - Byte/line tracking as output streams in - Phase indicators (starting, executing, validating) - Periodic progress snapshots for state persistence - Heartbeat updates to detect stalled executions
This module enables the CLI to show "Still running... 5.2KB received, 3m elapsed" and provides the data needed for execution history analysis.
Classes¶
ExecutionProgress
dataclass
¶
ExecutionProgress(started_at, last_activity_at, bytes_received=0, lines_received=0, phase='starting')
Snapshot of execution progress at a point in time.
Tracks metrics about an ongoing execution to provide visibility during long-running sheet operations.
Attributes:
| Name | Type | Description |
|---|---|---|
started_at |
datetime
|
When the execution started. |
last_activity_at |
datetime
|
When we last received output or status update. |
bytes_received |
int
|
Total bytes of output received so far. |
lines_received |
int
|
Total lines of output received so far. |
phase |
str
|
Current execution phase. |
elapsed_seconds |
float
|
Seconds since execution started. |
Attributes¶
Functions¶
format_bytes
¶
Format bytes_received as human-readable string.
Source code in src/marianne/execution/progress.py
format_elapsed
¶
Format elapsed time as human-readable string.
Source code in src/marianne/execution/progress.py
format_status
¶
Format complete status line for display.
Returns a string like: "Still running... 5.2KB received, 3m 15s elapsed"
Source code in src/marianne/execution/progress.py
to_dict
¶
Convert to dictionary for JSON serialization.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with progress metrics and timestamps. |
Source code in src/marianne/execution/progress.py
ProgressTracker
dataclass
¶
Tracks execution progress during a sheet execution.
Provides methods to update progress metrics as output streams in, and optionally notifies a callback for real-time updates.
Example
tracker = ProgressTracker(callback=my_update_fn) tracker.set_phase("executing") tracker.update(new_bytes=1024, new_lines=10) print(tracker.get_progress().format_status())
Attributes:
| Name | Type | Description |
|---|---|---|
callback |
ProgressCallback | None
|
Optional function called on each progress update. |
_progress |
ExecutionProgress
|
Internal ExecutionProgress state. |
_snapshot_interval_seconds |
float
|
How often to record snapshots. |
_last_snapshot_at |
datetime
|
When last snapshot was recorded. |
_snapshots |
list[dict[str, Any]]
|
List of progress snapshots for persistence. |
Functions¶
__post_init__
¶
Initialize progress tracking state.
update
¶
Update progress with new output received.
Increments byte/line counters, updates last_activity_at, and optionally records a snapshot for persistence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
new_bytes
|
int
|
Number of new bytes received. |
0
|
new_lines
|
int
|
Number of new lines received. |
0
|
force_snapshot
|
bool
|
Force a snapshot even if interval hasn't passed. |
False
|
Source code in src/marianne/execution/progress.py
set_phase
¶
Set the current execution phase.
Records a snapshot when phase changes to track phase transitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
phase
|
str
|
New phase (starting, executing, validating, completed). |
required |
Source code in src/marianne/execution/progress.py
get_progress
¶
Get current progress state.
Returns:
| Type | Description |
|---|---|
ExecutionProgress
|
Copy of current ExecutionProgress. |
Source code in src/marianne/execution/progress.py
get_snapshots
¶
Get all recorded progress snapshots.
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of snapshot dictionaries for persistence. |
reset
¶
Reset tracker for new execution.
Clears all counters and snapshots but preserves callback.
Source code in src/marianne/execution/progress.py
StreamingOutputTracker
¶
Helper for tracking streaming output from subprocess.
Wraps a ProgressTracker to process raw bytes/lines and update counters. Useful for tracking output from asyncio.subprocess streams.
Example
tracker = ProgressTracker() stream_tracker = StreamingOutputTracker(tracker)
async for chunk in stream: stream_tracker.process_chunk(chunk)
Initialize streaming output tracker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
progress_tracker
|
ProgressTracker
|
Underlying ProgressTracker to update. |
required |
Source code in src/marianne/execution/progress.py
Functions¶
process_chunk
¶
Process a chunk of output bytes.
Counts complete lines and updates byte/line counters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk
|
bytes
|
Raw bytes from the output stream. |
required |
Source code in src/marianne/execution/progress.py
finish
¶
Finish tracking, counting any remaining partial line.