reconciliation

Config reconciliation on reload.

When a job config is reloaded (automatically or explicitly), derived state in the checkpoint may be stale. This module provides a declarative mapping from config sections to the checkpoint fields derived from them, and a reconcile function that resets those fields to their defaults when their source config section has changed.
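As a rough illustration of what a declarative section-to-field mapping looks like, here is a minimal sketch over plain dictionaries. The section names (`retry`, `backend`, `workspace`) and checkpoint field names are hypothetical; the actual contents of CONFIG_STATE_MAPPING are not shown on this page.

```python
# Hypothetical mapping: config section -> checkpoint fields derived from it.
# These names are illustrative only, not the real CONFIG_STATE_MAPPING.
EXAMPLE_MAPPING: dict[str, list[str]] = {
    "retry": ["retry_counts"],
    "backend": ["backend_session_id", "rate_limit_waits"],
}


def stale_fields(changed_sections: list[str]) -> dict[str, list[str]]:
    """Return the checkpoint fields that depend on each changed section."""
    result: dict[str, list[str]] = {}
    for section in changed_sections:
        fields = EXAMPLE_MAPPING.get(section, [])
        if fields:  # sections with no derived state are skipped
            result[section] = list(fields)
    return result


print(stale_fields(["retry", "workspace"]))
# {'retry': ['retry_counts']}
```

Keeping the mapping as data rather than as branching logic means that supporting a new section is a one-line change.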

The structural test in test_reconciliation.py ensures every new config section gets a mapping entry -- preventing future staleness bugs.
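The test in test_reconciliation.py is not reproduced here, so the following is only a sketch of how such a structural check could work: enumerate the config's fields and require each one to be either mapped or listed as metadata. `ExampleConfig`, `EXAMPLE_MAPPING`, `EXAMPLE_METADATA_FIELDS`, and `unmapped_sections` are all hypothetical names.

```python
from dataclasses import dataclass, fields


@dataclass
class ExampleConfig:
    """Hypothetical stand-in for the real job config model."""

    name: str = ""
    retry: int = 3
    backend: str = "local"


EXAMPLE_METADATA_FIELDS = {"name"}  # fields exempt from reconciliation
EXAMPLE_MAPPING = {"retry": ["retry_counts"], "backend": []}


def unmapped_sections(config_cls, mapping, metadata) -> list[str]:
    """List config fields that are neither mapped nor marked as metadata."""
    names = {f.name for f in fields(config_cls)}
    return sorted(names - set(mapping) - set(metadata))


def test_every_section_is_mapped() -> None:
    # Fails as soon as someone adds a config field without a mapping entry.
    assert unmapped_sections(
        ExampleConfig, EXAMPLE_MAPPING, EXAMPLE_METADATA_FIELDS
    ) == []
```

An empty list in the mapping is a deliberate statement that a section has no derived checkpoint state, which is different from forgetting the section entirely.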

Classes

ReconciliationReport dataclass

ReconciliationReport(sections_changed=list(), sections_removed=list(), fields_reset=dict(), config_diff=dict())

Report of what was reconciled during config reload.

Functions
summary
summary()

Human-readable summary of changes.

Source code in src/marianne/execution/reconciliation.py
def summary(self) -> str:
    """Human-readable summary of changes."""
    if not self.has_changes:
        return "No config changes detected"
    parts: list[str] = []
    if self.sections_changed:
        parts.append(
            f"{len(self.sections_changed)} section(s) changed: "
            f"{', '.join(sorted(self.sections_changed))}"
        )
    if self.sections_removed:
        parts.append(
            f"{len(self.sections_removed)} section(s) removed: "
            f"{', '.join(sorted(self.sections_removed))}"
        )
    reset_count = sum(len(v) for v in self.fields_reset.values())
    if reset_count:
        parts.append(f"{reset_count} checkpoint field(s) reset")
    return "; ".join(parts)
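To show the strings that summary() produces, the sketch below wraps the same logic in a stand-in dataclass. `ExampleReport` is hypothetical, and its `has_changes` property is an assumed definition, since the real property is not shown on this page. The section and field names are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ExampleReport:
    """Stand-in for ReconciliationReport (has_changes is an assumption)."""

    sections_changed: list[str] = field(default_factory=list)
    sections_removed: list[str] = field(default_factory=list)
    fields_reset: dict[str, list[str]] = field(default_factory=dict)
    config_diff: dict[str, tuple[Any, Any]] = field(default_factory=dict)

    @property
    def has_changes(self) -> bool:
        # Assumption: any changed or removed section counts as a change.
        return bool(self.sections_changed or self.sections_removed)

    def summary(self) -> str:
        # Same logic as the summary() source shown above.
        if not self.has_changes:
            return "No config changes detected"
        parts: list[str] = []
        if self.sections_changed:
            parts.append(
                f"{len(self.sections_changed)} section(s) changed: "
                f"{', '.join(sorted(self.sections_changed))}"
            )
        if self.sections_removed:
            parts.append(
                f"{len(self.sections_removed)} section(s) removed: "
                f"{', '.join(sorted(self.sections_removed))}"
            )
        reset_count = sum(len(v) for v in self.fields_reset.values())
        if reset_count:
            parts.append(f"{reset_count} checkpoint field(s) reset")
        return "; ".join(parts)


report = ExampleReport(
    sections_changed=["retry", "backend"],
    fields_reset={"retry": ["retry_counts"]},
)
print(report.summary())
# 2 section(s) changed: backend, retry; 1 checkpoint field(s) reset
```

Section names are sorted before joining, so the summary text is stable regardless of the order in which sections were recorded.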

Functions

reconcile_config

reconcile_config(state, new_config)

Reconcile checkpoint state after config reload.

Compares the old config snapshot in state with the new config, identifies changed sections, and resets stale checkpoint fields according to CONFIG_STATE_MAPPING.

Parameters:

Name        Type             Description                                   Default
state       CheckpointState  Current checkpoint state (mutated in place).  required
new_config  JobConfig        The newly loaded config.                      required

Returns:

Type                  Description
ReconciliationReport  ReconciliationReport describing what changed and what was reset.
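The comparison step can be sketched in isolation over plain dictionaries. This follows the classification used by reconcile_config: a key whose value differs is "removed" if it is absent from the new snapshot and "changed" otherwise, so a newly added section is reported as changed. `diff_sections` and `EXAMPLE_METADATA_FIELDS` are hypothetical names, and the sketch sorts the keys for deterministic output, which is a choice made here for illustration.

```python
from typing import Any

EXAMPLE_METADATA_FIELDS = {"name"}  # hypothetical: keys ignored by the diff


def diff_sections(
    old: dict[str, Any], new: dict[str, Any]
) -> tuple[list[str], list[str]]:
    """Return (changed, removed) top-level keys between two snapshots."""
    changed: list[str] = []
    removed: list[str] = []
    for key in sorted(set(old) | set(new)):
        if key in EXAMPLE_METADATA_FIELDS:
            continue  # metadata never triggers a reset
        if old.get(key) != new.get(key):
            if key not in new:
                removed.append(key)
            else:
                changed.append(key)  # includes newly added sections
    return changed, removed


old = {"name": "job-a", "retry": {"max": 3}, "backend": {"type": "local"}}
new = {"name": "job-b", "retry": {"max": 5}, "notifications": {"email": True}}
print(diff_sections(old, new))
# (['notifications', 'retry'], ['backend'])
```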

Source code in src/marianne/execution/reconciliation.py
def reconcile_config(
    state: CheckpointState,
    new_config: JobConfig,
) -> ReconciliationReport:
    """Reconcile checkpoint state after config reload.

    Compares the old config snapshot in state with the new config,
    identifies changed sections, and resets stale checkpoint fields
    according to CONFIG_STATE_MAPPING.

    Args:
        state: Current checkpoint state (mutated in place).
        new_config: The newly loaded config.

    Returns:
        ReconciliationReport describing what changed and what was reset.
    """
    report = ReconciliationReport()

    old_snapshot = state.config_snapshot or {}
    new_snapshot = new_config.model_dump(mode="json")

    # Find changed and removed sections
    all_keys = set(old_snapshot) | set(new_snapshot)
    for key in all_keys:
        if key in METADATA_FIELDS:
            continue
        old_val = old_snapshot.get(key)
        new_val = new_snapshot.get(key)
        if old_val != new_val:
            if key not in new_snapshot:
                report.sections_removed.append(key)
            else:
                report.sections_changed.append(key)
            report.config_diff[key] = (old_val, new_val)

    # Build a fresh instance to read Pydantic defaults reliably
    # (avoids poking at FieldInfo.default_factory internals).
    _defaults = CheckpointState(
        job_id="", job_name="", total_sheets=1,
    )

    # Reset checkpoint fields for changed/removed sections
    for section in report.sections_changed + report.sections_removed:
        fields_to_reset = CONFIG_STATE_MAPPING.get(section, [])
        if not fields_to_reset:
            continue

        reset_fields: list[str] = []
        for field_name in fields_to_reset:
            if field_name in CheckpointState.model_fields:
                default = getattr(_defaults, field_name)
                current = getattr(state, field_name, None)
                if current != default:
                    setattr(state, field_name, default)
                    reset_fields.append(field_name)

        if reset_fields:
            report.fields_reset[section] = reset_fields
            _logger.info(
                "reconciliation.fields_reset",
                section=section,
                fields=reset_fields,
            )

    return report
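The reset step reads defaults from a freshly constructed instance instead of inspecting field metadata. The same pattern can be sketched with a standard-library dataclass in place of the Pydantic model. `ExampleState` and `reset_fields` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass, field, fields


@dataclass
class ExampleState:
    """Hypothetical stand-in for CheckpointState."""

    job_id: str
    retry_counts: dict[str, int] = field(default_factory=dict)
    completed: list[str] = field(default_factory=list)


def reset_fields(state: ExampleState, names: list[str]) -> list[str]:
    """Reset the named fields to their defaults; return those that changed."""
    # A fresh instance runs every default factory, so each default is a
    # new object that is not shared with any other live state.
    defaults = ExampleState(job_id="")
    known = {f.name for f in fields(ExampleState)}
    reset: list[str] = []
    for name in names:
        if name not in known:
            continue  # unknown names are ignored rather than raising
        default = getattr(defaults, name)
        if getattr(state, name) != default:
            setattr(state, name, default)
            reset.append(name)
    return reset


state = ExampleState(job_id="j1", retry_counts={"sheet-1": 2})
print(reset_fields(state, ["retry_counts", "completed", "missing"]))
# ['retry_counts']
```

Fields that already hold their default value are left alone and not reported, which keeps the report limited to state that was actually discarded.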