dag
Dependency DAG for sheet execution ordering.
This module implements a Directed Acyclic Graph (DAG) for managing sheet dependencies in Marianne jobs. It enables:
- Explicit dependency declarations between sheets
- Topological sorting for valid execution order
- Cycle detection to prevent infinite loops
- Parallel group identification for concurrent execution
The DAG is a foundation for parallel sheet execution (Evolution 2 of v17).
Classes
DAGReadyStatus
Bases: Enum
Result of querying the DAG for the next executable sheet.
Distinguishes between "all sheets are done" and "remaining sheets are blocked by failed/incomplete dependencies". Previously both cases returned None, causing silent deadlocks.
DAGNextResult (dataclass)
Result of a DAG-aware next-sheet query.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | DAGReadyStatus | Whether a sheet is ready, all are complete, or remaining are blocked. |
| sheet_num | int \| None | The next sheet to execute (only set when status == READY). |
| blocked_sheets | list[int] | Sheets that cannot run because their dependencies failed (only set when status == BLOCKED). |
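The three-way result described above can be sketched as follows. This is a minimal stand-in, not the module's own definitions: `ReadyStatus`, `NextResult`, and `describe` are hypothetical names, and the member name `ALL_COMPLETE` is an assumption (only READY and BLOCKED appear in the documentation above).

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class ReadyStatus(Enum):
    """Stand-in for DAGReadyStatus."""

    READY = "ready"
    ALL_COMPLETE = "all_complete"  # assumed member name, not confirmed by the docs
    BLOCKED = "blocked"


@dataclass
class NextResult:
    """Stand-in for DAGNextResult with the three documented attributes."""

    status: ReadyStatus
    sheet_num: int | None = None
    blocked_sheets: list[int] = field(default_factory=list)


def describe(result: NextResult) -> str:
    """Handle each status explicitly so 'done' and 'blocked' never look alike."""
    if result.status is ReadyStatus.READY:
        return f"run sheet {result.sheet_num}"
    if result.status is ReadyStatus.BLOCKED:
        return f"blocked: {result.blocked_sheets}"
    return "all sheets complete"
```

A caller that branches on the status, rather than on whether a sheet number is None, can report a blocked job instead of silently stopping.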
CycleDetectedError
Bases: Exception
Raised when a cycle is detected in sheet dependencies.
Attributes:
| Name | Type | Description |
|---|---|---|
| cycle | | The detected cycle as a list of sheet numbers. |
| message | | Human-readable description of the cycle. |
Source code in src/marianne/execution/dag.py
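One common way to find a cycle and report it as a list of sheet numbers is a depth-first search that tracks the current path. The sketch below is an illustration under that assumption; `find_cycle` is a hypothetical helper, and the module may detect cycles differently (for example, as a by-product of Kahn's algorithm).

```python
from __future__ import annotations


def find_cycle(dependencies: dict[int, list[int]]) -> list[int] | None:
    """Return one cycle as a list of sheet numbers, or None if there is none."""
    unvisited, in_progress, finished = 0, 1, 2
    state: dict[int, int] = {}
    path: list[int] = []

    def visit(node: int) -> list[int] | None:
        state[node] = in_progress
        path.append(node)
        for dep in dependencies.get(node, []):
            dep_state = state.get(dep, unvisited)
            if dep_state == in_progress:
                # dep is already on the current path, so the path loops back to it.
                return path[path.index(dep):] + [dep]
            if dep_state == unvisited:
                found = visit(dep)
                if found is not None:
                    return found
        path.pop()
        state[node] = finished
        return None

    for start in sorted(dependencies):
        if state.get(start, unvisited) == unvisited:
            found = visit(start)
            if found is not None:
                return found
    return None
```

The returned list repeats the first sheet at the end, which makes a readable message easy to build, for example by joining the numbers with " -> ".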
InvalidDependencyError
Bases: Exception
Raised when a dependency references an invalid sheet.
Attributes:
| Name | Type | Description |
|---|---|---|
| sheet_num | | The sheet containing the invalid dependency. |
| invalid_dep | | The invalid dependency value. |
| reason | | Why the dependency is invalid. |
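A reference check of this kind might look like the sketch below. `InvalidDep` and `check_references` are hypothetical stand-ins, and the only rule enforced here is the range check implied by 1-indexed sheets; the real module may validate more than this.

```python
class InvalidDep(Exception):
    """Stand-in for InvalidDependencyError with the three documented attributes."""

    def __init__(self, sheet_num: int, invalid_dep: int, reason: str) -> None:
        self.sheet_num = sheet_num
        self.invalid_dep = invalid_dep
        self.reason = reason
        super().__init__(
            f"Sheet {sheet_num} has invalid dependency {invalid_dep!r}: {reason}"
        )


def check_references(total_sheets: int, dependencies: dict[int, list[int]]) -> None:
    """Raise InvalidDep for the first dependency outside 1..total_sheets."""
    for sheet_num, deps in dependencies.items():
        for dep in deps:
            if not 1 <= dep <= total_sheets:
                raise InvalidDep(
                    sheet_num, dep, f"must be between 1 and {total_sheets}"
                )
```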
DependencyDAG (dataclass)
DependencyDAG(total_sheets, edges=(lambda: defaultdict(list))(), reverse_edges=(lambda: defaultdict(list))(), in_degree=dict(), validated=False)
Directed Acyclic Graph for sheet dependencies.
Builds a DAG from sheet dependency declarations and provides methods for determining valid execution order and identifying parallelizable groups.
Example
    >>> dag = DependencyDAG.from_dependencies(
    ...     total_sheets=5,
    ...     dependencies={2: [1], 3: [1], 4: [2, 3], 5: [4]}
    ... )
    >>> dag.get_execution_order()
    [1, 2, 3, 4, 5]
    >>> dag.get_parallel_groups()
    [[1], [2, 3], [4], [5]]
Attributes:
| Name | Type | Description |
|---|---|---|
| total_sheets | int | Total number of sheets in the job. |
| edges | dict[int, list[int]] | Forward edges (sheet -> sheets that depend on it). |
| reverse_edges | dict[int, list[int]] | Backward edges (sheet -> sheets it depends on). |
| in_degree | dict[int, int] | Number of dependencies for each sheet. |
| validated | bool | Whether the DAG has been validated for cycles. |
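To make the relationship between the three graph attributes concrete, the sketch below derives them from a dependency map. `build_graph` is a hypothetical helper written for illustration; it mirrors the attribute descriptions above but is not taken from the module.

```python
from collections import defaultdict


def build_graph(total_sheets: int, dependencies: dict[int, list[int]]):
    """Derive forward edges, backward edges, and in-degrees from declarations."""
    edges: dict[int, list[int]] = defaultdict(list)          # sheet -> its dependents
    reverse_edges: dict[int, list[int]] = defaultdict(list)  # sheet -> its dependencies
    in_degree = {sheet: 0 for sheet in range(1, total_sheets + 1)}

    for sheet, deps in dependencies.items():
        for dep in deps:
            edges[dep].append(sheet)
            reverse_edges[sheet].append(dep)
            in_degree[sheet] += 1
    return edges, reverse_edges, in_degree


edges, reverse_edges, in_degree = build_graph(
    5, {2: [1], 3: [1], 4: [2, 3], 5: [4]}
)
```

Each declared dependency produces one forward edge and one backward edge, so a sheet's in-degree always equals the length of its backward edge list.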
Functions
from_dependencies (classmethod)
Create a DAG from sheet dependency declarations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| total_sheets | int | Total number of sheets (1-indexed, so sheets 1..total_sheets). | required |
| dependencies | dict[int, list[int]] \| None | Map of sheet_num -> list of sheets it depends on. If None, assumes sequential dependencies (each sheet depends on the previous one). | None |
Returns:
| Type | Description |
|---|---|
| DependencyDAG | Validated DependencyDAG ready for use. |
Raises:
| Type | Description |
|---|---|
| InvalidDependencyError | If a dependency references a non-existent sheet. |
| CycleDetectedError | If dependencies contain a cycle. |
Example
    >>> # Sheet 3 depends on 1 and 2, sheet 4 depends on 3
    >>> dag = DependencyDAG.from_dependencies(
    ...     total_sheets=4,
    ...     dependencies={3: [1, 2], 4: [3]}
    ... )
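The default used when dependencies is None (each sheet depends on the previous one) is equivalent to the explicit map built below. `sequential_dependencies` is a hypothetical helper shown only to spell out that default.

```python
def sequential_dependencies(total_sheets: int) -> dict[int, list[int]]:
    """Explicit form of the sequential default: sheet n depends on sheet n - 1."""
    return {sheet: [sheet - 1] for sheet in range(2, total_sheets + 1)}


chain = sequential_dependencies(4)  # sheet 1 has no dependency and is omitted
```

With this map the graph is a single chain, so no two sheets can run in parallel.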
get_execution_order
Get a valid topological execution order using Kahn's algorithm.
Returns sheets in an order where all dependencies are satisfied before a sheet executes.
Returns:
| Type | Description |
|---|---|
| list[int] | List of sheet numbers in valid execution order. |
Example
    >>> dag = DependencyDAG.from_dependencies(4, {2: [1], 3: [1, 2], 4: [3]})
    >>> dag.get_execution_order()
    [1, 2, 3, 4]
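For readers unfamiliar with Kahn's algorithm, here is a minimal standalone sketch. `execution_order` is a hypothetical function, and breaking ties by lowest sheet number (via a heap) is an assumption; the documentation does not say how the module orders sheets that become ready at the same time.

```python
import heapq


def execution_order(total_sheets: int, dependencies: dict[int, list[int]]) -> list[int]:
    """Kahn's algorithm: repeatedly emit a sheet with no unmet dependencies."""
    sheets = range(1, total_sheets + 1)
    dependents: dict[int, list[int]] = {s: [] for s in sheets}
    in_degree = {s: 0 for s in sheets}
    for sheet, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(sheet)
            in_degree[sheet] += 1

    # Start with every sheet that has no dependencies at all.
    ready = [s for s in sheets if in_degree[s] == 0]
    heapq.heapify(ready)

    order: list[int] = []
    while ready:
        sheet = heapq.heappop(ready)  # lowest-numbered ready sheet (assumed tie-break)
        order.append(sheet)
        for child in dependents[sheet]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                heapq.heappush(ready, child)

    if len(order) != total_sheets:
        # Some sheets never reached in-degree 0, which only happens with a cycle.
        raise ValueError("dependencies contain a cycle")
    return order
```

The length check at the end is why Kahn's algorithm doubles as a cycle detector: sheets on a cycle never become ready.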
get_parallel_groups
Get groups of sheets that can execute in parallel.
Returns sheets grouped by "level" in the dependency graph. All sheets in a group have their dependencies satisfied by previous groups.
Returns:
| Type | Description |
|---|---|
| list[list[int]] | List of lists, where each inner list contains sheet numbers that can execute concurrently. |
Example
    >>> dag = DependencyDAG.from_dependencies(
    ...     total_sheets=5,
    ...     dependencies={2: [1], 3: [1], 4: [2, 3], 5: [4]}
    ... )
    >>> dag.get_parallel_groups()
    [[1], [2, 3], [4], [5]]  # 2 and 3 can run in parallel
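Grouping by level can be sketched as repeatedly collecting every sheet whose dependencies are already placed in an earlier group. `parallel_groups` below is a hypothetical standalone function illustrating that idea, not the module's implementation.

```python
def parallel_groups(total_sheets: int, dependencies: dict[int, list[int]]) -> list[list[int]]:
    """Group sheets into levels; each level depends only on earlier levels."""
    remaining = {
        sheet: set(dependencies.get(sheet, []))
        for sheet in range(1, total_sheets + 1)
    }
    done: set[int] = set()
    groups: list[list[int]] = []

    while remaining:
        # Every sheet whose dependencies are all in earlier levels joins this level.
        level = sorted(s for s, deps in remaining.items() if deps <= done)
        if not level:
            raise ValueError("dependencies contain a cycle")
        groups.append(level)
        done.update(level)
        for sheet in level:
            del remaining[sheet]
    return groups
```

Because a level is computed before any of its sheets are marked done, two sheets in the same level can never depend on each other.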
get_ready_sheets
Get sheets that are ready to execute given completed sheets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| completed | set[int] | Set of sheet numbers that have completed. | required |
Returns:
| Type | Description |
|---|---|
| list[int] | Sorted list of sheet numbers whose dependencies are all satisfied. |
Example
    >>> dag = DependencyDAG.from_dependencies(4, {2: [1], 3: [1], 4: [2, 3]})
    >>> dag.get_ready_sheets({1})
    [2, 3]  # Both 2 and 3 can run after 1 completes
    >>> dag.get_ready_sheets({1, 2})
    [3]  # 3 is still ready (was already), 4 needs 3 too
    >>> dag.get_ready_sheets({1, 2, 3})
    [4]  # Now 4 can run
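The readiness rule in the example above (a sheet is ready when it has not completed and all of its dependencies have) can be written as a short standalone function. `ready_sheets` is a hypothetical name, and excluding already-completed sheets is inferred from the example outputs rather than stated explicitly.

```python
def ready_sheets(
    total_sheets: int,
    dependencies: dict[int, list[int]],
    completed: set[int],
) -> list[int]:
    """Return sorted sheets that are not completed and have no unmet dependency."""
    return sorted(
        sheet
        for sheet in range(1, total_sheets + 1)
        if sheet not in completed
        and all(dep in completed for dep in dependencies.get(sheet, []))
    )


deps = {2: [1], 3: [1], 4: [2, 3]}
```

Calling `ready_sheets(4, deps, {1})` under these assumptions reproduces the `[2, 3]` result from the documented example.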
get_dependencies
Get direct dependencies for a sheet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sheet_num | int | The sheet to get dependencies for. | required |
Returns:
| Type | Description |
|---|---|
| list[int] | Sorted list of sheet numbers that must complete before this sheet. |
get_dependents
Get sheets that depend on a given sheet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sheet_num | int | The sheet to get dependents for. | required |
Returns:
| Type | Description |
|---|---|
| list[int] | Sorted list of sheet numbers that depend on this sheet. |
has_dependencies
Check if any sheet has explicit dependencies.
Returns:
| Type | Description |
|---|---|
| bool | True if any sheet depends on another, False if all sheets are independent. |
is_parallelizable
Check if the DAG allows any parallel execution.
Returns:
| Type | Description |
|---|---|
| bool | True if any parallel group has more than one sheet. |
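The two boolean checks answer different questions, which the sketch below illustrates with hypothetical standalone functions that take plain data rather than a DAG instance. A job with no dependencies at all is fully parallelizable, while a strict chain has dependencies but no parallelism.

```python
def has_dependencies(dependencies: dict[int, list[int]]) -> bool:
    """True if at least one sheet lists at least one dependency."""
    return any(deps for deps in dependencies.values())


def is_parallelizable(groups: list[list[int]]) -> bool:
    """True if any parallel group holds more than one sheet."""
    return any(len(group) > 1 for group in groups)
```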
to_dict
Serialize DAG to dictionary for JSON storage/display.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation of the DAG. |
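The exact layout of the returned dictionary is not documented here, so the sketch below uses an invented layout purely to illustrate one thing to watch for with JSON storage: integer sheet numbers used as dictionary keys come back as strings after a round trip. `dag_to_dict` and its key names are hypothetical.

```python
import json


def dag_to_dict(total_sheets: int, dependencies: dict[int, list[int]]) -> dict:
    """Hypothetical JSON-friendly layout; the real to_dict output may differ."""
    return {
        "total_sheets": total_sheets,
        # Keys are converted to str up front so the dict matches its JSON form.
        "dependencies": {
            str(sheet): sorted(deps) for sheet, deps in sorted(dependencies.items())
        },
    }


payload = dag_to_dict(3, {3: [2, 1]})
restored = json.loads(json.dumps(payload))
```

Converting keys to strings before serializing keeps the in-memory dictionary identical to what is read back from storage.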
__str__
Human-readable string representation.
Functions
build_dag_from_config
Convenience function to build DAG from config values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| total_sheets | int | Number of sheets in the job. | required |
| sheet_dependencies | dict[int, list[int]] \| None | Optional dependency declarations from config. | None |
Returns:
| Type | Description |
|---|---|
| DependencyDAG | Validated DependencyDAG. |
Raises:
| Type | Description |
|---|---|
| CycleDetectedError | If dependencies contain cycles. |
| InvalidDependencyError | If dependencies reference invalid sheets. |