synthesizer

Result Synthesizer for parallel sheet execution (v18 evolution).

This module implements the "gather" phase of the fan-out/gather pattern for parallel execution. When sheets run in parallel, their content outputs must be synthesized into a unified result for downstream consumers.

Key features:

- Content reference extraction from parallel sheet outputs
- Synthesis strategies (merge, summarize, pass_through)
- Persistent synthesis state for checkpointing
- Integration with ParallelBatchResult

The ResultSynthesizer works with the existing ParallelExecutor to combine outputs after batch completion.

Classes

SynthesisStrategy

Bases: str, Enum

Strategy for synthesizing parallel sheet outputs.

Attributes:

Name Type Description
MERGE

Combine all outputs into a single result (default).

SUMMARIZE

Create a summary of all outputs.

PASS_THROUGH

No synthesis, keep individual outputs separate.
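
Because SynthesisStrategy derives from both str and Enum, its members compare equal to plain strings and can be rebuilt from their stored values. The sketch below uses a stand-in class named Strategy (hypothetical). The value "merge" appears in from_dict; "summarize" and "pass_through" are assumed from the feature list and may not match the real module.

```python
from enum import Enum


class Strategy(str, Enum):
    """Stand-in for SynthesisStrategy; only "merge" is confirmed by the source."""

    MERGE = "merge"
    SUMMARIZE = "summarize"        # assumed value
    PASS_THROUGH = "pass_through"  # assumed value


# Lookup by value, the same way from_dict rebuilds the strategy from a string.
restored = Strategy("merge")

# The str mixin makes members compare equal to their raw string values.
is_plain_string_equal = restored == "merge"

# .value yields the plain string that a serializer would store.
stored_value = Strategy.PASS_THROUGH.value
```

An unknown string raises ValueError on lookup, so a checkpoint written with a strategy name the enum does not define would fail to deserialize.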

SynthesisConfig dataclass

SynthesisConfig(strategy=MERGE, include_metadata=True, max_content_bytes=1024 * 1024, fail_on_partial=False, detect_conflicts=False, conflict_key_filter=None, fail_on_conflict=False)

Configuration for result synthesis.

Attributes:

Name Type Description
strategy SynthesisStrategy

How to combine parallel outputs.

include_metadata bool

Whether to include synthesis metadata.

max_content_bytes int

Maximum content size to synthesize (prevents OOM).

fail_on_partial bool

If True, fail synthesis when some sheets failed.

detect_conflicts bool

If True, run conflict detection before synthesis.

conflict_key_filter list[str] | None

If provided, only check these keys for conflicts.

fail_on_conflict bool

If True, fail synthesis when conflicts detected.
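
As an illustration of how these options combine, the following sketch mirrors the constructor signature with a stand-in dataclass named ConfigSketch (hypothetical, and using a plain string for the strategy). It is not the real SynthesisConfig, only a self-contained way to show the defaults and a stricter setup.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConfigSketch:
    """Stand-in mirroring the SynthesisConfig signature shown above."""

    strategy: str = "merge"
    include_metadata: bool = True
    max_content_bytes: int = 1024 * 1024  # 1 MiB
    fail_on_partial: bool = False
    detect_conflicts: bool = False
    conflict_key_filter: Optional[list[str]] = None
    fail_on_conflict: bool = False


# A stricter configuration: reject partial batches and conflicting keys.
strict = ConfigSketch(
    fail_on_partial=True,
    detect_conflicts=True,
    conflict_key_filter=["STATUS", "VERSION"],
    fail_on_conflict=True,
)

default_limit = ConfigSketch().max_content_bytes
```

In prepare_synthesis, the fail_on_conflict check sits inside the detect_conflicts branch, so setting fail_on_conflict alone has no effect.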

SynthesisResult dataclass

SynthesisResult(batch_id, sheets=list(), strategy=MERGE, status='pending', created_at=utc_now(), completed_at=None, sheet_outputs=dict(), synthesized_content=None, error_message=None, metadata=dict(), conflict_detection=None)

Result of synthesizing parallel sheet outputs.

Tracks the synthesis state for a single batch of parallel sheets. This is persisted in checkpoint state for resume capability.

Attributes:

Name Type Description
batch_id str

Unique identifier for this synthesis batch.

sheets list[int]

Sheet numbers included in this synthesis.

strategy SynthesisStrategy

Strategy used for synthesis.

status Literal['pending', 'ready', 'done', 'failed']

Current synthesis status.

created_at datetime

When synthesis started.

completed_at datetime | None

When synthesis completed (None if not complete).

sheet_outputs dict[int, str]

Map of sheet_num -> output reference (path or content).

synthesized_content str | None

The final synthesized result (if complete).

error_message str | None

Error description if synthesis failed.

metadata dict[str, Any]

Additional metadata about the synthesis.

Attributes
conflict_detection class-attribute instance-attribute
conflict_detection = None

Conflict detection result if conflict detection was enabled.

is_complete property
is_complete

True if synthesis has completed (success or failure).

is_success property
is_success

True if synthesis completed successfully.

Functions
to_dict
to_dict()

Serialize to dictionary for persistence.

Source code in src/marianne/execution/synthesizer.py
def to_dict(self) -> SynthesisResultDict:
    """Serialize to dictionary for persistence."""
    result = SynthesisResultDict(
        batch_id=self.batch_id,
        sheets=self.sheets,
        strategy=self.strategy.value,
        status=self.status,
        created_at=self.created_at.isoformat() if self.created_at else None,
        completed_at=self.completed_at.isoformat() if self.completed_at else None,
        sheet_outputs=self.sheet_outputs,
        synthesized_content=self.synthesized_content,
        error_message=self.error_message,
        metadata=self.metadata,
        conflict_detection=self.conflict_detection,
    )
    return result
from_dict classmethod
from_dict(data)

Deserialize from dictionary.

Source code in src/marianne/execution/synthesizer.py
@classmethod
def from_dict(cls, data: SynthesisResultDict) -> "SynthesisResult":
    """Deserialize from dictionary."""
    created_at = data.get("created_at")
    completed_at = data.get("completed_at")

    return cls(
        batch_id=data["batch_id"],
        sheets=data.get("sheets", []),
        strategy=SynthesisStrategy(data.get("strategy", "merge")),
        status=data.get("status", "pending"),
        created_at=datetime.fromisoformat(created_at) if created_at else utc_now(),
        completed_at=datetime.fromisoformat(completed_at) if completed_at else None,
        sheet_outputs=data.get("sheet_outputs", {}),
        synthesized_content=data.get("synthesized_content"),
        error_message=data.get("error_message"),
        metadata=data.get("metadata", {}),
        conflict_detection=data.get("conflict_detection"),
    )
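
The timestamp handling in to_dict and from_dict can be sketched with the standard library alone. utc_now() is not shown in the source, so a timezone-aware UTC datetime is assumed. The JSON step at the end is also an assumption about how a checkpoint might be stored, included because it changes the integer keys of sheet_outputs.

```python
import json
from datetime import datetime, timezone

# Assumed stand-in for utc_now(): a timezone-aware UTC timestamp.
created_at = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)

# to_dict stores ISO 8601 strings; completed_at stays None until synthesis ends.
payload = {
    "created_at": created_at.isoformat(),
    "completed_at": None,
}

# from_dict parses the strings back and keeps None for a missing completion time.
raw_completed = payload["completed_at"]
restored_created = datetime.fromisoformat(payload["created_at"])
restored_completed = datetime.fromisoformat(raw_completed) if raw_completed else None

# If the dictionary passes through JSON, integer sheet numbers become strings.
sheet_outputs = {1: "out-1", 2: "out-2"}
json_keys = list(json.loads(json.dumps(sheet_outputs)).keys())
```

from_dict as shown does not convert keys back to int, so a caller that persists through JSON may need to do that conversion itself.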

ResultSynthesizer

ResultSynthesizer(config=None)

Synthesizes outputs from parallel sheet executions.

The synthesizer works with ParallelBatchResult to combine content outputs after a batch of parallel sheets completes.

Example
synthesizer = ResultSynthesizer(config)
result = synthesizer.prepare_synthesis(batch_sheets, completed_sheets, failed_sheets, sheet_outputs)
if result.status == "ready":
    result = synthesizer.execute_synthesis(result)

Attributes:

Name Type Description
config

Synthesis configuration.

Initialize the synthesizer.

Parameters:

Name Type Description Default
config SynthesisConfig | None

Synthesis configuration. Uses defaults if None.

None
Source code in src/marianne/execution/synthesizer.py
def __init__(self, config: SynthesisConfig | None = None):
    """Initialize the synthesizer.

    Args:
        config: Synthesis configuration. Uses defaults if None.
    """
    self.config = config or SynthesisConfig()
    self._logger = _logger
Functions
prepare_synthesis
prepare_synthesis(batch_sheets, completed_sheets, failed_sheets, sheet_outputs)

Prepare synthesis from batch results.

Creates a SynthesisResult ready for execution. Does not actually run the synthesis - call execute_synthesis() for that.

Parameters:

Name Type Description Default
batch_sheets list[int]

All sheet numbers in the batch.

required
completed_sheets list[int]

Sheets that completed successfully.

required
failed_sheets list[int]

Sheets that failed.

required
sheet_outputs dict[int, str]

Map of sheet_num -> output content/reference.

required

Returns:

Type Description
SynthesisResult

SynthesisResult ready for synthesis (status="ready"), or indicating failure (status="failed") if validation fails.

Source code in src/marianne/execution/synthesizer.py
def prepare_synthesis(
    self,
    batch_sheets: list[int],
    completed_sheets: list[int],
    failed_sheets: list[int],
    sheet_outputs: dict[int, str],
) -> SynthesisResult:
    """Prepare synthesis from batch results.

    Creates a SynthesisResult ready for execution. Does not actually
    run the synthesis - call execute_synthesis() for that.

    Args:
        batch_sheets: All sheet numbers in the batch.
        completed_sheets: Sheets that completed successfully.
        failed_sheets: Sheets that failed.
        sheet_outputs: Map of sheet_num -> output content/reference.

    Returns:
        SynthesisResult ready for synthesis (status="ready") or
        indicating failure (status="failed") if validation fails.
    """
    import uuid

    batch_id = f"batch_{uuid.uuid4().hex[:8]}"

    result = SynthesisResult(
        batch_id=batch_id,
        sheets=batch_sheets,
        strategy=self.config.strategy,
    )

    # Record outputs for completed sheets
    for sheet_num in completed_sheets:
        if sheet_num in sheet_outputs:
            result.sheet_outputs[sheet_num] = sheet_outputs[sheet_num]

    # Check if we can proceed with synthesis
    if failed_sheets and self.config.fail_on_partial:
        result.status = "failed"
        result.error_message = f"Synthesis requires all sheets: {len(failed_sheets)} failed"
        self._logger.warning(
            "synthesizer.partial_failure",
            batch_id=batch_id,
            failed_sheets=failed_sheets,
        )
        return result

    if not result.sheet_outputs:
        result.status = "failed"
        result.error_message = "No sheet outputs to synthesize"
        self._logger.warning(
            "synthesizer.no_outputs",
            batch_id=batch_id,
        )
        return result

    # Run conflict detection if enabled
    if self.config.detect_conflicts and len(result.sheet_outputs) >= 2:
        conflict_result = self._detect_conflicts(result.sheet_outputs)
        result.conflict_detection = conflict_result.to_dict()

        if conflict_result.has_conflicts:
            self._logger.warning(
                "synthesizer.conflicts_detected",
                batch_id=batch_id,
                conflict_count=len(conflict_result.conflicts),
                error_count=conflict_result.error_count,
            )

            if self.config.fail_on_conflict:
                result.status = "failed"
                result.error_message = (
                    f"Conflict detection found {len(conflict_result.conflicts)} "
                    f"conflicts ({conflict_result.error_count} errors)"
                )
                return result

    result.status = "ready"
    result.metadata = {
        "batch_size": len(batch_sheets),
        "completed_count": len(completed_sheets),
        "failed_count": len(failed_sheets),
        "outputs_captured": len(result.sheet_outputs),
        "conflict_detection_enabled": self.config.detect_conflicts,
    }

    self._logger.info(
        "synthesizer.prepared",
        batch_id=batch_id,
        sheets=batch_sheets,
        outputs_captured=len(result.sheet_outputs),
    )

    return result
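
The order of the validation checks above determines which error a caller sees. The sketch below restates that order in a standalone function named prepare_status (hypothetical). It omits conflict detection and logging, and returns a plain tuple instead of a SynthesisResult.

```python
from typing import Optional


def prepare_status(
    completed_sheets: list[int],
    failed_sheets: list[int],
    sheet_outputs: dict[int, str],
    fail_on_partial: bool = False,
) -> tuple[str, Optional[str]]:
    """Return (status, error_message) following the checks in prepare_synthesis."""
    # Only outputs of successfully completed sheets are recorded.
    captured = {n: sheet_outputs[n] for n in completed_sheets if n in sheet_outputs}

    # Partial failure is checked first, then the empty-output case.
    if failed_sheets and fail_on_partial:
        return "failed", f"Synthesis requires all sheets: {len(failed_sheets)} failed"
    if not captured:
        return "failed", "No sheet outputs to synthesize"
    return "ready", None


outputs = {1: "alpha", 2: "beta", 3: "partial output from a failed sheet"}

# Sheet 3 failed: tolerated by default, rejected when fail_on_partial is set.
lenient = prepare_status([1, 2], [3], outputs)
strict = prepare_status([1, 2], [3], outputs, fail_on_partial=True)

# No completed sheets means nothing is captured, even though outputs exist.
empty = prepare_status([], [1, 2, 3], outputs)
```

Output from a failed sheet is never captured, even if it is present in sheet_outputs, because only completed_sheets are iterated.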
execute_synthesis
execute_synthesis(result)

Execute synthesis on prepared result.

Combines sheet outputs according to the configured strategy.

Parameters:

Name Type Description Default
result SynthesisResult

SynthesisResult with status="ready".

required

Returns:

Type Description
SynthesisResult

Updated SynthesisResult: status="done" with synthesized_content set on success, or status="failed" with error_message set otherwise.

Source code in src/marianne/execution/synthesizer.py
def execute_synthesis(self, result: SynthesisResult) -> SynthesisResult:
    """Execute synthesis on prepared result.

    Combines sheet outputs according to the configured strategy.

    Args:
        result: SynthesisResult with status="ready".

    Returns:
        Updated SynthesisResult with synthesis complete.
    """
    if result.status != "ready":
        msg = (
            f"execute_synthesis() called with status='{result.status}' "
            f"(expected 'ready') for batch '{result.batch_id}'"
        )
        self._logger.error(
            "synthesizer.invalid_state",
            batch_id=result.batch_id,
            status=result.status,
        )
        raise ValueError(msg)

    try:
        if result.strategy == SynthesisStrategy.MERGE:
            synthesized = self._merge_outputs(result.sheet_outputs)
        elif result.strategy == SynthesisStrategy.SUMMARIZE:
            synthesized = self._summarize_outputs(result.sheet_outputs)
        else:  # PASS_THROUGH
            synthesized = self._pass_through_outputs(result.sheet_outputs)

        # Check size limit (measured in UTF-8 bytes, encoded once)
        content_bytes = len(synthesized.encode("utf-8"))
        if content_bytes > self.config.max_content_bytes:
            result.status = "failed"
            result.error_message = (
                f"Synthesized content exceeds limit: "
                f"{content_bytes} > {self.config.max_content_bytes}"
            )
            self._logger.warning(
                "synthesizer.size_exceeded",
                batch_id=result.batch_id,
                size=content_bytes,
                limit=self.config.max_content_bytes,
            )
            return result

        result.synthesized_content = synthesized
        result.status = "done"
        result.completed_at = utc_now()

        self._logger.info(
            "synthesizer.complete",
            batch_id=result.batch_id,
            strategy=result.strategy.value,
            content_size=len(synthesized),
        )

    except Exception as e:
        result.status = "failed"
        result.error_message = f"Synthesis failed: {e}"
        self._logger.error(
            "synthesizer.error",
            batch_id=result.batch_id,
            error=str(e),
        )

    return result
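
The size limit is compared against the UTF-8 encoded length, not the character count, and the two differ for non-ASCII text. A small illustration, using a made-up string and an artificially low limit:

```python
# Escapes are used so the exact code points are unambiguous:
# U+00EF (2 bytes), U+00E9 (2 bytes), U+2615 (3 bytes in UTF-8).
text = "na\u00efve caf\u00e9 \u2615"

char_count = len(text)                  # counts code points
byte_count = len(text.encode("utf-8"))  # counts encoded bytes

max_content_bytes = 12  # artificially small limit for the example

passes_if_counted_in_chars = char_count <= max_content_bytes
exceeds_limit = byte_count > max_content_bytes
```

Here the text has 12 characters but 16 bytes, so it would be rejected by the byte-based check. Note that the "synthesizer.complete" log entry reports content_size as len(synthesized), which is a character count, so it can be smaller than the value used for the limit check.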

OutputConflict dataclass

OutputConflict(key, sheet_a, value_a, sheet_b, value_b, severity='warning')

Represents a conflicting key-value pair between parallel sheet outputs.

When parallel sheets produce outputs with the same key but different values, this represents a potential inconsistency that may need resolution before synthesis.

Attributes:

Name Type Description
key str

The key that has conflicting values.

sheet_a int

First sheet number in the conflict.

value_a str

Value from sheet A.

sheet_b int

Second sheet number in the conflict.

value_b str

Value from sheet B.

severity str

Conflict severity (warning, error).

Functions
format_message
format_message()

Format as human-readable message.

Source code in src/marianne/execution/synthesizer.py
def format_message(self) -> str:
    """Format as human-readable message."""
    return (
        f"Conflict on '{self.key}': "
        f"sheet {self.sheet_a}='{self.value_a}' vs "
        f"sheet {self.sheet_b}='{self.value_b}'"
    )
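
To show what the formatted message looks like, here is a stand-in dataclass named ConflictSketch (hypothetical) with the same fields and the same format_message body as shown above.

```python
from dataclasses import dataclass


@dataclass
class ConflictSketch:
    """Stand-in with the same fields as OutputConflict."""

    key: str
    sheet_a: int
    value_a: str
    sheet_b: int
    value_b: str
    severity: str = "warning"

    def format_message(self) -> str:
        """Format as human-readable message (same body as the source)."""
        return (
            f"Conflict on '{self.key}': "
            f"sheet {self.sheet_a}='{self.value_a}' vs "
            f"sheet {self.sheet_b}='{self.value_b}'"
        )


conflict = ConflictSketch("STATUS", 1, "complete", 2, "failed", severity="error")
message = conflict.format_message()
# message == "Conflict on 'STATUS': sheet 1='complete' vs sheet 2='failed'"
```

The severity is not part of the message, so callers that need it should read the severity attribute separately.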

ConflictDetectionResult dataclass

ConflictDetectionResult(sheets_analyzed=list(), conflicts=list(), keys_checked=0, checked_at=utc_now())

Result of parallel output conflict detection.

Tracks conflicts detected before synthesis merging.

Attributes:

Name Type Description
sheets_analyzed list[int]

Sheets that were analyzed for conflicts.

conflicts list[OutputConflict]

List of detected conflicts.

keys_checked int

Total number of unique keys checked.

checked_at datetime

When the check was performed.

Attributes
has_conflicts property
has_conflicts

True if any conflicts were detected.

error_count property
error_count

Count of error-severity conflicts.

warning_count property
warning_count

Count of warning-severity conflicts.

Functions
to_dict
to_dict()

Convert to serializable dictionary.

Source code in src/marianne/execution/synthesizer.py
def to_dict(self) -> dict[str, Any]:
    """Convert to serializable dictionary."""
    return {
        "sheets_analyzed": self.sheets_analyzed,
        "conflicts": [
            {
                "key": c.key,
                "sheet_a": c.sheet_a,
                "value_a": c.value_a,
                "sheet_b": c.sheet_b,
                "value_b": c.value_b,
                "severity": c.severity,
            }
            for c in self.conflicts
        ],
        "keys_checked": self.keys_checked,
        "checked_at": self.checked_at.isoformat(),
        "has_conflicts": self.has_conflicts,
        "error_count": self.error_count,
        "warning_count": self.warning_count,
    }

ConflictDetector

ConflictDetector(key_filter=None, strict_mode=False)

Detects conflicts in parallel sheet outputs before synthesis.

Analyzes outputs from parallel sheets to identify cases where the same key has different values, which may indicate inconsistent results that need resolution before merging.

Uses KeyVariableExtractor from validation module to parse key-value pairs from sheet outputs.

Example
detector = ConflictDetector()
result = detector.detect_conflicts({
    1: "STATUS: complete\nVERSION: 1.0",
    2: "STATUS: failed\nVERSION: 1.0",  # STATUS conflicts!
})
if result.has_conflicts:
    for conflict in result.conflicts:
        print(conflict.format_message())

Initialize detector.

Parameters:

Name Type Description Default
key_filter list[str] | None

If provided, only check these keys for conflicts.

None
strict_mode bool

If True, all conflicts are reported with error severity; if False, with warning severity.

False
Source code in src/marianne/execution/synthesizer.py
def __init__(
    self,
    key_filter: list[str] | None = None,
    strict_mode: bool = False,
) -> None:
    """Initialize detector.

    Args:
        key_filter: If provided, only check these keys for conflicts.
        strict_mode: If True, all conflicts are errors. If False, warnings.
    """
    # Import here to avoid circular import
    from marianne.execution.validation import KeyVariableExtractor

    self.extractor = KeyVariableExtractor(key_filter=key_filter)
    self.strict_mode = strict_mode
    self._logger = _logger
Functions
detect_conflicts
detect_conflicts(sheet_outputs)

Detect conflicts across parallel sheet outputs.

Parameters:

Name Type Description Default
sheet_outputs dict[int, str]

Map of sheet_num -> output content.

required

Returns:

Type Description
ConflictDetectionResult

ConflictDetectionResult with any conflicts found.

Source code in src/marianne/execution/synthesizer.py
def detect_conflicts(
    self,
    sheet_outputs: dict[int, str],
) -> ConflictDetectionResult:
    """Detect conflicts across parallel sheet outputs.

    Args:
        sheet_outputs: Map of sheet_num -> output content.

    Returns:
        ConflictDetectionResult with any conflicts found.
    """
    result = ConflictDetectionResult(
        sheets_analyzed=sorted(sheet_outputs.keys()),
    )

    if len(sheet_outputs) < 2:
        # Need at least 2 sheets to have conflicts
        return result

    # Extract key-value pairs from each sheet
    sheet_variables: dict[int, dict[str, str]] = {}
    for sheet_num, content in sheet_outputs.items():
        variables = self.extractor.extract(content)
        sheet_variables[sheet_num] = {v.key: v.value for v in variables}

    # Count total unique keys across all sheets
    all_keys: set[str] = set()
    for vars_dict in sheet_variables.values():
        all_keys.update(vars_dict.keys())
    result.keys_checked = len(all_keys)

    # Build canonical reference from first sheet, compare all others against it.
    # This is O(n) instead of O(n²) pairwise comparison — for 50 sheets,
    # 49 comparisons vs 1225.
    sheets_sorted = sorted(sheet_outputs.keys())
    if len(sheets_sorted) >= 2:
        reference_sheet = sheets_sorted[0]
        reference_vars = sheet_variables.get(reference_sheet, {})
        for sheet_b in sheets_sorted[1:]:
            self._compare_sheets(
                reference_sheet, reference_vars,
                sheet_b, sheet_variables.get(sheet_b, {}),
                result,
            )

    if result.has_conflicts:
        self._logger.warning(
            "conflict_detector.conflicts_found",
            sheets=result.sheets_analyzed,
            conflict_count=len(result.conflicts),
            error_count=result.error_count,
            warning_count=result.warning_count,
        )
    else:
        self._logger.debug(
            "conflict_detector.no_conflicts",
            sheets=result.sheets_analyzed,
            keys_checked=result.keys_checked,
        )

    return result
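
The reference-sheet comparison can be sketched without the library. Both helpers below are hypothetical: parse_pairs is a simplified stand-in for KeyVariableExtractor that only handles "KEY: value" lines, and the comparison is assumed to look at keys shared with the reference sheet, since _compare_sheets is not shown in the source.

```python
def parse_pairs(content: str) -> dict[str, str]:
    """Parse "KEY: value" lines; a simplified stand-in for KeyVariableExtractor."""
    pairs: dict[str, str] = {}
    for line in content.splitlines():
        key, sep, value = line.partition(":")
        if sep and key.strip():
            pairs[key.strip()] = value.strip()
    return pairs


def conflicts_against_reference(
    sheet_outputs: dict[int, str],
) -> list[tuple[str, int, str, int, str]]:
    """Compare every sheet with the lowest-numbered one, on shared keys only."""
    sheets = sorted(sheet_outputs)
    if len(sheets) < 2:
        return []
    variables = {n: parse_pairs(sheet_outputs[n]) for n in sheets}
    reference = sheets[0]
    found = []
    for other in sheets[1:]:
        shared = variables[reference].keys() & variables[other].keys()
        for key in sorted(shared):
            a, b = variables[reference][key], variables[other][key]
            if a != b:
                found.append((key, reference, a, other, b))
    return found


found = conflicts_against_reference({
    1: "STATUS: complete\nVERSION: 1.0",
    2: "STATUS: failed\nVERSION: 1.0",
    3: "STATUS: complete\nVERSION: 2.0",
})
```

With n sheets this performs n - 1 sheet comparisons instead of n(n - 1)/2, which is where the 49 versus 1225 figure for 50 sheets comes from. The trade-off in this sketch is that sheets 2 and 3 are never compared with each other directly, and a key missing from the reference sheet is not checked at all.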

Functions

synthesize_batch

synthesize_batch(batch_sheets, completed_sheets, failed_sheets, sheet_outputs, config=None)

Convenience function to synthesize a batch in one call.

Parameters:

Name Type Description Default
batch_sheets list[int]

All sheet numbers in the batch.

required
completed_sheets list[int]

Sheets that completed successfully.

required
failed_sheets list[int]

Sheets that failed.

required
sheet_outputs dict[int, str]

Map of sheet_num -> output content.

required
config SynthesisConfig | None

Optional synthesis configuration.

None

Returns:

Type Description
SynthesisResult

SynthesisResult after synthesis: status="done" on success, or status="failed" if preparation or execution failed.

Source code in src/marianne/execution/synthesizer.py
def synthesize_batch(
    batch_sheets: list[int],
    completed_sheets: list[int],
    failed_sheets: list[int],
    sheet_outputs: dict[int, str],
    config: SynthesisConfig | None = None,
) -> SynthesisResult:
    """Convenience function to synthesize a batch in one call.

    Args:
        batch_sheets: All sheet numbers in the batch.
        completed_sheets: Sheets that completed successfully.
        failed_sheets: Sheets that failed.
        sheet_outputs: Map of sheet_num -> output content.
        config: Optional synthesis configuration.

    Returns:
        SynthesisResult with synthesis complete.
    """
    synthesizer = ResultSynthesizer(config)
    result = synthesizer.prepare_synthesis(
        batch_sheets, completed_sheets, failed_sheets, sheet_outputs
    )

    if result.status == "ready":
        result = synthesizer.execute_synthesis(result)

    return result

detect_parallel_conflicts

detect_parallel_conflicts(sheet_outputs, key_filter=None, strict_mode=False)

Convenience function to detect conflicts in one call.

Parameters:

Name Type Description Default
sheet_outputs dict[int, str]

Map of sheet_num -> output content.

required
key_filter list[str] | None

If provided, only check these keys.

None
strict_mode bool

If True, all conflicts are errors.

False

Returns:

Type Description
ConflictDetectionResult

ConflictDetectionResult with any conflicts found.

Source code in src/marianne/execution/synthesizer.py
def detect_parallel_conflicts(
    sheet_outputs: dict[int, str],
    key_filter: list[str] | None = None,
    strict_mode: bool = False,
) -> ConflictDetectionResult:
    """Convenience function to detect conflicts in one call.

    Args:
        sheet_outputs: Map of sheet_num -> output content.
        key_filter: If provided, only check these keys.
        strict_mode: If True, all conflicts are errors.

    Returns:
        ConflictDetectionResult with any conflicts found.
    """
    detector = ConflictDetector(key_filter=key_filter, strict_mode=strict_mode)
    return detector.detect_conflicts(sheet_outputs)