synthesizer
synthesizer
¶
Result Synthesizer for parallel sheet execution (v18 evolution).
This module implements the "gather" phase of fan-out/gather pattern for parallel execution. When sheets run in parallel, their content outputs need to be synthesized into a unified result for downstream consumers.
Key features: - Content reference extraction from parallel sheet outputs - Synthesis strategies (merge, summarize, pass_through) - Persistent synthesis state for checkpointing - Integration with ParallelBatchResult
The ResultSynthesizer works with the existing ParallelExecutor to combine outputs after batch completion.
Classes¶
SynthesisStrategy
¶
Bases: str, Enum
Strategy for synthesizing parallel sheet outputs.
Attributes:
| Name | Type | Description |
|---|---|---|
MERGE |
Combine all outputs into a single result (default). |
|
SUMMARIZE |
Create a summary of all outputs. |
|
PASS_THROUGH |
No synthesis, keep individual outputs separate. |
SynthesisConfig
dataclass
¶
SynthesisConfig(strategy=MERGE, include_metadata=True, max_content_bytes=1024 * 1024, fail_on_partial=False, detect_conflicts=False, conflict_key_filter=None, fail_on_conflict=False)
Configuration for result synthesis.
Attributes:
| Name | Type | Description |
|---|---|---|
strategy |
SynthesisStrategy
|
How to combine parallel outputs. |
include_metadata |
bool
|
Whether to include synthesis metadata. |
max_content_bytes |
int
|
Maximum content size to synthesize (prevents OOM). |
fail_on_partial |
bool
|
If True, fail synthesis when some sheets failed. |
detect_conflicts |
bool
|
If True, run conflict detection before synthesis. |
conflict_key_filter |
list[str] | None
|
If provided, only check these keys for conflicts. |
fail_on_conflict |
bool
|
If True, fail synthesis when conflicts detected. |
SynthesisResult
dataclass
¶
SynthesisResult(batch_id, sheets=list(), strategy=MERGE, status='pending', created_at=utc_now(), completed_at=None, sheet_outputs=dict(), synthesized_content=None, error_message=None, metadata=dict(), conflict_detection=None)
Result of synthesizing parallel sheet outputs.
Tracks the synthesis state for a single batch of parallel sheets. This is persisted in checkpoint state for resume capability.
Attributes:
| Name | Type | Description |
|---|---|---|
batch_id |
str
|
Unique identifier for this synthesis batch. |
sheets |
list[int]
|
Sheet numbers included in this synthesis. |
strategy |
SynthesisStrategy
|
Strategy used for synthesis. |
status |
Literal['pending', 'ready', 'done', 'failed']
|
Current synthesis status. |
created_at |
datetime
|
When synthesis started. |
completed_at |
datetime | None
|
When synthesis completed (None if not complete). |
sheet_outputs |
dict[int, str]
|
Map of sheet_num -> output reference (path or content). |
synthesized_content |
str | None
|
The final synthesized result (if complete). |
error_message |
str | None
|
Error description if synthesis failed. |
metadata |
dict[str, Any]
|
Additional metadata about the synthesis. |
Attributes¶
conflict_detection
class-attribute
instance-attribute
¶
Conflict detection result if conflict detection was enabled.
Functions¶
to_dict
¶
Serialize to dictionary for persistence.
Source code in src/marianne/execution/synthesizer.py
from_dict
classmethod
¶
Deserialize from dictionary.
Source code in src/marianne/execution/synthesizer.py
ResultSynthesizer
¶
Synthesizes outputs from parallel sheet executions.
The synthesizer works with ParallelBatchResult to combine content outputs after a batch of parallel sheets completes.
Example
Attributes:
| Name | Type | Description |
|---|---|---|
config |
Synthesis configuration. |
Initialize the synthesizer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
SynthesisConfig | None
|
Synthesis configuration. Uses defaults if None. |
None
|
Source code in src/marianne/execution/synthesizer.py
Functions¶
prepare_synthesis
¶
Prepare synthesis from batch results.
Creates a SynthesisResult ready for execution. Does not actually run the synthesis - call execute_synthesis() for that.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
batch_sheets
|
list[int]
|
All sheet numbers in the batch. |
required |
completed_sheets
|
list[int]
|
Sheets that completed successfully. |
required |
failed_sheets
|
list[int]
|
Sheets that failed. |
required |
sheet_outputs
|
dict[int, str]
|
Map of sheet_num -> output content/reference. |
required |
Returns:
| Type | Description |
|---|---|
SynthesisResult
|
SynthesisResult ready for synthesis (status="ready") or |
SynthesisResult
|
indicating failure (status="failed") if validation fails. |
Source code in src/marianne/execution/synthesizer.py
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 | |
execute_synthesis
¶
Execute synthesis on prepared result.
Combines sheet outputs according to the configured strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
result
|
SynthesisResult
|
SynthesisResult with status="ready". |
required |
Returns:
| Type | Description |
|---|---|
SynthesisResult
|
Updated SynthesisResult with synthesis complete. |
Source code in src/marianne/execution/synthesizer.py
OutputConflict
dataclass
¶
Represents a conflicting key-value pair between parallel sheet outputs.
When parallel sheets produce outputs with the same key but different values, this represents a potential inconsistency that may need resolution before synthesis.
Attributes:
| Name | Type | Description |
|---|---|---|
key |
str
|
The key that has conflicting values. |
sheet_a |
int
|
First sheet number in the conflict. |
value_a |
str
|
Value from sheet A. |
sheet_b |
int
|
Second sheet number in the conflict. |
value_b |
str
|
Value from sheet B. |
severity |
str
|
Conflict severity (warning, error). |
ConflictDetectionResult
dataclass
¶
ConflictDetectionResult(sheets_analyzed=list(), conflicts=list(), keys_checked=0, checked_at=utc_now())
Result of parallel output conflict detection.
Tracks conflicts detected before synthesis merging.
Attributes:
| Name | Type | Description |
|---|---|---|
sheets_analyzed |
list[int]
|
Sheets that were analyzed for conflicts. |
conflicts |
list[OutputConflict]
|
List of detected conflicts. |
keys_checked |
int
|
Total number of unique keys checked. |
checked_at |
datetime
|
When the check was performed. |
Attributes¶
Functions¶
to_dict
¶
Convert to serializable dictionary.
Source code in src/marianne/execution/synthesizer.py
ConflictDetector
¶
Detects conflicts in parallel sheet outputs before synthesis.
Analyzes outputs from parallel sheets to identify cases where the same key has different values, which may indicate inconsistent results that need resolution before merging.
Uses KeyVariableExtractor from validation module to parse key-value pairs from sheet outputs.
Example
Initialize detector.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key_filter
|
list[str] | None
|
If provided, only check these keys for conflicts. |
None
|
strict_mode
|
bool
|
If True, all conflicts are errors. If False, warnings. |
False
|
Source code in src/marianne/execution/synthesizer.py
Functions¶
detect_conflicts
¶
Detect conflicts across parallel sheet outputs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sheet_outputs
|
dict[int, str]
|
Map of sheet_num -> output content. |
required |
Returns:
| Type | Description |
|---|---|
ConflictDetectionResult
|
ConflictDetectionResult with any conflicts found. |
Source code in src/marianne/execution/synthesizer.py
Functions¶
synthesize_batch
¶
Convenience function to synthesize a batch in one call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
batch_sheets
|
list[int]
|
All sheet numbers in the batch. |
required |
completed_sheets
|
list[int]
|
Sheets that completed successfully. |
required |
failed_sheets
|
list[int]
|
Sheets that failed. |
required |
sheet_outputs
|
dict[int, str]
|
Map of sheet_num -> output content. |
required |
config
|
SynthesisConfig | None
|
Optional synthesis configuration. |
None
|
Returns:
| Type | Description |
|---|---|
SynthesisResult
|
SynthesisResult with synthesis complete. |
Source code in src/marianne/execution/synthesizer.py
detect_parallel_conflicts
¶
Convenience function to detect conflicts in one call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sheet_outputs
|
dict[int, str]
|
Map of sheet_num -> output content. |
required |
key_filter
|
list[str] | None
|
If provided, only check these keys. |
None
|
strict_mode
|
bool
|
If True, all conflicts are errors. |
False
|
Returns:
| Type | Description |
|---|---|
ConflictDetectionResult
|
ConflictDetectionResult with any conflicts found. |