global_store

Global learning store - re-exports from modular package.

This module provides backward-compatible imports for the GlobalLearningStore class and all related models. The implementation has been modularized into the marianne.learning.store package.

The original monolithic implementation (~5136 LOC) has been split into focused mixins for better maintainability:
  • models.py: All dataclasses and enums
  • base.py: Core connection and schema management
  • patterns.py: Pattern recording and quarantine lifecycle
  • executions.py: Execution outcome recording
  • rate_limits.py: Cross-workspace rate limit coordination
  • drift.py: Effectiveness and epistemic drift detection
  • escalation.py: Escalation decision recording
  • budget.py: Exploration budget management

Usage remains unchanged:

from marianne.learning.global_store import GlobalLearningStore
store = GlobalLearningStore()

Classes

BudgetMixin

Mixin providing exploration budget and entropy response functionality.

This mixin provides methods for managing the exploration budget (which controls how much the system explores vs exploits known patterns) and automatic entropy responses (which inject diversity when entropy drops).

The exploration budget uses a floor to ensure diversity never goes to zero, and a ceiling to prevent over-exploration. It adjusts dynamically based on measured entropy: low entropy triggers boosts, healthy entropy allows decay.
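The clamp-and-adjust rule described above can be written as a standalone sketch (same default values as the source code below, but not the store's actual method, which also persists each adjustment):

```python
def adjust_budget(current: float, entropy: float,
                  floor: float = 0.05, ceiling: float = 0.50,
                  decay_rate: float = 0.95, boost_amount: float = 0.10,
                  entropy_threshold: float = 0.3) -> float:
    """Boost when entropy is low, decay when healthy, then clamp to [floor, ceiling]."""
    if entropy < entropy_threshold:
        proposed = current + boost_amount   # low entropy: inject diversity
    else:
        proposed = current * decay_rate     # healthy entropy: drift back toward the floor
    return max(floor, min(ceiling, proposed))
```

Because the floor is applied last, repeated decay converges to 0.05 rather than zero, and repeated boosts saturate at 0.50.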

Requires the following from the composed class
  • _get_connection() -> context manager yielding sqlite3.Connection
Functions
get_exploration_budget
get_exploration_budget(job_hash=None)

Get the most recent exploration budget record.

v23 Evolution: Exploration Budget Maintenance - returns the current exploration budget state for pattern selection modulation.

Parameters:

  • job_hash (str | None): Optional job hash to filter by specific job. Default: None.

Returns:

  • ExplorationBudgetRecord | None: The most recent ExplorationBudgetRecord, or None if no budget recorded.

Source code in src/marianne/learning/store/budget.py
def get_exploration_budget(
    self,
    job_hash: str | None = None,
) -> ExplorationBudgetRecord | None:
    """Get the most recent exploration budget record.

    v23 Evolution: Exploration Budget Maintenance - returns the current
    exploration budget state for pattern selection modulation.

    Args:
        job_hash: Optional job hash to filter by specific job.

    Returns:
        The most recent ExplorationBudgetRecord, or None if no budget recorded.
    """
    where, params = self._where_job_hash(job_hash)
    with self._get_connection() as conn:
        cursor = conn.execute(
            f"""
            SELECT id, job_hash, recorded_at, budget_value,
                   entropy_at_time, adjustment_type, adjustment_reason
            FROM exploration_budget
            {where}
            ORDER BY recorded_at DESC
            LIMIT 1
            """,
            params,
        )
        row = cursor.fetchone()

        if not row:
            return None

        return ExplorationBudgetRecord(
            id=row["id"],
            job_hash=row["job_hash"],
            recorded_at=datetime.fromisoformat(row["recorded_at"]),
            budget_value=row["budget_value"],
            entropy_at_time=row["entropy_at_time"],
            adjustment_type=row["adjustment_type"],
            adjustment_reason=row["adjustment_reason"],
        )
get_exploration_budget_history
get_exploration_budget_history(job_hash=None, limit=50)

Get exploration budget history for analysis.

v23 Evolution: Exploration Budget Maintenance - returns historical budget records for visualization and trend analysis.

Parameters:

  • job_hash (str | None): Optional job hash to filter by specific job. Default: None.
  • limit (int): Maximum number of records to return. Default: 50.

Returns:

  • list[ExplorationBudgetRecord]: List of ExplorationBudgetRecord objects, most recent first.

Source code in src/marianne/learning/store/budget.py
def get_exploration_budget_history(
    self,
    job_hash: str | None = None,
    limit: int = 50,
) -> list[ExplorationBudgetRecord]:
    """Get exploration budget history for analysis.

    v23 Evolution: Exploration Budget Maintenance - returns historical
    budget records for visualization and trend analysis.

    Args:
        job_hash: Optional job hash to filter by specific job.
        limit: Maximum number of records to return.

    Returns:
        List of ExplorationBudgetRecord objects, most recent first.
    """
    where, params = self._where_job_hash(job_hash)
    with self._get_connection() as conn:
        cursor = conn.execute(
            f"""
            SELECT id, job_hash, recorded_at, budget_value,
                   entropy_at_time, adjustment_type, adjustment_reason
            FROM exploration_budget
            {where}
            ORDER BY recorded_at DESC
            LIMIT ?
            """,
            (*params, limit),
        )

        return [
            ExplorationBudgetRecord(
                id=row["id"],
                job_hash=row["job_hash"],
                recorded_at=datetime.fromisoformat(row["recorded_at"]),
                budget_value=row["budget_value"],
                entropy_at_time=row["entropy_at_time"],
                adjustment_type=row["adjustment_type"],
                adjustment_reason=row["adjustment_reason"],
            )
            for row in cursor.fetchall()
        ]
update_exploration_budget
update_exploration_budget(job_hash, budget_value, adjustment_type, entropy_at_time=None, adjustment_reason=None, floor=0.05, ceiling=0.5)

Update the exploration budget with floor and ceiling enforcement.

v23 Evolution: Exploration Budget Maintenance - records budget adjustments while enforcing floor (never go to zero) and ceiling limits.

Parameters:

  • job_hash (str): Hash of the job updating the budget. Required.
  • budget_value (float): Proposed new budget value. Required.
  • adjustment_type (str): One of 'initial', 'decay', 'boost', 'floor_enforced', or 'ceiling_enforced'. Required.
  • entropy_at_time (float | None): Optional entropy measurement at adjustment time. Default: None.
  • adjustment_reason (str | None): Human-readable reason for the adjustment. Default: None.
  • floor (float): Minimum allowed budget. Default: 0.05 (5%).
  • ceiling (float): Maximum allowed budget. Default: 0.5 (50%).

Returns:

  • ExplorationBudgetRecord: The new ExplorationBudgetRecord.

Source code in src/marianne/learning/store/budget.py
def update_exploration_budget(
    self,
    job_hash: str,
    budget_value: float,
    adjustment_type: str,
    entropy_at_time: float | None = None,
    adjustment_reason: str | None = None,
    floor: float = 0.05,
    ceiling: float = 0.50,
) -> ExplorationBudgetRecord:
    """Update the exploration budget with floor and ceiling enforcement.

    v23 Evolution: Exploration Budget Maintenance - records budget
    adjustments while enforcing floor (never go to zero) and ceiling limits.

    Args:
        job_hash: Hash of the job updating the budget.
        budget_value: Proposed new budget value.
        adjustment_type: One of 'initial', 'decay', 'boost',
            'floor_enforced', or 'ceiling_enforced'.
        entropy_at_time: Optional entropy measurement at adjustment time.
        adjustment_reason: Human-readable reason for adjustment.
        floor: Minimum allowed budget (default 0.05 = 5%).
        ceiling: Maximum allowed budget (default 0.50 = 50%).

    Returns:
        The new ExplorationBudgetRecord.
    """
    # Enforce floor and ceiling
    original_value = budget_value
    budget_value = max(floor, min(ceiling, budget_value))

    # Update adjustment_type if floor was enforced
    if original_value < floor:
        adjustment_type = "floor_enforced"
        adjustment_reason = (
            f"Budget {original_value:.3f} enforced to floor {floor:.3f}"
        )
    elif original_value > ceiling:
        adjustment_type = "ceiling_enforced"
        adjustment_reason = (
            f"Budget {original_value:.3f} enforced to ceiling {ceiling:.3f}"
        )

    record_id = str(uuid.uuid4())
    recorded_at = datetime.now()

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT INTO exploration_budget (
                id, job_hash, recorded_at, budget_value,
                entropy_at_time, adjustment_type, adjustment_reason
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                record_id,
                job_hash,
                recorded_at.isoformat(),
                budget_value,
                entropy_at_time,
                adjustment_type,
                adjustment_reason,
            ),
        )

    _logger.debug(
        f"Updated exploration budget: {budget_value:.3f} ({adjustment_type})"
    )

    return ExplorationBudgetRecord(
        id=record_id,
        job_hash=job_hash,
        recorded_at=recorded_at,
        budget_value=budget_value,
        entropy_at_time=entropy_at_time,
        adjustment_type=adjustment_type,
        adjustment_reason=adjustment_reason,
    )
calculate_budget_adjustment
calculate_budget_adjustment(job_hash, current_entropy, floor=0.05, ceiling=0.5, decay_rate=0.95, boost_amount=0.1, entropy_threshold=0.3, initial_budget=0.15)

Calculate and record the next budget adjustment based on entropy.

v23 Evolution: Exploration Budget Maintenance - implements the core budget adjustment logic: - When entropy < threshold: boost budget by boost_amount - When entropy >= threshold: decay budget by decay_rate - Budget never drops below floor or exceeds ceiling

Parameters:

  • job_hash (str): Hash of the job. Required.
  • current_entropy (float): Current pattern entropy (0.0-1.0). Required.
  • floor (float): Minimum budget floor. Default: 0.05.
  • ceiling (float): Maximum budget ceiling. Default: 0.5.
  • decay_rate (float): Decay multiplier when entropy is healthy. Default: 0.95.
  • boost_amount (float): Amount to add when entropy is low. Default: 0.1.
  • entropy_threshold (float): Entropy level that triggers a boost. Default: 0.3.
  • initial_budget (float): Starting budget if no history exists. Default: 0.15.

Returns:

  • ExplorationBudgetRecord: The new ExplorationBudgetRecord after adjustment.

Source code in src/marianne/learning/store/budget.py
def calculate_budget_adjustment(
    self,
    job_hash: str,
    current_entropy: float,
    floor: float = 0.05,
    ceiling: float = 0.50,
    decay_rate: float = 0.95,
    boost_amount: float = 0.10,
    entropy_threshold: float = 0.3,
    initial_budget: float = 0.15,
) -> ExplorationBudgetRecord:
    """Calculate and record the next budget adjustment based on entropy.

    v23 Evolution: Exploration Budget Maintenance - implements the core
    budget adjustment logic:
    - When entropy < threshold: boost budget by boost_amount
    - When entropy >= threshold: decay budget by decay_rate
    - Budget never drops below floor or exceeds ceiling

    Args:
        job_hash: Hash of the job.
        current_entropy: Current pattern entropy (0.0-1.0).
        floor: Minimum budget floor (default 0.05).
        ceiling: Maximum budget ceiling (default 0.50).
        decay_rate: Decay multiplier when entropy healthy (default 0.95).
        boost_amount: Amount to add when entropy low (default 0.10).
        entropy_threshold: Entropy level that triggers boost (default 0.3).
        initial_budget: Starting budget if no history (default 0.15).

    Returns:
        The new ExplorationBudgetRecord after adjustment.
    """
    # Get current budget
    current = self.get_exploration_budget(job_hash)

    if current is None:
        # First budget record - initialize
        return self.update_exploration_budget(
            job_hash=job_hash,
            budget_value=initial_budget,
            adjustment_type="initial",
            entropy_at_time=current_entropy,
            adjustment_reason="Initial budget set",
            floor=floor,
            ceiling=ceiling,
        )

    # Calculate new budget based on entropy
    if current_entropy < entropy_threshold:
        # Low entropy - boost exploration to inject diversity
        new_budget = current.budget_value + boost_amount
        adjustment_type = "boost"
        reason = f"Entropy {current_entropy:.3f} < threshold {entropy_threshold:.3f}"
    else:
        # Healthy entropy - decay toward floor
        new_budget = current.budget_value * decay_rate
        adjustment_type = "decay"
        reason = f"Entropy {current_entropy:.3f} >= threshold, decaying"

    return self.update_exploration_budget(
        job_hash=job_hash,
        budget_value=new_budget,
        adjustment_type=adjustment_type,
        entropy_at_time=current_entropy,
        adjustment_reason=reason,
        floor=floor,
        ceiling=ceiling,
    )
get_exploration_budget_statistics
get_exploration_budget_statistics(job_hash=None)

Get statistics about exploration budget usage.

v23 Evolution: Exploration Budget Maintenance - provides aggregate statistics for monitoring and reporting.

Parameters:

  • job_hash (str | None): Optional job hash to filter by specific job. Default: None.

Returns:

  • dict[str, Any]: Dict with budget statistics:
      • current_budget: Current budget value
      • avg_budget: Average budget over history
      • min_budget: Minimum recorded budget
      • max_budget: Maximum recorded budget
      • total_adjustments: Total number of adjustments
      • floor_enforcements: Number of times the floor was enforced
      • boost_count: Number of boost adjustments
      • decay_count: Number of decay adjustments
Source code in src/marianne/learning/store/budget.py
def get_exploration_budget_statistics(
    self,
    job_hash: str | None = None,
) -> dict[str, Any]:
    """Get statistics about exploration budget usage.

    v23 Evolution: Exploration Budget Maintenance - provides aggregate
    statistics for monitoring and reporting.

    Args:
        job_hash: Optional job hash to filter by specific job.

    Returns:
        Dict with budget statistics:
        - current_budget: Current budget value
        - avg_budget: Average budget over history
        - min_budget: Minimum recorded budget
        - max_budget: Maximum recorded budget
        - total_adjustments: Total number of adjustments
        - floor_enforcements: Number of times floor was enforced
        - boost_count: Number of boost adjustments
        - decay_count: Number of decay adjustments
    """
    where, params = self._where_job_hash(job_hash)
    with self._get_connection() as conn:
        cursor = conn.execute(
            f"""
            SELECT
                COUNT(*) as total,
                AVG(budget_value) as avg_val,
                MIN(budget_value) as min_val,
                MAX(budget_value) as max_val,
                SUM(CASE WHEN adjustment_type = 'floor_enforced'
                    THEN 1 ELSE 0 END) as floor_count,
                SUM(CASE WHEN adjustment_type = 'boost'
                    THEN 1 ELSE 0 END) as boost_count,
                SUM(CASE WHEN adjustment_type = 'decay'
                    THEN 1 ELSE 0 END) as decay_count
            FROM exploration_budget
            {where}
            """,
            params,
        )

        row = cursor.fetchone()

        if not row or row["total"] == 0:
            return {
                "current_budget": None,
                "avg_budget": 0.0,
                "min_budget": 0.0,
                "max_budget": 0.0,
                "total_adjustments": 0,
                "floor_enforcements": 0,
                "boost_count": 0,
                "decay_count": 0,
            }

    # Get current budget separately
    current = self.get_exploration_budget(job_hash)

    return {
        "current_budget": current.budget_value if current else None,
        "avg_budget": row["avg_val"] or 0.0,
        "min_budget": row["min_val"] or 0.0,
        "max_budget": row["max_val"] or 0.0,
        "total_adjustments": row["total"],
        "floor_enforcements": row["floor_count"] or 0,
        "boost_count": row["boost_count"] or 0,
        "decay_count": row["decay_count"] or 0,
    }
check_entropy_response_needed
check_entropy_response_needed(job_hash, entropy_threshold=0.3, cooldown_seconds=3600)

Check if an entropy response is needed based on current conditions.

v23 Evolution: Automatic Entropy Response - evaluates whether the current entropy level warrants a diversity injection response.

Parameters:

  • job_hash (str): Hash of the job to check. Required.
  • entropy_threshold (float): Entropy below this triggers a response. Default: 0.3.
  • cooldown_seconds (int): Minimum seconds since the last response. Default: 3600.

Returns:

  • tuple[bool, float | None, str]: Tuple of (needs_response, current_entropy, reason):
      • needs_response: True if a response should be triggered
      • current_entropy: Current diversity_index, or None if not calculable
      • reason: Human-readable explanation of the decision
Source code in src/marianne/learning/store/budget.py
def check_entropy_response_needed(
    self,
    job_hash: str,
    entropy_threshold: float = 0.3,
    cooldown_seconds: int = 3600,
) -> tuple[bool, float | None, str]:
    """Check if an entropy response is needed based on current conditions.

    v23 Evolution: Automatic Entropy Response - evaluates whether the
    current entropy level warrants a diversity injection response.

    Args:
        job_hash: Hash of the job to check.
        entropy_threshold: Entropy below this triggers response.
        cooldown_seconds: Minimum seconds since last response.

    Returns:
        Tuple of (needs_response, current_entropy, reason):
        - needs_response: True if response should be triggered
        - current_entropy: Current diversity_index or None if not calculable
        - reason: Human-readable explanation of decision
    """
    # Check cooldown - has there been a recent response?
    last_response = self.get_last_entropy_response(job_hash)
    if last_response:
        seconds_since = (datetime.now() - last_response.recorded_at).total_seconds()
        if seconds_since < cooldown_seconds:
            remaining = cooldown_seconds - seconds_since
            return (
                False,
                None,
                f"Cooldown active ({remaining:.0f}s remaining)",
            )

    # Calculate current entropy directly
    # Get patterns and calculate entropy
    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT p.id, COUNT(pa.id) as app_count
            FROM patterns p
            LEFT JOIN pattern_applications pa ON p.id = pa.pattern_id
            GROUP BY p.id
            HAVING app_count > 0
            """
        )
        rows = cursor.fetchall()

    if not rows:
        return (False, None, "No pattern applications to analyze")

    total_apps = sum(row["app_count"] for row in rows)
    if total_apps == 0:
        return (False, None, "No pattern applications to analyze")

    # Calculate diversity index using Shannon entropy
    probabilities = [row["app_count"] / total_apps for row in rows]
    shannon_entropy = -sum(
        p * math.log2(p) for p in probabilities if p > 0
    )
    max_entropy = math.log2(len(rows)) if len(rows) > 1 else 1.0
    diversity_index = shannon_entropy / max_entropy if max_entropy > 0 else 0.0

    # Check if response is needed
    if diversity_index < entropy_threshold:
        return (
            True,
            diversity_index,
            f"Entropy {diversity_index:.3f} < threshold {entropy_threshold:.3f}",
        )

    return (
        False,
        diversity_index,
        f"Entropy {diversity_index:.3f} >= threshold {entropy_threshold:.3f} (healthy)",
    )
trigger_entropy_response
trigger_entropy_response(job_hash='', entropy_at_trigger=0.0, threshold_used=0.0, *, trigger=None, config=None, boost_budget=None, revisit_quarantine=None, max_quarantine_revisits=None, budget_floor=None, budget_ceiling=None, budget_boost_amount=None)

Execute an entropy response by boosting budget and/or revisiting quarantine.

v23 Evolution: Automatic Entropy Response - performs the actual response actions when entropy has dropped below threshold.

Supports two calling conventions
  1. Positional (legacy): trigger_entropy_response(job_hash, entropy, threshold, ...)
  2. Bundled (preferred): trigger_entropy_response(trigger=ctx, config=cfg)

When trigger is supplied, its fields take precedence over positional arguments.

Parameters:

  • job_hash (str): Hash of the job triggering the response (legacy positional). Default: ''.
  • entropy_at_trigger (float): Entropy value that triggered this response (legacy positional). Default: 0.0.
  • threshold_used (float): The threshold that was crossed (legacy positional). Default: 0.0.
  • trigger (EntropyTriggerContext | None): Bundled trigger context (preferred over positional args). Default: None.
  • config (EntropyResponseConfig | None): Configuration object grouping all response tuning params. Individual keyword arguments override config values when both are provided. Default: None.
  • boost_budget (bool | None): Whether to boost the exploration budget. Default: None.
  • revisit_quarantine (bool | None): Whether to revisit quarantined patterns. Default: None.
  • max_quarantine_revisits (int | None): Maximum patterns to revisit. Default: None.
  • budget_floor (float | None): Floor for budget enforcement. Default: None.
  • budget_ceiling (float | None): Ceiling for budget enforcement. Default: None.
  • budget_boost_amount (float | None): Amount to boost the budget by. Default: None.

Returns:

  • EntropyResponseRecord: The EntropyResponseRecord documenting the response.

Source code in src/marianne/learning/store/budget.py
def trigger_entropy_response(
    self,
    job_hash: str = "",
    entropy_at_trigger: float = 0.0,
    threshold_used: float = 0.0,
    *,
    trigger: EntropyTriggerContext | None = None,
    config: EntropyResponseConfig | None = None,
    boost_budget: bool | None = None,
    revisit_quarantine: bool | None = None,
    max_quarantine_revisits: int | None = None,
    budget_floor: float | None = None,
    budget_ceiling: float | None = None,
    budget_boost_amount: float | None = None,
) -> EntropyResponseRecord:
    """Execute an entropy response by boosting budget and/or revisiting quarantine.

    v23 Evolution: Automatic Entropy Response - performs the actual response
    actions when entropy has dropped below threshold.

    Supports two calling conventions:
        1. **Positional** (legacy):
           ``trigger_entropy_response(job_hash, entropy, threshold, ...)``
        2. **Bundled** (preferred): ``trigger_entropy_response(trigger=ctx, config=cfg)``

    When *trigger* is supplied, its fields take precedence over
    positional arguments.

    Args:
        job_hash: Hash of the job triggering response (legacy positional).
        entropy_at_trigger: Entropy value that triggered this response (legacy positional).
        threshold_used: The threshold that was crossed (legacy positional).
        trigger: Bundled trigger context (preferred over positional args).
        config: Configuration object grouping all response tuning params.
            Individual keyword arguments override config values when both
            are provided.
        boost_budget: Whether to boost exploration budget.
        revisit_quarantine: Whether to revisit quarantined patterns.
        max_quarantine_revisits: Maximum patterns to revisit.
        budget_floor: Floor for budget enforcement.
        budget_ceiling: Ceiling for budget enforcement.
        budget_boost_amount: Amount to boost budget by.

    Returns:
        The EntropyResponseRecord documenting the response.
    """
    # Resolve trigger context: prefer bundled object, fall back to positional args
    if trigger is not None:
        job_hash = trigger.job_hash
        entropy_at_trigger = trigger.entropy_at_trigger
        threshold_used = trigger.threshold_used

    # Build effective config: start from config (or defaults), then apply
    # any explicit keyword overrides without mutating the caller's object.
    overrides: dict[str, Any] = {
        k: v for k, v in {
            "boost_budget": boost_budget,
            "revisit_quarantine": revisit_quarantine,
            "max_quarantine_revisits": max_quarantine_revisits,
            "budget_floor": budget_floor,
            "budget_ceiling": budget_ceiling,
            "budget_boost_amount": budget_boost_amount,
        }.items() if v is not None
    }
    cfg = replace(config, **overrides) if config else EntropyResponseConfig(**overrides)

    actions_taken: list[str] = []
    patterns_revisited: list[str] = []
    budget_boosted = False
    quarantine_revisit_count = 0

    # Action 1: Boost exploration budget
    if cfg.boost_budget:
        current = self.get_exploration_budget(job_hash)
        if current:
            new_budget = current.budget_value + cfg.budget_boost_amount
        else:
            new_budget = 0.15 + cfg.budget_boost_amount  # Initial + boost

        self.update_exploration_budget(
            job_hash=job_hash,
            budget_value=new_budget,
            adjustment_type="boost",
            entropy_at_time=entropy_at_trigger,
            adjustment_reason=(
                f"Entropy response: diversity"
                f" {entropy_at_trigger:.3f} < {threshold_used:.3f}"
            ),
            floor=cfg.budget_floor,
            ceiling=cfg.budget_ceiling,
        )
        budget_boosted = True
        actions_taken.append("budget_boost")
        _logger.info("entropy_budget_boost", boost_amount=round(cfg.budget_boost_amount, 4))

    # Action 2: Revisit quarantined patterns
    if cfg.revisit_quarantine:
        # Get quarantined patterns and mark for review
        with self._get_connection() as conn:
            cursor = conn.execute(
                """
                SELECT id, pattern_name
                FROM patterns
                WHERE quarantine_status = 'quarantined'
                ORDER BY last_seen DESC
                LIMIT ?
                """,
                (cfg.max_quarantine_revisits,),
            )
            quarantined = cursor.fetchall()

            for row in quarantined:
                # Mark for review by setting status to PENDING
                conn.execute(
                    """
                    UPDATE patterns
                    SET quarantine_status = 'pending',
                        quarantine_reason = 'Entropy response: revisiting for revalidation'
                    WHERE id = ?
                    """,
                    (row["id"],),
                )
                patterns_revisited.append(row["id"])
                quarantine_revisit_count += 1
                _logger.info(
                    f"Entropy response: Revisiting quarantined pattern {row['pattern_name']}"
                )

        if quarantine_revisit_count > 0:
            actions_taken.append("quarantine_revisit")

    # Record the response
    record_id = str(uuid.uuid4())
    recorded_at = datetime.now()

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT INTO entropy_responses (
                id, job_hash, recorded_at, entropy_at_trigger,
                threshold_used, actions_taken, budget_boosted,
                quarantine_revisits, patterns_revisited
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                record_id,
                job_hash,
                recorded_at.isoformat(),
                entropy_at_trigger,
                threshold_used,
                json.dumps(actions_taken),
                1 if budget_boosted else 0,
                quarantine_revisit_count,
                json.dumps(patterns_revisited),
            ),
        )

    _logger.info(
        f"Entropy response complete: {len(actions_taken)} actions, "
        f"budget_boosted={budget_boosted}, revisited={quarantine_revisit_count}"
    )

    return EntropyResponseRecord(
        id=record_id,
        job_hash=job_hash,
        recorded_at=recorded_at,
        entropy_at_trigger=entropy_at_trigger,
        threshold_used=threshold_used,
        actions_taken=actions_taken,
        budget_boosted=budget_boosted,
        quarantine_revisits=quarantine_revisit_count,
        patterns_revisited=patterns_revisited,
    )
get_last_entropy_response
get_last_entropy_response(job_hash=None)

Get the most recent entropy response record.

v23 Evolution: Automatic Entropy Response - used for cooldown checking.

Parameters:

  • job_hash (str | None): Optional job hash to filter by. Default: None.

Returns:

  • EntropyResponseRecord | None: The most recent EntropyResponseRecord, or None if none found.

Source code in src/marianne/learning/store/budget.py
def get_last_entropy_response(
    self,
    job_hash: str | None = None,
) -> EntropyResponseRecord | None:
    """Get the most recent entropy response record.

    v23 Evolution: Automatic Entropy Response - used for cooldown checking.

    Args:
        job_hash: Optional job hash to filter by.

    Returns:
        The most recent EntropyResponseRecord, or None if none found.
    """
    where, params = self._where_job_hash(job_hash)
    with self._get_connection() as conn:
        cursor = conn.execute(
            f"""
            SELECT id, job_hash, recorded_at, entropy_at_trigger,
                   threshold_used, actions_taken, budget_boosted,
                   quarantine_revisits, patterns_revisited
            FROM entropy_responses
            {where}
            ORDER BY recorded_at DESC
            LIMIT 1
            """,
            params,
        )

        row = cursor.fetchone()

        if not row:
            return None

        return EntropyResponseRecord(
            id=row["id"],
            job_hash=row["job_hash"],
            recorded_at=datetime.fromisoformat(row["recorded_at"]),
            entropy_at_trigger=row["entropy_at_trigger"],
            threshold_used=row["threshold_used"],
            actions_taken=json.loads(row["actions_taken"]),
            budget_boosted=bool(row["budget_boosted"]),
            quarantine_revisits=row["quarantine_revisits"],
            patterns_revisited=(
                json.loads(row["patterns_revisited"])
                if row["patterns_revisited"] else []
            ),
        )
get_entropy_response_history
get_entropy_response_history(job_hash=None, limit=50)

Get entropy response history for analysis.

v23 Evolution: Automatic Entropy Response - returns historical response records for visualization and trend analysis.

Parameters:

  • job_hash (str | None): Optional job hash to filter by. Default: None.
  • limit (int): Maximum number of records to return. Default: 50.

Returns:

  • list[EntropyResponseRecord]: List of EntropyResponseRecord objects, most recent first.

Source code in src/marianne/learning/store/budget.py
def get_entropy_response_history(
    self,
    job_hash: str | None = None,
    limit: int = 50,
) -> list[EntropyResponseRecord]:
    """Get entropy response history for analysis.

    v23 Evolution: Automatic Entropy Response - returns historical
    response records for visualization and trend analysis.

    Args:
        job_hash: Optional job hash to filter by.
        limit: Maximum number of records to return.

    Returns:
        List of EntropyResponseRecord objects, most recent first.
    """
    where, params = self._where_job_hash(job_hash)
    with self._get_connection() as conn:
        cursor = conn.execute(
            f"""
            SELECT id, job_hash, recorded_at, entropy_at_trigger,
                   threshold_used, actions_taken, budget_boosted,
                   quarantine_revisits, patterns_revisited
            FROM entropy_responses
            {where}
            ORDER BY recorded_at DESC
            LIMIT ?
            """,
            (*params, limit),
        )

        return [
            EntropyResponseRecord(
                id=row["id"],
                job_hash=row["job_hash"],
                recorded_at=datetime.fromisoformat(row["recorded_at"]),
                entropy_at_trigger=row["entropy_at_trigger"],
                threshold_used=row["threshold_used"],
                actions_taken=json.loads(row["actions_taken"]),
                budget_boosted=bool(row["budget_boosted"]),
                quarantine_revisits=row["quarantine_revisits"],
                patterns_revisited=(
                    json.loads(row["patterns_revisited"])
                    if row["patterns_revisited"] else []
                ),
            )
            for row in cursor.fetchall()
        ]
get_entropy_response_statistics
get_entropy_response_statistics(job_hash=None)

Get statistics about entropy responses.

v23 Evolution: Automatic Entropy Response - provides aggregate statistics for monitoring and reporting.

Parameters:

- job_hash (str | None): Optional job hash to filter by. Default: None.

Returns:

- dict[str, Any]: Dict with response statistics.

Source code in src/marianne/learning/store/budget.py
def get_entropy_response_statistics(
    self,
    job_hash: str | None = None,
) -> dict[str, Any]:
    """Get statistics about entropy responses.

    v23 Evolution: Automatic Entropy Response - provides aggregate
    statistics for monitoring and reporting.

    Args:
        job_hash: Optional job hash to filter by.

    Returns:
        Dict with response statistics.
    """
    where, params = self._where_job_hash(job_hash)
    with self._get_connection() as conn:
        cursor = conn.execute(
            f"""
            SELECT
                COUNT(*) as total,
                AVG(entropy_at_trigger) as avg_entropy,
                SUM(budget_boosted) as budget_boosts,
                SUM(quarantine_revisits) as total_revisits
            FROM entropy_responses
            {where}
            """,
            params,
        )

        row = cursor.fetchone()

        if not row or row["total"] == 0:
            return {
                "total_responses": 0,
                "avg_entropy_at_trigger": 0.0,
                "budget_boosts": 0,
                "quarantine_revisits": 0,
                "last_response": None,
            }

    # Get last response time
    last = self.get_last_entropy_response(job_hash)

    return {
        "total_responses": row["total"],
        "avg_entropy_at_trigger": row["avg_entropy"] or 0.0,
        "budget_boosts": row["budget_boosts"] or 0,
        "quarantine_revisits": row["total_revisits"] or 0,
        "last_response": last.recorded_at.isoformat() if last else None,
    }
calculate_pattern_entropy
calculate_pattern_entropy()

Calculate current Shannon entropy of the pattern population.

Queries all patterns with at least one application and computes Shannon entropy over the application-count distribution. This reuses the same algorithm as check_entropy_response_needed but returns a structured result for CLI display and recording.

Returns:

- PatternEntropyMetrics: The current entropy snapshot.

Source code in src/marianne/learning/store/budget.py
def calculate_pattern_entropy(self) -> PatternEntropyMetrics:
    """Calculate current Shannon entropy of the pattern population.

    Queries all patterns with at least one application and computes
    Shannon entropy over the application-count distribution. This
    reuses the same algorithm as ``check_entropy_response_needed``
    but returns a structured result for CLI display and recording.

    Returns:
        PatternEntropyMetrics with the current entropy snapshot.
    """
    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT p.id, COUNT(pa.id) as app_count
            FROM patterns p
            LEFT JOIN pattern_applications pa ON p.id = pa.pattern_id
            GROUP BY p.id
            """
        )
        rows = cursor.fetchall()

    # Count totals
    unique_count = len(rows)
    effective_rows = [r for r in rows if r["app_count"] > 0]
    effective_count = len(effective_rows)
    total_apps = sum(r["app_count"] for r in rows)

    now = datetime.now()

    if total_apps == 0 or effective_count == 0:
        return PatternEntropyMetrics(
            calculated_at=now,
            shannon_entropy=0.0,
            max_possible_entropy=0.0,
            diversity_index=0.0,
            unique_pattern_count=unique_count,
            effective_pattern_count=0,
            total_applications=0,
            dominant_pattern_share=0.0,
        )

    # Shannon entropy: H = -sum(p_i * log2(p_i))
    probabilities = [r["app_count"] / total_apps for r in effective_rows]
    shannon_entropy = -sum(
        p * math.log2(p) for p in probabilities if p > 0
    )
    max_entropy = math.log2(effective_count) if effective_count > 1 else 1.0
    diversity_index = shannon_entropy / max_entropy if max_entropy > 0 else 0.0
    dominant_share = max(probabilities)

    return PatternEntropyMetrics(
        calculated_at=now,
        shannon_entropy=shannon_entropy,
        max_possible_entropy=max_entropy,
        diversity_index=diversity_index,
        unique_pattern_count=unique_count,
        effective_pattern_count=effective_count,
        total_applications=total_apps,
        dominant_pattern_share=dominant_share,
    )
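The entropy arithmetic above can be exercised without a database. This is a minimal sketch of the same calculation over a hypothetical list of per-pattern application counts; the function name and inputs are illustrative, not part of the store API:

```python
import math

def entropy_snapshot(app_counts: list[int]) -> tuple[float, float]:
    """Shannon entropy and diversity index over application counts.

    Mirrors the algorithm in calculate_pattern_entropy: patterns with
    zero applications are excluded, H = -sum(p_i * log2(p_i)), and the
    diversity index normalizes H by the maximum possible entropy.
    """
    effective = [c for c in app_counts if c > 0]
    total = sum(effective)
    if total == 0:
        return 0.0, 0.0
    probs = [c / total for c in effective]
    h = -sum(p * math.log2(p) for p in probs if p > 0)
    max_h = math.log2(len(effective)) if len(effective) > 1 else 1.0
    return h, (h / max_h if max_h > 0 else 0.0)

# Four patterns applied equally: maximum entropy, diversity index 1.0
h, d = entropy_snapshot([5, 5, 5, 5])

# One dominant pattern: entropy and diversity drop sharply
h2, d2 = entropy_snapshot([17, 1, 1, 1])
```

A uniform distribution over four patterns yields H = 2.0 bits and a diversity index of 1.0; the skewed distribution scores well below that, which is the signal the entropy-response machinery reacts to.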
record_pattern_entropy
record_pattern_entropy(metrics)

Persist a pattern entropy snapshot for historical trend analysis.

Parameters:

- metrics (PatternEntropyMetrics): The entropy metrics to record. Required.

Returns:

- str: The record ID of the persisted snapshot.

Source code in src/marianne/learning/store/budget.py
def record_pattern_entropy(self, metrics: PatternEntropyMetrics) -> str:
    """Persist a pattern entropy snapshot for historical trend analysis.

    Args:
        metrics: The entropy metrics to record.

    Returns:
        The record ID of the persisted snapshot.
    """
    record_id = str(uuid.uuid4())

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT INTO pattern_entropy_history (
                id, calculated_at, shannon_entropy, max_possible_entropy,
                diversity_index, unique_pattern_count, effective_pattern_count,
                total_applications, dominant_pattern_share, threshold_exceeded
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                record_id,
                metrics.calculated_at.isoformat(),
                metrics.shannon_entropy,
                metrics.max_possible_entropy,
                metrics.diversity_index,
                metrics.unique_pattern_count,
                metrics.effective_pattern_count,
                metrics.total_applications,
                metrics.dominant_pattern_share,
                1 if metrics.threshold_exceeded else 0,
            ),
        )

    _logger.debug(
        f"Recorded entropy snapshot {record_id[:10]}: "
        f"H={metrics.shannon_entropy:.3f}, diversity={metrics.diversity_index:.3f}"
    )
    return record_id
get_pattern_entropy_history
get_pattern_entropy_history(limit=50)

Retrieve historical entropy snapshots for trend analysis.

Parameters:

- limit (int): Maximum number of records to return. Default: 50.

Returns:

- list[PatternEntropyMetrics]: List of PatternEntropyMetrics, most recent first.

Source code in src/marianne/learning/store/budget.py
def get_pattern_entropy_history(
    self,
    limit: int = 50,
) -> list[PatternEntropyMetrics]:
    """Retrieve historical entropy snapshots for trend analysis.

    Args:
        limit: Maximum number of records to return.

    Returns:
        List of PatternEntropyMetrics, most recent first.
    """
    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT calculated_at, shannon_entropy, max_possible_entropy,
                   diversity_index, unique_pattern_count, effective_pattern_count,
                   total_applications, dominant_pattern_share, threshold_exceeded
            FROM pattern_entropy_history
            ORDER BY calculated_at DESC
            LIMIT ?
            """,
            (limit,),
        )

        records: list[PatternEntropyMetrics] = []
        for row in cursor.fetchall():
            records.append(
                PatternEntropyMetrics(
                    calculated_at=datetime.fromisoformat(row["calculated_at"]),
                    shannon_entropy=row["shannon_entropy"],
                    max_possible_entropy=row["max_possible_entropy"],
                    diversity_index=row["diversity_index"],
                    unique_pattern_count=row["unique_pattern_count"],
                    effective_pattern_count=row["effective_pattern_count"],
                    total_applications=row["total_applications"],
                    dominant_pattern_share=row["dominant_pattern_share"],
                    threshold_exceeded=bool(row["threshold_exceeded"]),
                )
            )
        return records

DriftMetrics dataclass

DriftMetrics(pattern_id, pattern_name, window_size, effectiveness_before, effectiveness_after, grounding_confidence_avg, drift_magnitude, drift_direction, applications_analyzed, threshold_exceeded=False)

Metrics for pattern effectiveness drift detection.

v12 Evolution: Goal Drift Detection - tracks how pattern effectiveness changes over time to detect drifting patterns that may need attention.

Attributes:

- pattern_id: Pattern ID being analyzed.
- pattern_name: Human-readable pattern name.
- window_size: Number of applications in each comparison window.
- effectiveness_before: Effectiveness score in the older window (applications N-2W to N-W).
- effectiveness_after: Effectiveness score in the recent window (applications N-W to N).
- grounding_confidence_avg: Average grounding confidence across all applications analyzed.
- drift_magnitude: Absolute magnitude of drift: |effectiveness_after - effectiveness_before|.
- drift_direction: Direction of drift: 'positive', 'negative', or 'stable'.
- applications_analyzed: Total number of applications analyzed (should be 2 × window_size).
- threshold_exceeded: Whether drift_magnitude exceeds the alert threshold. Default: False.

DriftMixin

Mixin providing drift detection and pattern retirement functionality.

This mixin provides methods for detecting effectiveness drift and epistemic drift in patterns, as well as automatic retirement of drifting patterns.

Effectiveness drift tracks changes in success rates over time. Epistemic drift tracks changes in confidence/belief levels over time.

Requires the following from the composed class
  • _get_connection() -> context manager yielding sqlite3.Connection
Functions
calculate_effectiveness_drift
calculate_effectiveness_drift(pattern_id, window_size=5, drift_threshold=0.2)

Calculate effectiveness drift for a pattern.

Compares the effectiveness of a pattern in its recent applications vs older applications to detect drift. Patterns that were once effective but are now declining may need investigation.

v12 Evolution: Goal Drift Detection - enables proactive pattern health monitoring.

Formula:

    drift = effectiveness_after - effectiveness_before
    drift_magnitude = |drift|
    weighted_drift = drift_magnitude / avg_grounding_confidence

A positive drift means the pattern is improving, negative means declining. The weighted drift amplifies the signal when grounding confidence is low.

Parameters:

- pattern_id (str): Pattern to analyze. Required.
- window_size (int): Number of applications per window; total applications needed = 2 × window_size. Default: 5.
- drift_threshold (float): Threshold for flagging drift (0.2 = 20%). Default: 0.2.

Returns:

- DriftMetrics | None: DriftMetrics if enough data exists, None otherwise.

Source code in src/marianne/learning/store/drift.py
def calculate_effectiveness_drift(
    self,
    pattern_id: str,
    window_size: int = 5,
    drift_threshold: float = 0.2,
) -> DriftMetrics | None:
    """Calculate effectiveness drift for a pattern.

    Compares the effectiveness of a pattern in its recent applications
    vs older applications to detect drift. Patterns that were once
    effective but are now declining may need investigation.

    v12 Evolution: Goal Drift Detection - enables proactive pattern
    health monitoring.

    Formula:
        drift = effectiveness_after - effectiveness_before
        drift_magnitude = |drift|
        weighted_drift = drift_magnitude / avg_grounding_confidence

    A positive drift means the pattern is improving, negative means declining.
    The weighted drift amplifies the signal when grounding confidence is low.

    Args:
        pattern_id: Pattern to analyze.
        window_size: Number of applications per window (default 5).
                    Total applications needed = 2 × window_size.
        drift_threshold: Threshold for flagging drift (default 0.2 = 20%).

    Returns:
        DriftMetrics if enough data exists, None otherwise.
    """
    with self._get_connection() as conn:
        # Get pattern name
        cursor = conn.execute(
            "SELECT pattern_name FROM patterns WHERE id = ?",
            (pattern_id,),
        )
        row = cursor.fetchone()
        if not row:
            return None
        pattern_name = row["pattern_name"]

        # Fetch 2 × window_size recent applications
        # Ordered by applied_at DESC to get most recent first
        cursor = conn.execute(
            """
            SELECT pattern_led_to_success, grounding_confidence, applied_at
            FROM pattern_applications
            WHERE pattern_id = ?
            ORDER BY applied_at DESC
            LIMIT ?
            """,
            (pattern_id, window_size * 2),
        )
        applications = cursor.fetchall()

        # Need at least 2 × window_size applications for comparison
        if len(applications) < window_size * 2:
            _logger.debug(
                f"Pattern {pattern_id} has {len(applications)} applications, "
                f"need {window_size * 2} for drift analysis"
            )
            return None

        # Split into recent (first window_size) and older (second window_size)
        recent_apps = applications[:window_size]
        older_apps = applications[window_size : window_size * 2]

        # Calculate effectiveness for each window
        # effectiveness = success_rate with Laplace smoothing
        def calc_effectiveness(apps: list[dict[str, Any]]) -> tuple[float, list[float]]:
            successes = sum(1 for a in apps if a["pattern_led_to_success"])
            eff = (successes + 0.5) / (len(apps) + 1)  # Laplace smoothing
            grounding_vals = [
                a["grounding_confidence"]
                for a in apps
                if a["grounding_confidence"] is not None
            ]
            return eff, grounding_vals

        eff_after, grounding_recent = calc_effectiveness(recent_apps)
        eff_before, grounding_older = calc_effectiveness(older_apps)

        # Calculate average grounding confidence across all applications
        all_grounding = grounding_recent + grounding_older
        avg_grounding = sum(all_grounding) / len(all_grounding) if all_grounding else 1.0

        # Calculate drift
        drift = eff_after - eff_before
        drift_magnitude = abs(drift)

        # Determine direction
        if drift > 0.05:  # Small threshold to avoid noise
            drift_direction = "positive"
        elif drift < -0.05:
            drift_direction = "negative"
        else:
            drift_direction = "stable"

        # Check if threshold exceeded (weighted by grounding)
        # Lower grounding confidence amplifies the drift signal
        weighted_magnitude = drift_magnitude / max(avg_grounding, 0.5)
        threshold_exceeded = weighted_magnitude > drift_threshold

        return DriftMetrics(
            pattern_id=pattern_id,
            pattern_name=pattern_name,
            window_size=window_size,
            effectiveness_before=eff_before,
            effectiveness_after=eff_after,
            grounding_confidence_avg=avg_grounding,
            drift_magnitude=drift_magnitude,
            drift_direction=drift_direction,
            applications_analyzed=len(applications),
            threshold_exceeded=threshold_exceeded,
        )
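The window comparison above reduces to a few lines of arithmetic. This sketch replays the formula on hypothetical outcome windows; the 0.8 average grounding confidence is an assumed value, and the 0.5 floor matches the clamp in the method:

```python
def window_effectiveness(successes: list[bool]) -> float:
    # Laplace-smoothed success rate, as in calc_effectiveness above
    return (sum(successes) + 0.5) / (len(successes) + 1)

# Hypothetical outcome windows, most recent first (window_size = 5)
recent = [False, False, True, False, False]  # 1/5 recent successes
older = [True, True, True, True, False]      # 4/5 older successes

eff_after = window_effectiveness(recent)   # (1 + 0.5) / 6 = 0.25
eff_before = window_effectiveness(older)   # (4 + 0.5) / 6 = 0.75
drift = eff_after - eff_before             # -0.5: a declining pattern
avg_grounding = 0.8                        # assumed average grounding confidence
weighted = abs(drift) / max(avg_grounding, 0.5)  # 0.625, above the 0.2 default
```

Laplace smoothing keeps either window's effectiveness away from the extremes of 0.0 and 1.0 on small samples, so a single lucky or unlucky application cannot saturate the drift signal.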
get_drifting_patterns
get_drifting_patterns(drift_threshold=0.2, window_size=5, limit=20)

Get all patterns with significant drift.

Scans all patterns with enough application history and returns those that exceed the drift threshold.

v12 Evolution: Goal Drift Detection - enables CLI display of drifting patterns for operator review.

Parameters:

- drift_threshold (float): Minimum drift to include. Default: 0.2.
- window_size (int): Applications per window. Default: 5.
- limit (int): Maximum patterns to return. Default: 20.

Returns:

- list[DriftMetrics]: List of DriftMetrics for drifting patterns, sorted by drift_magnitude descending.

Source code in src/marianne/learning/store/drift.py
def get_drifting_patterns(
    self,
    drift_threshold: float = 0.2,
    window_size: int = 5,
    limit: int = 20,
) -> list[DriftMetrics]:
    """Get all patterns with significant drift.

    Scans all patterns with enough application history and returns
    those that exceed the drift threshold.

    v12 Evolution: Goal Drift Detection - enables CLI display of
    drifting patterns for operator review.

    Args:
        drift_threshold: Minimum drift to include (default 0.2).
        window_size: Applications per window (default 5).
        limit: Maximum patterns to return.

    Returns:
        List of DriftMetrics for drifting patterns, sorted by
        drift_magnitude descending.
    """
    drifting: list[DriftMetrics] = []

    with self._get_connection() as conn:
        # Find patterns with at least 2 × window_size applications
        cursor = conn.execute(
            """
            SELECT pattern_id, COUNT(*) as app_count
            FROM pattern_applications
            GROUP BY pattern_id
            HAVING app_count >= ?
            """,
            (window_size * 2,),
        )
        pattern_ids = [row["pattern_id"] for row in cursor.fetchall()]

    # Calculate drift for each pattern
    for pattern_id in pattern_ids:
        metrics = self.calculate_effectiveness_drift(
            pattern_id=pattern_id,
            window_size=window_size,
            drift_threshold=drift_threshold,
        )
        if metrics and metrics.threshold_exceeded:
            drifting.append(metrics)

    # Sort by drift magnitude descending
    drifting.sort(key=lambda m: m.drift_magnitude, reverse=True)

    return drifting[:limit]
get_pattern_drift_summary
get_pattern_drift_summary()

Get a summary of pattern drift across all patterns.

Provides aggregate statistics for monitoring pattern health.

v12 Evolution: Goal Drift Detection - supports dashboard/reporting.

Returns:

- dict[str, Any]: Dict with drift statistics:
  - total_patterns: Total patterns in the store
  - patterns_analyzed: Patterns with enough history for analysis
  - patterns_drifting: Patterns exceeding drift threshold
  - avg_drift_magnitude: Average drift across analyzed patterns
  - most_drifted: ID of pattern with highest drift
Source code in src/marianne/learning/store/drift.py
def get_pattern_drift_summary(self) -> dict[str, Any]:
    """Get a summary of pattern drift across all patterns.

    Provides aggregate statistics for monitoring pattern health.

    v12 Evolution: Goal Drift Detection - supports dashboard/reporting.

    Returns:
        Dict with drift statistics:
        - total_patterns: Total patterns in the store
        - patterns_analyzed: Patterns with enough history for analysis
        - patterns_drifting: Patterns exceeding drift threshold
        - avg_drift_magnitude: Average drift across analyzed patterns
        - most_drifted: ID of pattern with highest drift
    """
    with self._get_connection() as conn:
        # Total patterns
        cursor = conn.execute("SELECT COUNT(*) as count FROM patterns")
        total_patterns = cursor.fetchone()["count"]

        # Patterns with enough applications (10+ for analysis)
        cursor = conn.execute(
            """
            SELECT pattern_id, COUNT(*) as app_count
            FROM pattern_applications
            GROUP BY pattern_id
            HAVING app_count >= 10
            """
        )
        analyzable_patterns = [row["pattern_id"] for row in cursor.fetchall()]

    patterns_analyzed = len(analyzable_patterns)
    if patterns_analyzed == 0:
        return {
            "total_patterns": total_patterns,
            "patterns_analyzed": 0,
            "patterns_drifting": 0,
            "avg_drift_magnitude": 0.0,
            "most_drifted": None,
        }

    # Calculate drift for each
    all_metrics: list[DriftMetrics] = []
    for pattern_id in analyzable_patterns:
        metrics = self.calculate_effectiveness_drift(pattern_id)
        if metrics:
            all_metrics.append(metrics)

    if not all_metrics:
        return {
            "total_patterns": total_patterns,
            "patterns_analyzed": patterns_analyzed,
            "patterns_drifting": 0,
            "avg_drift_magnitude": 0.0,
            "most_drifted": None,
        }

    drifting_count = sum(1 for m in all_metrics if m.threshold_exceeded)
    avg_drift = sum(m.drift_magnitude for m in all_metrics) / len(all_metrics)
    most_drifted = max(all_metrics, key=lambda m: m.drift_magnitude)

    return {
        "total_patterns": total_patterns,
        "patterns_analyzed": len(all_metrics),
        "patterns_drifting": drifting_count,
        "avg_drift_magnitude": avg_drift,
        "most_drifted": most_drifted.pattern_id if most_drifted else None,
    }
calculate_epistemic_drift
calculate_epistemic_drift(pattern_id, window_size=5, drift_threshold=0.15)

Calculate epistemic drift for a pattern - how belief/confidence changes over time.

Unlike effectiveness drift (which tracks outcome success rates), epistemic drift tracks how our CONFIDENCE in the pattern changes. This enables detecting belief degradation before effectiveness actually declines.

v21 Evolution: Epistemic Drift Detection - complements effectiveness drift with belief-level monitoring.

Formula:

    belief_change = avg_confidence_after - avg_confidence_before
    belief_entropy = std_dev(all_confidence_values) / mean(all_confidence_values)
    weighted_change = |belief_change| × (1 + belief_entropy)

A positive belief_change means growing confidence, negative means declining. High entropy indicates unstable beliefs (variance in confidence).

Parameters:

- pattern_id (str): Pattern to analyze. Required.
- window_size (int): Number of applications per window; total applications needed = 2 × window_size. Default: 5.
- drift_threshold (float): Threshold for flagging epistemic drift (0.15 = 15%). Default: 0.15.

Returns:

- EpistemicDriftMetrics | None: EpistemicDriftMetrics if enough data exists, None otherwise.

Source code in src/marianne/learning/store/drift.py
def calculate_epistemic_drift(
    self,
    pattern_id: str,
    window_size: int = 5,
    drift_threshold: float = 0.15,
) -> EpistemicDriftMetrics | None:
    """Calculate epistemic drift for a pattern - how belief/confidence changes over time.

    Unlike effectiveness drift (which tracks outcome success rates), epistemic drift
    tracks how our CONFIDENCE in the pattern changes. This enables detecting belief
    degradation before effectiveness actually declines.

    v21 Evolution: Epistemic Drift Detection - complements effectiveness drift
    with belief-level monitoring.

    Formula:
        belief_change = avg_confidence_after - avg_confidence_before
        belief_entropy = std_dev(all_confidence_values) / mean(all_confidence_values)
        weighted_change = |belief_change| × (1 + belief_entropy)

    A positive belief_change means growing confidence, negative means declining.
    High entropy indicates unstable beliefs (variance in confidence).

    Args:
        pattern_id: Pattern to analyze.
        window_size: Number of applications per window (default 5).
                    Total applications needed = 2 × window_size.
        drift_threshold: Threshold for flagging epistemic drift (default 0.15 = 15%).

    Returns:
        EpistemicDriftMetrics if enough data exists, None otherwise.
    """
    with self._get_connection() as conn:
        # Get pattern name
        cursor = conn.execute(
            "SELECT pattern_name FROM patterns WHERE id = ?",
            (pattern_id,),
        )
        row = cursor.fetchone()
        if not row:
            return None
        pattern_name = row["pattern_name"]

        # Fetch 2 × window_size recent applications with grounding confidence
        cursor = conn.execute(
            """
            SELECT grounding_confidence, applied_at
            FROM pattern_applications
            WHERE pattern_id = ? AND grounding_confidence IS NOT NULL
            ORDER BY applied_at DESC
            LIMIT ?
            """,
            (pattern_id, window_size * 2),
        )
        applications = cursor.fetchall()

        # Need at least 2 × window_size applications for comparison
        if len(applications) < window_size * 2:
            _logger.debug(
                f"Pattern {pattern_id} has {len(applications)} applications with confidence, "
                f"need {window_size * 2} for epistemic drift analysis"
            )
            return None

        # Split into recent (first window_size) and older (second window_size)
        recent_apps = applications[:window_size]
        older_apps = applications[window_size : window_size * 2]

        # Extract confidence values
        recent_confidences = [a["grounding_confidence"] for a in recent_apps]
        older_confidences = [a["grounding_confidence"] for a in older_apps]
        all_confidences = recent_confidences + older_confidences

        # Calculate average confidence for each window
        avg_confidence_after = sum(recent_confidences) / len(recent_confidences)
        avg_confidence_before = sum(older_confidences) / len(older_confidences)

        # Calculate belief change
        belief_change = avg_confidence_after - avg_confidence_before

        # Calculate belief entropy (normalized standard deviation)
        mean_confidence = sum(all_confidences) / len(all_confidences)
        if mean_confidence > 0:
            variance = sum(
                (c - mean_confidence) ** 2 for c in all_confidences
            ) / len(all_confidences)
            std_dev = math.sqrt(variance)
            # Normalize by mean to get coefficient of variation
            belief_entropy = min(1.0, std_dev / mean_confidence)
        else:
            belief_entropy = 0.0

        # Determine direction
        if belief_change > 0.05:  # Small threshold to avoid noise
            drift_direction = "strengthening"
        elif belief_change < -0.05:
            drift_direction = "weakening"
        else:
            drift_direction = "stable"

        # Check if threshold exceeded (weighted by entropy)
        # High entropy amplifies the signal - unstable beliefs are concerning
        weighted_change = abs(belief_change) * (1 + belief_entropy)
        threshold_exceeded = weighted_change > drift_threshold

        return EpistemicDriftMetrics(
            pattern_id=pattern_id,
            pattern_name=pattern_name,
            window_size=window_size,
            confidence_before=avg_confidence_before,
            confidence_after=avg_confidence_after,
            belief_change=belief_change,
            belief_entropy=belief_entropy,
            applications_analyzed=len(applications),
            threshold_exceeded=threshold_exceeded,
            drift_direction=drift_direction,
        )
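The belief-entropy term above is a capped coefficient of variation. This sketch runs the same formula on hypothetical grounding-confidence windows; the specific confidence values are illustrative only:

```python
import math

def epistemic_drift(recent: list[float], older: list[float]) -> tuple[float, float, float]:
    """Return (belief_change, belief_entropy, weighted_change) per the formula above."""
    change = sum(recent) / len(recent) - sum(older) / len(older)
    allc = recent + older
    mean = sum(allc) / len(allc)
    if mean > 0:
        var = sum((c - mean) ** 2 for c in allc) / len(allc)
        # Coefficient of variation, capped at 1.0, as in the method
        entropy = min(1.0, math.sqrt(var) / mean)
    else:
        entropy = 0.0
    return change, entropy, abs(change) * (1 + entropy)

# Hypothetical confidence windows, most recent first (window_size = 5)
change, entropy, weighted = epistemic_drift(
    recent=[0.55, 0.5, 0.6, 0.5, 0.6],   # mean 0.55
    older=[0.8, 0.85, 0.75, 0.8, 0.8],   # mean 0.80
)
# change = -0.25 ("weakening"); weighted exceeds the 0.15 default threshold
```

Because entropy multiplies the change rather than dividing it, unstable beliefs amplify the drift signal even when the mean shift alone would sit under the threshold.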
get_epistemic_drifting_patterns
get_epistemic_drifting_patterns(drift_threshold=0.15, window_size=5, limit=20)

Get all patterns with significant epistemic drift.

Scans all patterns with enough application history and returns those that exceed the epistemic drift threshold.

v21 Evolution: Epistemic Drift Detection - enables CLI display of patterns with changing beliefs for operator review.

Parameters:

- drift_threshold (float): Minimum epistemic drift to include. Default: 0.15.
- window_size (int): Applications per window. Default: 5.
- limit (int): Maximum patterns to return. Default: 20.

Returns:

- list[EpistemicDriftMetrics]: List of EpistemicDriftMetrics for drifting patterns, sorted by belief_change magnitude descending.

Source code in src/marianne/learning/store/drift.py
def get_epistemic_drifting_patterns(
    self,
    drift_threshold: float = 0.15,
    window_size: int = 5,
    limit: int = 20,
) -> list[EpistemicDriftMetrics]:
    """Get all patterns with significant epistemic drift.

    Scans all patterns with enough application history and returns
    those that exceed the epistemic drift threshold.

    v21 Evolution: Epistemic Drift Detection - enables CLI display of
    patterns with changing beliefs for operator review.

    Args:
        drift_threshold: Minimum epistemic drift to include (default 0.15).
        window_size: Applications per window (default 5).
        limit: Maximum patterns to return.

    Returns:
        List of EpistemicDriftMetrics for drifting patterns, sorted by
        belief_change magnitude descending.
    """
    drifting: list[EpistemicDriftMetrics] = []

    with self._get_connection() as conn:
        # Find patterns with at least 2 × window_size applications WITH confidence
        cursor = conn.execute(
            """
            SELECT pattern_id, COUNT(*) as app_count
            FROM pattern_applications
            WHERE grounding_confidence IS NOT NULL
            GROUP BY pattern_id
            HAVING app_count >= ?
            """,
            (window_size * 2,),
        )
        pattern_ids = [row["pattern_id"] for row in cursor.fetchall()]

    # Calculate epistemic drift for each pattern
    for pattern_id in pattern_ids:
        metrics = self.calculate_epistemic_drift(
            pattern_id=pattern_id,
            window_size=window_size,
            drift_threshold=drift_threshold,
        )
        if metrics and metrics.threshold_exceeded:
            drifting.append(metrics)

    # Sort by belief change magnitude descending
    drifting.sort(key=lambda m: abs(m.belief_change), reverse=True)

    return drifting[:limit]
get_epistemic_drift_summary
get_epistemic_drift_summary()

Get a summary of epistemic drift across all patterns.

Provides aggregate statistics for monitoring belief/confidence health.

v21 Evolution: Epistemic Drift Detection - supports dashboard/reporting.

Returns:

- dict[str, Any]: Dict with epistemic drift statistics:
  - total_patterns: Total patterns in the store
  - patterns_analyzed: Patterns with enough confidence history for analysis
  - patterns_with_epistemic_drift: Patterns exceeding epistemic drift threshold
  - avg_belief_change: Average belief change across analyzed patterns
  - avg_belief_entropy: Average belief entropy (stability measure)
  - most_unstable: ID of pattern with highest epistemic drift
Source code in src/marianne/learning/store/drift.py
def get_epistemic_drift_summary(self) -> dict[str, Any]:
    """Get a summary of epistemic drift across all patterns.

    Provides aggregate statistics for monitoring belief/confidence health.

    v21 Evolution: Epistemic Drift Detection - supports dashboard/reporting.

    Returns:
        Dict with epistemic drift statistics:
        - total_patterns: Total patterns in the store
        - patterns_analyzed: Patterns with enough confidence history for analysis
        - patterns_with_epistemic_drift: Patterns exceeding epistemic drift threshold
        - avg_belief_change: Average belief change across analyzed patterns
        - avg_belief_entropy: Average belief entropy (stability measure)
        - most_unstable: ID of pattern with highest epistemic drift
    """
    with self._get_connection() as conn:
        # Total patterns
        cursor = conn.execute("SELECT COUNT(*) as count FROM patterns")
        total_patterns = cursor.fetchone()["count"]

        # Patterns with enough applications with confidence (10+ for analysis)
        cursor = conn.execute(
            """
            SELECT pattern_id, COUNT(*) as app_count
            FROM pattern_applications
            WHERE grounding_confidence IS NOT NULL
            GROUP BY pattern_id
            HAVING app_count >= 10
            """
        )
        analyzable_patterns = [row["pattern_id"] for row in cursor.fetchall()]

    patterns_analyzed = len(analyzable_patterns)
    if patterns_analyzed == 0:
        return {
            "total_patterns": total_patterns,
            "patterns_analyzed": 0,
            "patterns_with_epistemic_drift": 0,
            "avg_belief_change": 0.0,
            "avg_belief_entropy": 0.0,
            "most_unstable": None,
        }

    # Calculate epistemic drift for each
    all_metrics: list[EpistemicDriftMetrics] = []
    for pattern_id in analyzable_patterns:
        metrics = self.calculate_epistemic_drift(pattern_id)
        if metrics:
            all_metrics.append(metrics)

    if not all_metrics:
        return {
            "total_patterns": total_patterns,
            "patterns_analyzed": patterns_analyzed,
            "patterns_with_epistemic_drift": 0,
            "avg_belief_change": 0.0,
            "avg_belief_entropy": 0.0,
            "most_unstable": None,
        }

    drifting_count = sum(1 for m in all_metrics if m.threshold_exceeded)
    avg_change = sum(m.belief_change for m in all_metrics) / len(all_metrics)
    avg_entropy = sum(m.belief_entropy for m in all_metrics) / len(all_metrics)
    most_unstable = max(all_metrics, key=lambda m: abs(m.belief_change))

    return {
        "total_patterns": total_patterns,
        "patterns_analyzed": len(all_metrics),
        "patterns_with_epistemic_drift": drifting_count,
        "avg_belief_change": avg_change,
        "avg_belief_entropy": avg_entropy,
        "most_unstable": most_unstable.pattern_id if most_unstable else None,
    }
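The aggregation at the end of this method is plain arithmetic over the collected metrics. A minimal standalone sketch, using hypothetical tuples in place of real EpistemicDriftMetrics objects:

```python
# Hypothetical (pattern_id, belief_change, belief_entropy, threshold_exceeded)
# tuples standing in for EpistemicDriftMetrics objects.
metrics = [
    ("pat-a", -0.22, 0.40, True),
    ("pat-b", 0.05, 0.10, False),
    ("pat-c", 0.18, 0.35, True),
]

drifting_count = sum(1 for m in metrics if m[3])
avg_change = sum(m[1] for m in metrics) / len(metrics)
avg_entropy = sum(m[2] for m in metrics) / len(metrics)
# "Most unstable" is the largest belief change by magnitude, regardless of sign.
most_unstable = max(metrics, key=lambda m: abs(m[1]))

summary = {
    "patterns_analyzed": len(metrics),
    "patterns_with_epistemic_drift": drifting_count,
    "avg_belief_change": avg_change,
    "avg_belief_entropy": avg_entropy,
    "most_unstable": most_unstable[0],
}
print(summary)
```

Note that a large negative change and a large positive change can cancel in avg_belief_change, which is why most_unstable uses the absolute value.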
retire_drifting_patterns
retire_drifting_patterns(drift_threshold=0.2, window_size=5, require_negative_drift=True)

Retire patterns that are drifting negatively.

Connects the drift detection infrastructure (DriftMetrics) to action. Patterns that have drifted significantly AND in a negative direction are retired by setting their priority_score to 0.

v14 Evolution: Pattern Auto-Retirement - enables automated pattern lifecycle management based on empirical effectiveness drift.

Parameters:

Name Type Description Default
drift_threshold float

Minimum drift magnitude to consider (default 0.2).

0.2
window_size int

Applications per window for drift calculation.

5
require_negative_drift bool

If True, only retire patterns with negative drift (getting worse). If False, also retire patterns with positive anomalous drift.

True

Returns:

Type Description
list[tuple[str, str, float]]

List of (pattern_id, pattern_name, drift_magnitude) tuples for patterns that were retired.

Source code in src/marianne/learning/store/drift.py
def retire_drifting_patterns(
    self,
    drift_threshold: float = 0.2,
    window_size: int = 5,
    require_negative_drift: bool = True,
) -> list[tuple[str, str, float]]:
    """Retire patterns that are drifting negatively.

    Connects the drift detection infrastructure (DriftMetrics) to action.
    Patterns that have drifted significantly AND in a negative direction
    are retired by setting their priority_score to 0.

    v14 Evolution: Pattern Auto-Retirement - enables automated pattern
    lifecycle management based on empirical effectiveness drift.

    Args:
        drift_threshold: Minimum drift magnitude to consider (default 0.2).
        window_size: Applications per window for drift calculation.
        require_negative_drift: If True, only retire patterns with
            negative drift (getting worse). If False, also retire
            patterns with positive anomalous drift.

    Returns:
        List of (pattern_id, pattern_name, drift_magnitude) tuples for
        patterns that were retired.
    """
    retired: list[tuple[str, str, float]] = []

    # Get all patterns exceeding drift threshold
    drifting = self.get_drifting_patterns(
        drift_threshold=drift_threshold,
        window_size=window_size,
        limit=100,  # Process up to 100 drifting patterns
    )

    if not drifting:
        _logger.debug("No drifting patterns found - nothing to retire")
        return retired

    with self._get_connection() as conn:
        for metrics in drifting:
            # Only retire if negative drift (getting worse)
            if require_negative_drift and metrics.drift_direction != "negative":
                _logger.debug(
                    f"Skipping {metrics.pattern_name}: drift is {metrics.drift_direction}, "
                    f"not negative"
                )
                continue

            # threshold_exceeded should already be True from get_drifting_patterns()
            # but double-check for safety
            if not metrics.threshold_exceeded:
                continue

            # Retire by setting priority_score to 0
            # Also update suggested_action to document the retirement
            retirement_reason = (
                f"Auto-retired: drift {metrics.drift_direction} "
                f"({metrics.drift_magnitude:.2f}), "
                f"effectiveness {metrics.effectiveness_before:.2f} → "
                f"{metrics.effectiveness_after:.2f}"
            )

            conn.execute(
                """
                UPDATE patterns
                SET priority_score = 0,
                    suggested_action = ?
                WHERE id = ?
                """,
                (retirement_reason, metrics.pattern_id),
            )

            retired.append((
                metrics.pattern_id,
                metrics.pattern_name,
                metrics.drift_magnitude,
            ))

            _logger.info(
                f"Retired pattern '{metrics.pattern_name}': {retirement_reason}"
            )

    if retired:
        _logger.info(
            f"Pattern auto-retirement complete: {len(retired)} patterns retired"
        )

    return retired
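The gating inside the loop — retire only when the threshold was exceeded and, by default, only when drift is negative — can be isolated from the database work. A minimal sketch with hypothetical dicts in place of DriftMetrics:

```python
# Hypothetical drift records; keys mirror the metrics fields used above.
drifting = [
    {"pattern_id": "p1", "drift_direction": "negative", "threshold_exceeded": True},
    {"pattern_id": "p2", "drift_direction": "positive", "threshold_exceeded": True},
    {"pattern_id": "p3", "drift_direction": "negative", "threshold_exceeded": False},
]

require_negative_drift = True
retired = [
    m["pattern_id"]
    for m in drifting
    if m["threshold_exceeded"]
    and (not require_negative_drift or m["drift_direction"] == "negative")
]
print(retired)  # only p1 passes both checks
```

With require_negative_drift=False, p2 (anomalous positive drift) would also be retired, matching the parameter description above.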
get_retired_patterns
get_retired_patterns(limit=50)

Get patterns that have been retired (priority_score = 0).

Returns patterns that were retired through auto-retirement or manual deprecation, useful for review and potential recovery.

Parameters:

Name Type Description Default
limit int

Maximum number of patterns to return.

50

Returns:

Type Description
list[PatternRecord]

List of PatternRecord objects with priority_score = 0.

Source code in src/marianne/learning/store/drift.py
def get_retired_patterns(self, limit: int = 50) -> list[PatternRecord]:
    """Get patterns that have been retired (priority_score = 0).

    Returns patterns that were retired through auto-retirement or
    manual deprecation, useful for review and potential recovery.

    Args:
        limit: Maximum number of patterns to return.

    Returns:
        List of PatternRecord objects with priority_score = 0.
    """
    from .models import QuarantineStatus, SuccessFactors

    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT * FROM patterns
            WHERE priority_score = 0
            ORDER BY last_seen DESC
            LIMIT ?
            """,
            (limit,),
        )

        records = []
        for row in cursor.fetchall():
            # Construct PatternRecord with all v19/v22 fields
            records.append(
                PatternRecord(
                    id=row["id"],
                    pattern_type=row["pattern_type"],
                    pattern_name=row["pattern_name"],
                    description=row["description"],
                    occurrence_count=row["occurrence_count"],
                    first_seen=datetime.fromisoformat(row["first_seen"]),
                    last_seen=datetime.fromisoformat(row["last_seen"]),
                    last_confirmed=datetime.fromisoformat(row["last_confirmed"]),
                    led_to_success_count=row["led_to_success_count"],
                    led_to_failure_count=row["led_to_failure_count"],
                    effectiveness_score=row["effectiveness_score"],
                    variance=row["variance"],
                    suggested_action=row["suggested_action"],
                    context_tags=json.loads(row["context_tags"] or "[]"),
                    priority_score=row["priority_score"],
                    # v19: Quarantine & Provenance fields
                    quarantine_status=QuarantineStatus(row["quarantine_status"])
                    if row["quarantine_status"]
                    else QuarantineStatus.PENDING,
                    provenance_job_hash=row["provenance_job_hash"],
                    provenance_sheet_num=row["provenance_sheet_num"],
                    quarantined_at=datetime.fromisoformat(row["quarantined_at"])
                    if row["quarantined_at"]
                    else None,
                    validated_at=datetime.fromisoformat(row["validated_at"])
                    if row["validated_at"]
                    else None,
                    quarantine_reason=row["quarantine_reason"],
                    # v19: Trust Scoring fields
                    trust_score=row["trust_score"] if row["trust_score"] is not None else 0.5,
                    trust_calculation_date=datetime.fromisoformat(row["trust_calculation_date"])
                    if row["trust_calculation_date"]
                    else None,
                    # v22: Metacognitive Pattern Reflection fields
                    success_factors=SuccessFactors.from_dict(json.loads(row["success_factors"]))
                    if row["success_factors"]
                    else None,
                    success_factors_updated_at=datetime.fromisoformat(row["success_factors_updated_at"])
                    if row["success_factors_updated_at"]
                    else None,
                )
            )

        return records
record_evolution_entry
record_evolution_entry(cycle=None, evolutions_completed=None, evolutions_deferred=None, issue_classes=None, cv_avg=None, implementation_loc=None, test_loc=None, loc_accuracy=None, research_candidates_resolved=0, research_candidates_created=0, notes='', *, entry=None)

Record an evolution cycle entry to the trajectory.

v16 Evolution: Evolution Trajectory Tracking - enables Marianne to track its own evolution history for recursive self-improvement analysis.

Accepts either individual keyword args (backward compatible) or a bundled EvolutionEntryInput dataclass via the entry kwarg.

Parameters:

Name Type Description Default
cycle int | None

Evolution cycle number (e.g., 16 for v16).

None
evolutions_completed int | None

Number of evolutions completed in this cycle.

None
evolutions_deferred int | None

Number of evolutions deferred in this cycle.

None
issue_classes list[str] | None

Issue classes addressed (e.g., ['infrastructure_activation']).

None
cv_avg float | None

Average Consciousness Volume of selected evolutions.

None
implementation_loc int | None

Total implementation LOC for this cycle.

None
test_loc int | None

Total test LOC for this cycle.

None
loc_accuracy float | None

LOC estimation accuracy (actual/estimated as ratio).

None
research_candidates_resolved int

Number of research candidates resolved.

0
research_candidates_created int

Number of new research candidates created.

0
notes str

Optional notes about this evolution cycle.

''
entry EvolutionEntryInput | None

Bundled input parameters (overrides individual args if provided).

None

Returns:

Type Description
str

The ID of the created trajectory entry.

Raises:

Type Description
IntegrityError

If an entry for this cycle already exists.

Source code in src/marianne/learning/store/drift.py
def record_evolution_entry(
    self,
    cycle: int | None = None,
    evolutions_completed: int | None = None,
    evolutions_deferred: int | None = None,
    issue_classes: list[str] | None = None,
    cv_avg: float | None = None,
    implementation_loc: int | None = None,
    test_loc: int | None = None,
    loc_accuracy: float | None = None,
    research_candidates_resolved: int = 0,
    research_candidates_created: int = 0,
    notes: str = "",
    *,
    entry: EvolutionEntryInput | None = None,
) -> str:
    """Record an evolution cycle entry to the trajectory.

    v16 Evolution: Evolution Trajectory Tracking - enables Marianne to track
    its own evolution history for recursive self-improvement analysis.

    Accepts either individual keyword args (backward compatible) or
    a bundled EvolutionEntryInput dataclass via the ``entry`` kwarg.

    Args:
        cycle: Evolution cycle number (e.g., 16 for v16).
        evolutions_completed: Number of evolutions completed in this cycle.
        evolutions_deferred: Number of evolutions deferred in this cycle.
        issue_classes: Issue classes addressed (e.g., ['infrastructure_activation']).
        cv_avg: Average Consciousness Volume of selected evolutions.
        implementation_loc: Total implementation LOC for this cycle.
        test_loc: Total test LOC for this cycle.
        loc_accuracy: LOC estimation accuracy (actual/estimated as ratio).
        research_candidates_resolved: Number of research candidates resolved.
        research_candidates_created: Number of new research candidates created.
        notes: Optional notes about this evolution cycle.
        entry: Bundled input parameters (overrides individual args if provided).

    Returns:
        The ID of the created trajectory entry.

    Raises:
        sqlite3.IntegrityError: If an entry for this cycle already exists.
    """
    import uuid

    if entry is not None:
        cycle = entry.cycle
        evolutions_completed = entry.evolutions_completed
        evolutions_deferred = entry.evolutions_deferred
        issue_classes = entry.issue_classes
        cv_avg = entry.cv_avg
        implementation_loc = entry.implementation_loc
        test_loc = entry.test_loc
        loc_accuracy = entry.loc_accuracy
        research_candidates_resolved = entry.research_candidates_resolved
        research_candidates_created = entry.research_candidates_created
        notes = entry.notes

    # Validate required fields
    if cycle is None or evolutions_completed is None or evolutions_deferred is None:
        raise TypeError("cycle, evolutions_completed, evolutions_deferred are required")
    if issue_classes is None or cv_avg is None:
        raise TypeError("issue_classes, cv_avg are required")
    if implementation_loc is None or test_loc is None or loc_accuracy is None:
        raise TypeError("implementation_loc, test_loc, loc_accuracy are required")

    entry_id = str(uuid.uuid4())
    now = datetime.now()

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT INTO evolution_trajectory (
                id, cycle, recorded_at, evolutions_completed, evolutions_deferred,
                issue_classes, cv_avg, implementation_loc, test_loc, loc_accuracy,
                research_candidates_resolved, research_candidates_created, notes
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                entry_id,
                cycle,
                now.isoformat(),
                evolutions_completed,
                evolutions_deferred,
                json.dumps(issue_classes),
                cv_avg,
                implementation_loc,
                test_loc,
                loc_accuracy,
                research_candidates_resolved,
                research_candidates_created,
                notes,
            ),
        )

    _logger.info(
        f"Recorded evolution trajectory entry for cycle {cycle}: "
        f"{evolutions_completed} completed, {evolutions_deferred} deferred"
    )

    return entry_id
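The "bundled dataclass overrides individual kwargs" pattern used by this method is easy to isolate. A minimal sketch with a hypothetical two-field analogue of EvolutionEntryInput:

```python
from dataclasses import dataclass


@dataclass
class EntryInput:
    # Hypothetical two-field analogue of EvolutionEntryInput.
    cycle: int
    notes: str = ""


def record(cycle=None, notes="", *, entry=None):
    # The bundled input, when given, takes precedence over individual kwargs.
    if entry is not None:
        cycle = entry.cycle
        notes = entry.notes
    # Required fields are validated after unbundling, so the check covers
    # both calling styles.
    if cycle is None:
        raise TypeError("cycle is required")
    return (cycle, notes)


print(record(cycle=16, notes="individual kwargs"))
print(record(entry=EntryInput(cycle=17, notes="bundled")))
```

Validating after the unbundling step is the key design choice: a single check covers both the backward-compatible keyword style and the newer dataclass style.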
get_trajectory
get_trajectory(start_cycle=None, end_cycle=None, limit=50)

Retrieve evolution trajectory history.

v16 Evolution: Evolution Trajectory Tracking - enables analysis of Marianne's evolution history over time.

Parameters:

Name Type Description Default
start_cycle int | None

Optional minimum cycle number to include.

None
end_cycle int | None

Optional maximum cycle number to include.

None
limit int

Maximum number of entries to return (default: 50).

50

Returns:

Type Description
list[EvolutionTrajectoryEntry]

List of EvolutionTrajectoryEntry objects, ordered by cycle descending.

Source code in src/marianne/learning/store/drift.py
def get_trajectory(
    self,
    start_cycle: int | None = None,
    end_cycle: int | None = None,
    limit: int = 50,
) -> list[EvolutionTrajectoryEntry]:
    """Retrieve evolution trajectory history.

    v16 Evolution: Evolution Trajectory Tracking - enables analysis of
    Marianne's evolution history over time.

    Args:
        start_cycle: Optional minimum cycle number to include.
        end_cycle: Optional maximum cycle number to include.
        limit: Maximum number of entries to return (default: 50).

    Returns:
        List of EvolutionTrajectoryEntry objects, ordered by cycle descending.
    """
    with self._get_connection() as conn:
        query = "SELECT * FROM evolution_trajectory WHERE 1=1"
        params: list[int] = []

        if start_cycle is not None:
            query += " AND cycle >= ?"
            params.append(start_cycle)

        if end_cycle is not None:
            query += " AND cycle <= ?"
            params.append(end_cycle)

        query += " ORDER BY cycle DESC LIMIT ?"
        params.append(limit)

        cursor = conn.execute(query, params)
        entries = []

        for row in cursor.fetchall():
            entries.append(
                EvolutionTrajectoryEntry(
                    id=row["id"],
                    cycle=row["cycle"],
                    recorded_at=datetime.fromisoformat(row["recorded_at"]),
                    evolutions_completed=row["evolutions_completed"],
                    evolutions_deferred=row["evolutions_deferred"],
                    issue_classes=json.loads(row["issue_classes"]),
                    cv_avg=row["cv_avg"],
                    implementation_loc=row["implementation_loc"],
                    test_loc=row["test_loc"],
                    loc_accuracy=row["loc_accuracy"],
                    research_candidates_resolved=row["research_candidates_resolved"] or 0,
                    research_candidates_created=row["research_candidates_created"] or 0,
                    notes=row["notes"] or "",
                )
            )

        return entries
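The incremental WHERE-clause construction above is a common sqlite3 idiom and runs fine standalone. A self-contained sketch against an in-memory table, with the schema reduced to the one column the filters touch:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE evolution_trajectory (cycle INTEGER, notes TEXT)")
conn.executemany(
    "INSERT INTO evolution_trajectory VALUES (?, ?)",
    [(14, "a"), (15, "b"), (16, "c"), (17, "d")],
)


def trajectory(start_cycle=None, end_cycle=None, limit=50):
    # Start with an always-true predicate so each optional filter can be
    # appended uniformly with " AND ...".
    query = "SELECT * FROM evolution_trajectory WHERE 1=1"
    params = []
    if start_cycle is not None:
        query += " AND cycle >= ?"
        params.append(start_cycle)
    if end_cycle is not None:
        query += " AND cycle <= ?"
        params.append(end_cycle)
    query += " ORDER BY cycle DESC LIMIT ?"
    params.append(limit)
    return [row["cycle"] for row in conn.execute(query, params)]


print(trajectory(start_cycle=15, end_cycle=16))  # [16, 15]
print(trajectory(limit=2))  # [17, 16]
```

Because every value goes through `?` placeholders, the dynamic query stays safe from injection even as the clause list grows.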
get_recurring_issues
get_recurring_issues(min_occurrences=2, window_cycles=None)

Identify recurring issue classes across evolution cycles.

v16 Evolution: Evolution Trajectory Tracking - enables identification of patterns in what types of issues Marianne addresses repeatedly.

Parameters:

Name Type Description Default
min_occurrences int

Minimum number of occurrences to consider recurring.

2
window_cycles int | None

Optional limit to analyze only recent N cycles.

None

Returns:

Type Description
dict[str, list[int]]

Dict mapping issue class names to list of cycles where they appeared. Only includes issue classes that meet the min_occurrences threshold.

Source code in src/marianne/learning/store/drift.py
def get_recurring_issues(
    self,
    min_occurrences: int = 2,
    window_cycles: int | None = None,
) -> dict[str, list[int]]:
    """Identify recurring issue classes across evolution cycles.

    v16 Evolution: Evolution Trajectory Tracking - enables identification
    of patterns in what types of issues Marianne addresses repeatedly.

    Args:
        min_occurrences: Minimum number of occurrences to consider recurring.
        window_cycles: Optional limit to analyze only recent N cycles.

    Returns:
        Dict mapping issue class names to list of cycles where they appeared.
        Only includes issue classes that meet the min_occurrences threshold.
    """
    with self._get_connection() as conn:
        query = "SELECT cycle, issue_classes FROM evolution_trajectory"
        params: list[int] = []

        if window_cycles is not None:
            # Get recent N cycles
            query += " ORDER BY cycle DESC LIMIT ?"
            params.append(window_cycles)
        else:
            query += " ORDER BY cycle DESC"

        cursor = conn.execute(query, params)

        # Count issue class occurrences
        issue_cycles: dict[str, list[int]] = {}

        for row in cursor.fetchall():
            cycle = row["cycle"]
            issues = json.loads(row["issue_classes"])

            for issue in issues:
                if issue not in issue_cycles:
                    issue_cycles[issue] = []
                issue_cycles[issue].append(cycle)

        # Filter by min_occurrences
        recurring = {
            issue: sorted(cycles, reverse=True)
            for issue, cycles in issue_cycles.items()
            if len(cycles) >= min_occurrences
        }

        return recurring
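Once the rows are fetched, the grouping step is pure Python. A standalone sketch over sample (cycle, issue_classes) rows as this method would see them after JSON decoding:

```python
# Sample (cycle, issues) rows, newest cycle first, as fetched from
# evolution_trajectory after json.loads() on issue_classes.
rows = [
    (16, ["infrastructure_activation", "drift"]),
    (15, ["drift"]),
    (14, ["infrastructure_activation"]),
    (13, ["one_off"]),
]
min_occurrences = 2

issue_cycles = {}
for cycle, issues in rows:
    for issue in issues:
        issue_cycles.setdefault(issue, []).append(cycle)

# Keep only issue classes seen in at least min_occurrences cycles,
# with their cycle lists sorted newest-first.
recurring = {
    issue: sorted(cycles, reverse=True)
    for issue, cycles in issue_cycles.items()
    if len(cycles) >= min_occurrences
}
print(recurring)
```

Here "one_off" is dropped because it appears in a single cycle, while the two recurring classes keep their full cycle history.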
record_evolution_cycle
record_evolution_cycle(cycle_number, candidates_generated, candidates_applied, changes_summary, outcome, learning_snapshot)

Record evolution cycle metadata to trajectory table.

v25 Evolution: Simplified wrapper for recording evolution cycles with essential metadata. Maps to the more detailed record_evolution_entry() internal method.

This method provides a simpler interface focused on what evolution cycles need to record: how many candidates were generated/applied, what changed, and the outcome.

Parameters:

Name Type Description Default
cycle_number int

Evolution cycle number (e.g., 25 for v25).

required
candidates_generated int

Number of evolution candidates generated.

required
candidates_applied int

Number of candidates successfully applied.

required
changes_summary str

Git diff summary or description of changes.

required
outcome Literal['SUCCESS', 'PARTIAL', 'DEFERRED']

Evolution outcome - SUCCESS, PARTIAL, or DEFERRED.

required
learning_snapshot dict[str, Any]

Dict containing learning metrics at time of cycle.

required

Returns:

Type Description
str

The ID of the created trajectory entry.

Raises:

Type Description
IntegrityError

If an entry for this cycle already exists.

Example

>>> store = GlobalLearningStore()
>>> entry_id = store.record_evolution_cycle(
...     cycle_number=25,
...     candidates_generated=5,
...     candidates_applied=3,
...     changes_summary="Fixed learning export, wired pattern lifecycle",
...     outcome="SUCCESS",
...     learning_snapshot={
...         "patterns": 6,
...         "entropy": 0.000,
...         "recovery_rate": 0.0
...     }
... )

Source code in src/marianne/learning/store/drift.py
def record_evolution_cycle(
    self,
    cycle_number: int,
    candidates_generated: int,
    candidates_applied: int,
    changes_summary: str,
    outcome: Literal["SUCCESS", "PARTIAL", "DEFERRED"],
    learning_snapshot: dict[str, Any],
) -> str:
    """Record evolution cycle metadata to trajectory table.

    v25 Evolution: Simplified wrapper for recording evolution cycles with
    essential metadata. Maps to the more detailed record_evolution_entry()
    internal method.

    This method provides a simpler interface focused on what evolution
    cycles need to record: how many candidates were generated/applied,
    what changed, and the outcome.

    Args:
        cycle_number: Evolution cycle number (e.g., 25 for v25).
        candidates_generated: Number of evolution candidates generated.
        candidates_applied: Number of candidates successfully applied.
        changes_summary: Git diff summary or description of changes.
        outcome: Evolution outcome - SUCCESS, PARTIAL, or DEFERRED.
        learning_snapshot: Dict containing learning metrics at time of cycle.

    Returns:
        The ID of the created trajectory entry.

    Raises:
        sqlite3.IntegrityError: If an entry for this cycle already exists.

    Example:
        >>> store = GlobalLearningStore()
        >>> entry_id = store.record_evolution_cycle(
        ...     cycle_number=25,
        ...     candidates_generated=5,
        ...     candidates_applied=3,
        ...     changes_summary="Fixed learning export, wired pattern lifecycle",
        ...     outcome="SUCCESS",
        ...     learning_snapshot={
        ...         "patterns": 6,
        ...         "entropy": 0.000,
        ...         "recovery_rate": 0.0
        ...     }
        ... )
    """
    # Map to the detailed record_evolution_entry method
    # Use defaults for fields not in the simplified interface
    return self.record_evolution_entry(
        cycle=cycle_number,
        evolutions_completed=candidates_applied,
        evolutions_deferred=candidates_generated - candidates_applied,
        issue_classes=[outcome.lower()],
        cv_avg=0.0,  # Not used in new cycles
        implementation_loc=0,  # Can be computed from git diff if needed
        test_loc=0,
        loc_accuracy=1.0,  # Default to perfect accuracy
        research_candidates_resolved=0,
        research_candidates_created=0,
        notes=(
            f"{changes_summary}\n\nOutcome: {outcome}"
            f"\nLearning snapshot: {json.dumps(learning_snapshot)}"
        ),
    )
get_evolution_history
get_evolution_history(last_n=10)

Retrieve last N evolution cycles for context.

v25 Evolution: Simplified wrapper for retrieving evolution history. Maps to the more detailed get_trajectory() method.

This provides a simpler interface focused on getting recent evolution history for context in future cycles.

Parameters:

Name Type Description Default
last_n int

Number of recent cycles to retrieve (default: 10).

10

Returns:

Type Description
list[EvolutionTrajectoryEntry]

List of EvolutionTrajectoryEntry objects, ordered by cycle descending (most recent first).

Example

>>> store = GlobalLearningStore()
>>> recent_cycles = store.get_evolution_history(last_n=5)
>>> for entry in recent_cycles:
...     print(f"Cycle {entry.cycle}: {entry.evolutions_completed} completed")

Source code in src/marianne/learning/store/drift.py
def get_evolution_history(self, last_n: int = 10) -> list[EvolutionTrajectoryEntry]:
    """Retrieve last N evolution cycles for context.

    v25 Evolution: Simplified wrapper for retrieving evolution history.
    Maps to the more detailed get_trajectory() method.

    This provides a simpler interface focused on getting recent evolution
    history for context in future cycles.

    Args:
        last_n: Number of recent cycles to retrieve (default: 10).

    Returns:
        List of EvolutionTrajectoryEntry objects, ordered by cycle descending
        (most recent first).

    Example:
        >>> store = GlobalLearningStore()
        >>> recent_cycles = store.get_evolution_history(last_n=5)
        >>> for entry in recent_cycles:
        ...     print(f"Cycle {entry.cycle}: {entry.evolutions_completed} completed")
    """
    return self.get_trajectory(limit=last_n)

EntropyResponseRecord dataclass

EntropyResponseRecord(id, job_hash, recorded_at, entropy_at_trigger, threshold_used, actions_taken, budget_boosted=False, quarantine_revisits=0, patterns_revisited=list())

A record of an automatic entropy response event.

v23 Evolution: Automatic Entropy Response - records when the system automatically responded to low entropy conditions by injecting diversity.

Attributes
id instance-attribute
id

Unique identifier for this response record.

job_hash instance-attribute
job_hash

Hash of the job that triggered this response.

recorded_at instance-attribute
recorded_at

When this response was triggered.

entropy_at_trigger instance-attribute
entropy_at_trigger

The entropy value that triggered this response.

threshold_used instance-attribute
threshold_used

The threshold that was crossed.

actions_taken instance-attribute
actions_taken

List of actions taken: 'budget_boost', 'quarantine_revisit', etc.

budget_boosted class-attribute instance-attribute
budget_boosted = False

Whether the exploration budget was boosted.

quarantine_revisits class-attribute instance-attribute
quarantine_revisits = 0

Number of quarantined patterns revisited.

patterns_revisited class-attribute instance-attribute
patterns_revisited = field(default_factory=list)

IDs of patterns that were marked for revisit.

EpistemicDriftMetrics dataclass

EpistemicDriftMetrics(pattern_id, pattern_name, window_size, confidence_before, confidence_after, belief_change, belief_entropy, applications_analyzed, threshold_exceeded=False, drift_direction='stable')

Metrics for epistemic drift detection - tracking belief changes about patterns.

v21 Evolution: Epistemic Drift Detection - tracks how confidence/belief in patterns changes over time, complementing effectiveness drift. While effectiveness drift measures outcome changes, epistemic drift measures belief evolution.

This enables detection of belief degradation before effectiveness actually declines.

Attributes
pattern_id instance-attribute
pattern_id

Pattern ID being analyzed.

pattern_name instance-attribute
pattern_name

Human-readable pattern name.

window_size instance-attribute
window_size

Number of applications in each comparison window.

confidence_before instance-attribute
confidence_before

Average grounding confidence in the older window (applications N-2W to N-W).

confidence_after instance-attribute
confidence_after

Average grounding confidence in the recent window (applications N-W to N).

belief_change instance-attribute
belief_change

Change in belief/confidence: confidence_after - confidence_before.

belief_entropy instance-attribute
belief_entropy

Entropy of confidence values (0 = consistent beliefs, 1 = high variance).

applications_analyzed instance-attribute
applications_analyzed

Total number of applications analyzed (should be 2 × window_size).

threshold_exceeded class-attribute instance-attribute
threshold_exceeded = False

Whether belief_change magnitude exceeds the alert threshold.

drift_direction class-attribute instance-attribute
drift_direction = 'stable'

Direction of belief drift: 'strengthening', 'weakening', or 'stable'.
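The window arithmetic these attributes describe can be sketched directly. Assuming a chronological list of grounding-confidence values; the helper itself is hypothetical, and the exact coupling between the threshold and the direction label is an assumption of this sketch (the 0.15 default and the 'strengthening'/'weakening'/'stable' labels come from this page):

```python
def epistemic_drift(confidences, window_size=5, drift_threshold=0.15):
    # Compare the older window (applications N-2W to N-W) with the
    # recent window (applications N-W to N).
    older = confidences[-2 * window_size:-window_size]
    recent = confidences[-window_size:]
    confidence_before = sum(older) / len(older)
    confidence_after = sum(recent) / len(recent)
    belief_change = confidence_after - confidence_before
    threshold_exceeded = abs(belief_change) >= drift_threshold
    if not threshold_exceeded:
        direction = "stable"
    elif belief_change > 0:
        direction = "strengthening"
    else:
        direction = "weakening"
    return confidence_before, confidence_after, belief_change, direction


# Confidence drops from ~0.8 to ~0.6 between the two windows.
history = [0.8, 0.8, 0.8, 0.8, 0.8, 0.6, 0.6, 0.6, 0.6, 0.6]
print(epistemic_drift(history))
```

A drop of 0.2 exceeds the 0.15 default, so the pattern is flagged as weakening even though its measured effectiveness may not yet have declined — which is the point of epistemic drift as an early signal.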

ErrorRecoveryRecord dataclass

ErrorRecoveryRecord(id, error_code, suggested_wait, actual_wait, recovery_success, recorded_at, model, time_of_day)

A record of error recovery timing for learning adaptive waits.

EscalationDecisionRecord dataclass

EscalationDecisionRecord(id, job_hash, sheet_num, confidence, action, guidance, validation_pass_rate, retry_count, outcome_after_action=None, recorded_at=(lambda: now(tz=UTC))(), model=None)

A record of a human/AI escalation decision.

Evolution v11: Escalation Learning Loop - records escalation decisions to learn from feedback over time and potentially suggest actions for similar future escalations.

Attributes
id instance-attribute
id

Unique identifier for this escalation decision.

job_hash instance-attribute
job_hash

Hash of the job that triggered escalation.

sheet_num instance-attribute
sheet_num

Sheet number that triggered escalation.

confidence instance-attribute
confidence

Aggregate confidence score at time of escalation (0.0-1.0).

action instance-attribute
action

Action taken: retry, skip, abort, modify_prompt.

guidance instance-attribute
guidance

Optional guidance/notes from the escalation handler.

validation_pass_rate instance-attribute
validation_pass_rate

Pass percentage of validations at escalation time.

retry_count instance-attribute
retry_count

Number of retries attempted before escalation.

outcome_after_action class-attribute instance-attribute
outcome_after_action = None

What happened after the action: success, failed, aborted, skipped.

recorded_at class-attribute instance-attribute
recorded_at = field(default_factory=lambda: now(tz=UTC))

When the escalation decision was recorded.

model class-attribute instance-attribute
model = None

Model used for execution (if relevant).

EscalationMixin

Mixin providing escalation decision functionality.

This mixin provides methods for recording and querying escalation decisions. When a sheet triggers escalation and receives a response, the decision is recorded so Marianne can learn from it and potentially suggest similar actions for future escalations with similar contexts.

Requires the following from the composed class
  • _get_connection() -> context manager yielding sqlite3.Connection
  • hash_job(job_id: str) -> str (static method)
Functions
record_escalation_decision
record_escalation_decision(job_id, sheet_num, confidence, action, validation_pass_rate, retry_count, guidance=None, outcome_after_action=None, model=None)

Record an escalation decision for learning.

When a sheet triggers escalation and receives a response from a human or AI handler, this method records the decision so that Marianne can learn from it and potentially suggest similar actions for future escalations with similar contexts.

Evolution v11: Escalation Learning Loop - closes the loop between escalation handlers and learning system.

Parameters:

Name Type Description Default
job_id str

ID of the job that triggered escalation.

required
sheet_num int

Sheet number that triggered escalation.

required
confidence float

Aggregate confidence score at escalation time (0.0-1.0).

required
action str

Action taken (retry, skip, abort, modify_prompt).

required
validation_pass_rate float

Pass percentage at escalation time.

required
retry_count int

Number of retries before escalation.

required
guidance str | None

Optional guidance/notes from the handler.

None
outcome_after_action str | None

What happened after (success, failed, etc.).

None
model str | None

Optional model name used for execution.

None

Returns:

Type Description
str

The escalation decision record ID.

Source code in src/marianne/learning/store/escalation.py
def record_escalation_decision(
    self,
    job_id: str,
    sheet_num: int,
    confidence: float,
    action: str,
    validation_pass_rate: float,
    retry_count: int,
    guidance: str | None = None,
    outcome_after_action: str | None = None,
    model: str | None = None,
) -> str:
    """Record an escalation decision for learning.

    When a sheet triggers escalation and receives a response from
    a human or AI handler, this method records the decision so that
    Marianne can learn from it and potentially suggest similar actions
    for future escalations with similar contexts.

    Evolution v11: Escalation Learning Loop - closes the loop between
    escalation handlers and learning system.

    Args:
        job_id: ID of the job that triggered escalation.
        sheet_num: Sheet number that triggered escalation.
        confidence: Aggregate confidence score at escalation time (0.0-1.0).
        action: Action taken (retry, skip, abort, modify_prompt).
        validation_pass_rate: Pass percentage at escalation time.
        retry_count: Number of retries before escalation.
        guidance: Optional guidance/notes from the handler.
        outcome_after_action: What happened after (success, failed, etc.).
        model: Optional model name used for execution.

    Returns:
        The escalation decision record ID.
    """
    record_id = str(uuid.uuid4())
    job_hash = self.hash_job(job_id)
    now = datetime.now(tz=UTC)  # timezone-aware, matching the dataclass default

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT INTO escalation_decisions (
                id, job_hash, sheet_num, confidence, action,
                guidance, validation_pass_rate, retry_count,
                outcome_after_action, recorded_at, model
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                record_id,
                job_hash,
                sheet_num,
                confidence,
                action,
                guidance,
                validation_pass_rate,
                retry_count,
                outcome_after_action,
                now.isoformat(),
                model,
            ),
        )

    _logger.info(
        f"Recorded escalation decision {record_id}: sheet={sheet_num}, "
        f"action={action}, confidence={confidence:.1%}"
    )
    return record_id
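A round trip of the INSERT above against an in-memory database; the single-table schema here is an assumption that mirrors only the columns shown, and the job-hash value is a placeholder rather than a real hash.

```python
import sqlite3
import uuid
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("""
    CREATE TABLE escalation_decisions (
        id TEXT PRIMARY KEY, job_hash TEXT, sheet_num INTEGER,
        confidence REAL, action TEXT, guidance TEXT,
        validation_pass_rate REAL, retry_count INTEGER,
        outcome_after_action TEXT, recorded_at TEXT, model TEXT
    )
""")

record_id = str(uuid.uuid4())
conn.execute(
    """
    INSERT INTO escalation_decisions (
        id, job_hash, sheet_num, confidence, action,
        guidance, validation_pass_rate, retry_count,
        outcome_after_action, recorded_at, model
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """,
    (record_id, "job-hash-placeholder", 3, 0.42, "retry", None,
     55.0, 2, None, datetime.now(tz=timezone.utc).isoformat(), None),
)

row = conn.execute(
    "SELECT action, confidence FROM escalation_decisions WHERE id = ?",
    (record_id,),
).fetchone()
```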
get_escalation_history
get_escalation_history(job_id=None, action=None, limit=20)

Get historical escalation decisions.

Retrieves past escalation decisions for analysis or display. Can filter by job or action type.

Parameters:

Name Type Description Default
job_id str | None

Optional job ID to filter by.

None
action str | None

Optional action type to filter by.

None
limit int

Maximum number of records to return.

20

Returns:

Type Description
list[EscalationDecisionRecord]

List of EscalationDecisionRecord objects.

Source code in src/marianne/learning/store/escalation.py
def get_escalation_history(
    self,
    job_id: str | None = None,
    action: str | None = None,
    limit: int = 20,
) -> list[EscalationDecisionRecord]:
    """Get historical escalation decisions.

    Retrieves past escalation decisions for analysis or display.
    Can filter by job or action type.

    Args:
        job_id: Optional job ID to filter by.
        action: Optional action type to filter by.
        limit: Maximum number of records to return.

    Returns:
        List of EscalationDecisionRecord objects.
    """
    with self._get_connection() as conn:
        wb = WhereBuilder()
        if job_id is not None:
            wb.add("job_hash = ?", self.hash_job(job_id))
        if action is not None:
            wb.add("action = ?", action)

        where_sql, params = wb.build()
        cursor = conn.execute(
            f"""
            SELECT * FROM escalation_decisions
            WHERE {where_sql}
            ORDER BY recorded_at DESC
            LIMIT ?
            """,
            (*params, limit),
        )

        records = []
        for row in cursor.fetchall():
            records.append(
                EscalationDecisionRecord(
                    id=row["id"],
                    job_hash=row["job_hash"],
                    sheet_num=row[SHEET_NUM_KEY],
                    confidence=row["confidence"],
                    action=row["action"],
                    guidance=row["guidance"],
                    validation_pass_rate=row[VALIDATION_PASS_RATE_KEY],
                    retry_count=row["retry_count"],
                    outcome_after_action=row["outcome_after_action"],
                    recorded_at=datetime.fromisoformat(row["recorded_at"]),
                    model=row["model"],
                )
            )

        return records
get_similar_escalation
get_similar_escalation(confidence, validation_pass_rate, confidence_tolerance=0.15, pass_rate_tolerance=15.0, limit=5)

Get similar past escalation decisions for guidance.

Finds historical escalations with similar context (confidence and pass rate) to help inform the current escalation decision. Can be used to suggest actions or provide guidance to human operators.

Evolution v11: Escalation Learning Loop - enables pattern-based suggestions for similar escalation contexts.

Parameters:

Name Type Description Default
confidence float

Current confidence level (0.0-1.0).

required
validation_pass_rate float

Current validation pass percentage.

required
confidence_tolerance float

How much confidence can differ (default 0.15).

0.15
pass_rate_tolerance float

How much pass rate can differ (default 15%).

15.0
limit int

Maximum number of similar records to return.

5

Returns:

Type Description
list[EscalationDecisionRecord]

List of EscalationDecisionRecord from similar past escalations,

list[EscalationDecisionRecord]

ordered by outcome success (successful outcomes first).

Source code in src/marianne/learning/store/escalation.py
def get_similar_escalation(
    self,
    confidence: float,
    validation_pass_rate: float,
    confidence_tolerance: float = 0.15,
    pass_rate_tolerance: float = 15.0,
    limit: int = 5,
) -> list[EscalationDecisionRecord]:
    """Get similar past escalation decisions for guidance.

    Finds historical escalations with similar context (confidence and
    pass rate) to help inform the current escalation decision. Can be
    used to suggest actions or provide guidance to human operators.

    Evolution v11: Escalation Learning Loop - enables pattern-based
    suggestions for similar escalation contexts.

    Args:
        confidence: Current confidence level (0.0-1.0).
        validation_pass_rate: Current validation pass percentage.
        confidence_tolerance: How much confidence can differ (default 0.15).
        pass_rate_tolerance: How much pass rate can differ (default 15%).
        limit: Maximum number of similar records to return.

    Returns:
        List of EscalationDecisionRecord from similar past escalations,
        ordered by outcome success (successful outcomes first).
    """
    with self._get_connection() as conn:
        # Find escalations with similar confidence and pass rate
        # Order by: successful outcomes first, then by how close the match is
        cursor = conn.execute(
            """
            SELECT *,
                   ABS(confidence - ?) as conf_diff,
                   ABS(validation_pass_rate - ?) as rate_diff
            FROM escalation_decisions
            WHERE ABS(confidence - ?) <= ?
              AND ABS(validation_pass_rate - ?) <= ?
            ORDER BY
                CASE WHEN outcome_after_action = 'success' THEN 0
                     WHEN outcome_after_action = 'skipped' THEN 1
                     WHEN outcome_after_action IS NULL THEN 2
                     ELSE 3 END,
                conf_diff + (rate_diff / 100.0)
            LIMIT ?
            """,
            (
                confidence,
                validation_pass_rate,
                confidence,
                confidence_tolerance,
                validation_pass_rate,
                pass_rate_tolerance,
                limit,
            ),
        )

        records = []
        for row in cursor.fetchall():
            records.append(
                EscalationDecisionRecord(
                    id=row["id"],
                    job_hash=row["job_hash"],
                    sheet_num=row[SHEET_NUM_KEY],
                    confidence=row["confidence"],
                    action=row["action"],
                    guidance=row["guidance"],
                    validation_pass_rate=row[VALIDATION_PASS_RATE_KEY],
                    retry_count=row["retry_count"],
                    outcome_after_action=row["outcome_after_action"],
                    recorded_at=datetime.fromisoformat(row["recorded_at"]),
                    model=row["model"],
                )
            )

        return records
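The outcome-first ranking in the similarity query can be observed with a toy table; the three rows and their values are illustrative. A successful past escalation sorts first, an unknown outcome next, and a failure last, with closeness of match breaking ties.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE escalation_decisions (id TEXT, confidence REAL, "
    "validation_pass_rate REAL, outcome_after_action TEXT)"
)
conn.executemany(
    "INSERT INTO escalation_decisions VALUES (?, ?, ?, ?)",
    [("a", 0.50, 60.0, "failed"),    # exact match, but failed
     ("b", 0.52, 58.0, "success"),   # slightly off, but succeeded
     ("c", 0.48, 62.0, None)],       # slightly off, outcome unknown
)
rows = conn.execute(
    """
    SELECT id,
           ABS(confidence - ?) AS conf_diff,
           ABS(validation_pass_rate - ?) AS rate_diff
    FROM escalation_decisions
    WHERE ABS(confidence - ?) <= ? AND ABS(validation_pass_rate - ?) <= ?
    ORDER BY
        CASE WHEN outcome_after_action = 'success' THEN 0
             WHEN outcome_after_action = 'skipped' THEN 1
             WHEN outcome_after_action IS NULL THEN 2
             ELSE 3 END,
        conf_diff + (rate_diff / 100.0)
    LIMIT ?
    """,
    (0.5, 60.0, 0.5, 0.15, 60.0, 15.0, 5),
).fetchall()
order = [r[0] for r in rows]
```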
update_escalation_outcome
update_escalation_outcome(escalation_id, outcome_after_action)

Update the outcome of an escalation decision.

Called after an escalation action is taken and the result is known. This closes the feedback loop by recording whether the action led to success or failure.

Parameters:

Name Type Description Default
escalation_id str

The escalation record ID to update.

required
outcome_after_action str

What happened (success, failed, aborted, skipped).

required

Returns:

Type Description
bool

True if the record was updated, False if not found.

Source code in src/marianne/learning/store/escalation.py
def update_escalation_outcome(
    self,
    escalation_id: str,
    outcome_after_action: str,
) -> bool:
    """Update the outcome of an escalation decision.

    Called after an escalation action is taken and the result is known.
    This closes the feedback loop by recording whether the action led
    to success or failure.

    Args:
        escalation_id: The escalation record ID to update.
        outcome_after_action: What happened (success, failed, aborted, skipped).

    Returns:
        True if the record was updated, False if not found.
    """
    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            UPDATE escalation_decisions
            SET outcome_after_action = ?
            WHERE id = ?
            """,
            (outcome_after_action, escalation_id),
        )
        updated = cursor.rowcount > 0

    if updated:
        _logger.debug(
            f"Updated escalation {escalation_id} outcome: {outcome_after_action}"
        )

    return updated

EvolutionTrajectoryEntry dataclass

EvolutionTrajectoryEntry(id, cycle, recorded_at, evolutions_completed, evolutions_deferred, issue_classes, cv_avg, implementation_loc, test_loc, loc_accuracy, research_candidates_resolved=0, research_candidates_created=0, notes='')

A record of a single evolution cycle in Marianne's self-improvement trajectory.

v16 Evolution: Evolution Trajectory Tracking - enables Marianne to track its own evolution history, identifying recurring issue classes and measuring improvement over time.

Attributes
id instance-attribute
id

Unique identifier for this trajectory entry.

cycle instance-attribute
cycle

Evolution cycle number (e.g., 16 for v16).

recorded_at instance-attribute
recorded_at

When this entry was recorded.

evolutions_completed instance-attribute
evolutions_completed

Number of evolutions completed in this cycle.

evolutions_deferred instance-attribute
evolutions_deferred

Number of evolutions deferred in this cycle.

issue_classes instance-attribute
issue_classes

Issue classes addressed (e.g., 'infrastructure_activation', 'epistemic_drift').

cv_avg instance-attribute
cv_avg

Average Consciousness Volume of selected evolutions.

implementation_loc instance-attribute
implementation_loc

Total implementation LOC for this cycle.

test_loc instance-attribute
test_loc

Total test LOC for this cycle.

loc_accuracy instance-attribute
loc_accuracy

LOC estimation accuracy (actual/estimated as ratio).

research_candidates_resolved class-attribute instance-attribute
research_candidates_resolved = 0

Number of research candidates resolved in this cycle.

research_candidates_created class-attribute instance-attribute
research_candidates_created = 0

Number of new research candidates created in this cycle.

notes class-attribute instance-attribute
notes = ''

Optional notes about this evolution cycle.

ExecutionMixin

Mixin providing execution-related methods for GlobalLearningStore.

This mixin requires that the composed class provides:
  • _get_connection(): Context manager yielding sqlite3.Connection
  • _logger: Logger instance for logging
  • hash_workspace(workspace_path): Static method to hash workspace paths
  • hash_job(job_name, config_hash): Static method to hash job identifiers

Execution Recording Methods:
  • record_outcome: Record a sheet execution outcome
  • _extract_sheet_num: Helper to parse sheet numbers from IDs
  • _calculate_confidence: Calculate confidence score for an outcome

Execution Statistics Methods:
  • get_execution_stats: Get aggregate statistics from the global store
  • get_recent_executions: Get recent execution records

Similar Executions Methods (Learning Activation):
  • get_similar_executions: Find similar historical executions
  • get_optimal_execution_window: Analyze optimal times for execution

Workspace Clustering Methods:
  • get_workspace_cluster: Get cluster ID for a workspace
  • assign_workspace_cluster: Assign a workspace to a cluster
  • get_similar_workspaces: Get workspaces in the same cluster

Functions
record_outcome
record_outcome(outcome, workspace_path, model=None, error_codes=None)

Record a sheet outcome to the global store.

Parameters:

Name Type Description Default
outcome SheetOutcome

The SheetOutcome to record.

required
workspace_path Path

Path to the workspace for hashing.

required
model str | None

Optional model name used for execution.

None
error_codes list[str] | None

Optional list of error codes encountered.

None

Returns:

Type Description
str

The execution record ID.

Source code in src/marianne/learning/store/executions.py
def record_outcome(
    self,
    outcome: SheetOutcome,
    workspace_path: Path,
    model: str | None = None,
    error_codes: list[str] | None = None,
) -> str:
    """Record a sheet outcome to the global store.

    Args:
        outcome: The SheetOutcome to record.
        workspace_path: Path to the workspace for hashing.
        model: Optional model name used for execution.
        error_codes: Optional list of error codes encountered.

    Returns:
        The execution record ID.
    """
    execution_id = str(uuid.uuid4())
    workspace_hash = self.hash_workspace(workspace_path)
    job_hash = self.hash_job(outcome.job_id)

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT INTO executions (
                id, workspace_hash, job_hash, sheet_num,
                started_at, completed_at, duration_seconds,
                status, retry_count, success_without_retry,
                validation_pass_rate, confidence_score, model, error_codes
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                execution_id,
                workspace_hash,
                job_hash,
                self._extract_sheet_num(outcome.sheet_id),
                outcome.timestamp.isoformat(),
                datetime.now().isoformat(),
                outcome.execution_duration,
                outcome.final_status.value,
                outcome.retry_count,
                outcome.success_without_retry,
                outcome.validation_pass_rate,
                self._calculate_confidence(outcome),
                model,
                json.dumps(error_codes or []),
            ),
        )

    _logger.debug(
        f"Recorded outcome {execution_id} for sheet {outcome.sheet_id}"
    )
    return execution_id
get_execution_stats
get_execution_stats()

Get aggregate statistics from the global store.

Returns:

Type Description
dict[str, Any]

Dictionary with stats like total_executions, success_rate, etc.

Source code in src/marianne/learning/store/executions.py
def get_execution_stats(self) -> dict[str, Any]:
    """Get aggregate statistics from the global store.

    Returns:
        Dictionary with stats like total_executions, success_rate, etc.
    """
    with self._get_connection() as conn:
        stats: dict[str, Any] = {}

        # Total executions
        cursor = conn.execute("SELECT COUNT(*) as count FROM executions")
        stats["total_executions"] = cursor.fetchone()["count"]

        # First-attempt success rate
        cursor = conn.execute(
            """
            SELECT
                SUM(CASE WHEN success_without_retry THEN 1 ELSE 0 END) as successes,
                COUNT(*) as total
            FROM executions
            """
        )
        row = cursor.fetchone()
        if row["total"] > 0:
            stats["success_without_retry_rate"] = row["successes"] / row["total"]
        else:
            stats["success_without_retry_rate"] = 0.0

        # Total patterns
        cursor = conn.execute("SELECT COUNT(*) as count FROM patterns")
        stats["total_patterns"] = cursor.fetchone()["count"]

        # Average pattern effectiveness
        cursor = conn.execute(
            "SELECT AVG(effectiveness_score) as avg FROM patterns"
        )
        row = cursor.fetchone()
        stats["avg_pattern_effectiveness"] = row["avg"] or 0.0

        # Total error recoveries
        cursor = conn.execute("SELECT COUNT(*) as count FROM error_recoveries")
        stats["total_error_recoveries"] = cursor.fetchone()["count"]

        # Unique workspaces
        cursor = conn.execute(
            "SELECT COUNT(DISTINCT workspace_hash) as count FROM executions"
        )
        stats["unique_workspaces"] = cursor.fetchone()["count"]

        return stats
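The `SUM(CASE ...) / COUNT(*)` pattern behind success_without_retry_rate, replayed on a toy executions table (schema reduced to the two columns the rate needs):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE executions (id TEXT, success_without_retry INTEGER)"
)
conn.executemany(
    "INSERT INTO executions VALUES (?, ?)",
    [("e1", 1), ("e2", 0), ("e3", 1), ("e4", 1)],  # 3 of 4 first-try successes
)
row = conn.execute(
    """
    SELECT
        SUM(CASE WHEN success_without_retry THEN 1 ELSE 0 END) AS successes,
        COUNT(*) AS total
    FROM executions
    """
).fetchone()
# Guard the division exactly as get_execution_stats does for an empty table.
rate = row["successes"] / row["total"] if row["total"] > 0 else 0.0
```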
get_recent_executions
get_recent_executions(limit=20, workspace_hash=None)

Get recent execution records.

Parameters:

Name Type Description Default
limit int

Maximum number of records to return.

20
workspace_hash str | None

Optional filter by workspace.

None

Returns:

Type Description
list[ExecutionRecord]

List of ExecutionRecord objects.

Source code in src/marianne/learning/store/executions.py
def get_recent_executions(
    self,
    limit: int = 20,
    workspace_hash: str | None = None,
) -> list[ExecutionRecord]:
    """Get recent execution records.

    Args:
        limit: Maximum number of records to return.
        workspace_hash: Optional filter by workspace.

    Returns:
        List of ExecutionRecord objects.
    """
    with self._get_connection() as conn:
        if workspace_hash:
            cursor = conn.execute(
                """
                SELECT * FROM executions
                WHERE workspace_hash = ?
                ORDER BY completed_at DESC
                LIMIT ?
                """,
                (workspace_hash, limit),
            )
        else:
            cursor = conn.execute(
                """
                SELECT * FROM executions
                ORDER BY completed_at DESC
                LIMIT ?
                """,
                (limit,),
            )

        records = []
        for row in cursor.fetchall():
            records.append(
                ExecutionRecord(
                    id=row["id"],
                    workspace_hash=row["workspace_hash"],
                    job_hash=row["job_hash"],
                    sheet_num=row[SHEET_NUM_KEY],
                    started_at=datetime.fromisoformat(row["started_at"])
                    if row["started_at"]
                    else None,
                    completed_at=datetime.fromisoformat(row["completed_at"])
                    if row["completed_at"]
                    else None,
                    duration_seconds=row["duration_seconds"] or 0.0,
                    status=row["status"] or "",
                    retry_count=row["retry_count"] or 0,
                    success_without_retry=bool(row["success_without_retry"]),
                    validation_pass_rate=row[VALIDATION_PASS_RATE_KEY] or 0.0,
                    confidence_score=row["confidence_score"] or 0.0,
                    model=row["model"],
                    error_codes=json.loads(row["error_codes"] or "[]"),
                )
            )

        return records
get_similar_executions
get_similar_executions(job_hash=None, workspace_hash=None, sheet_num=None, limit=10)

Get similar historical executions for learning.

Learning Activation: Enables querying executions that are similar to the current context, supporting pattern-based decision making.

Parameters:

Name Type Description Default
job_hash str | None

Optional job hash to filter by similar jobs.

None
workspace_hash str | None

Optional workspace hash to filter by.

None
sheet_num int | None

Optional sheet number to filter by.

None
limit int

Maximum number of records to return.

10

Returns:

Type Description
list[ExecutionRecord]

List of ExecutionRecord objects matching the criteria.

Source code in src/marianne/learning/store/executions.py
def get_similar_executions(
    self,
    job_hash: str | None = None,
    workspace_hash: str | None = None,
    sheet_num: int | None = None,
    limit: int = 10,
) -> list[ExecutionRecord]:
    """Get similar historical executions for learning.

    Learning Activation: Enables querying executions that are similar to
    the current context, supporting pattern-based decision making.

    Args:
        job_hash: Optional job hash to filter by similar jobs.
        workspace_hash: Optional workspace hash to filter by.
        sheet_num: Optional sheet number to filter by.
        limit: Maximum number of records to return.

    Returns:
        List of ExecutionRecord objects matching the criteria.
    """
    with self._get_connection() as conn:
        wb = WhereBuilder()
        if job_hash is not None:
            wb.add("job_hash = ?", job_hash)
        if workspace_hash is not None:
            wb.add("workspace_hash = ?", workspace_hash)
        if sheet_num is not None:
            wb.add("sheet_num = ?", sheet_num)

        where_sql, params = wb.build()
        cursor = conn.execute(
            f"""
            SELECT * FROM executions
            WHERE {where_sql}
            ORDER BY completed_at DESC
            LIMIT ?
            """,
            (*params, limit),
        )

        records = []
        for row in cursor.fetchall():
            records.append(
                ExecutionRecord(
                    id=row["id"],
                    workspace_hash=row["workspace_hash"],
                    job_hash=row["job_hash"],
                    sheet_num=row[SHEET_NUM_KEY],
                    started_at=datetime.fromisoformat(row["started_at"])
                    if row["started_at"]
                    else None,
                    completed_at=datetime.fromisoformat(row["completed_at"])
                    if row["completed_at"]
                    else None,
                    duration_seconds=row["duration_seconds"] or 0.0,
                    status=row["status"] or "",
                    retry_count=row["retry_count"] or 0,
                    success_without_retry=bool(row["success_without_retry"]),
                    validation_pass_rate=row[VALIDATION_PASS_RATE_KEY] or 0.0,
                    confidence_score=row["confidence_score"] or 0.0,
                    model=row["model"],
                    error_codes=json.loads(row["error_codes"] or "[]"),
                )
            )

        return records
get_optimal_execution_window
get_optimal_execution_window(error_code=None, model=None)

Analyze historical data to find optimal execution windows.

Learning Activation: Identifies times of day when executions are most likely to succeed, enabling time-aware scheduling recommendations.

Parameters:

Name Type Description Default
error_code str | None

Optional error code to analyze (e.g., for rate limits).

None
model str | None

Optional model to filter by.

None

Returns:

Type Description
dict[str, Any]

Dict with optimal window analysis:

dict[str, Any]
  • optimal_hours: List of hours (0-23) with best success rates
dict[str, Any]
  • avoid_hours: List of hours with high failure/rate limit rates
dict[str, Any]
  • confidence: Confidence in the recommendation (0.0-1.0)
dict[str, Any]
  • sample_count: Number of samples analyzed
Source code in src/marianne/learning/store/executions.py
def get_optimal_execution_window(
    self,
    error_code: str | None = None,
    model: str | None = None,  # noqa: ARG002 - reserved for future model filtering
) -> dict[str, Any]:
    """Analyze historical data to find optimal execution windows.

    Learning Activation: Identifies times of day when executions are most
    likely to succeed, enabling time-aware scheduling recommendations.

    Args:
        error_code: Optional error code to analyze (e.g., for rate limits).
        model: Optional model to filter by.

    Returns:
        Dict with optimal window analysis:
        - optimal_hours: List of hours (0-23) with best success rates
        - avoid_hours: List of hours with high failure/rate limit rates
        - confidence: Confidence in the recommendation (0.0-1.0)
        - sample_count: Number of samples analyzed
    """
    with self._get_connection() as conn:
        # Query success rate by hour of day from error_recoveries
        if error_code:
            cursor = conn.execute(
                """
                SELECT
                    time_of_day,
                    COUNT(*) as total,
                    SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) as successes
                FROM error_recoveries
                WHERE error_code = ?
                GROUP BY time_of_day
                ORDER BY time_of_day
                """,
                (error_code,),
            )
        else:
            cursor = conn.execute(
                """
                SELECT
                    time_of_day,
                    COUNT(*) as total,
                    SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) as successes
                FROM error_recoveries
                GROUP BY time_of_day
                ORDER BY time_of_day
                """
            )

        rows = cursor.fetchall()

        if not rows:
            return {
                "optimal_hours": [],
                "avoid_hours": [],
                "confidence": 0.0,
                "sample_count": 0,
            }

        # Analyze success rates by hour
        hour_stats: dict[int, tuple[int, int]] = {}  # hour -> (successes, total)
        total_samples = 0

        for row in rows:
            hour = row["time_of_day"]
            total = row["total"]
            successes = row["successes"]
            hour_stats[hour] = (successes, total)
            total_samples += total

        # Find optimal and avoid hours
        optimal_hours: list[int] = []
        avoid_hours: list[int] = []

        for hour, (successes, total) in hour_stats.items():
            if total >= 3:  # Minimum samples for confidence
                success_rate = successes / total
                if success_rate >= 0.7:
                    optimal_hours.append(hour)
                elif success_rate <= 0.3:
                    avoid_hours.append(hour)

        # Calculate confidence based on sample size
        confidence = min(total_samples / 50.0, 1.0)

        return {
            "optimal_hours": sorted(optimal_hours),
            "avoid_hours": sorted(avoid_hours),
            "confidence": confidence,
            "sample_count": total_samples,
        }
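The bucketing rule above (at least 3 samples per hour; success rate ≥ 0.7 marks an optimal hour, ≤ 0.3 an avoid hour; confidence saturates at 50 samples) can be replayed in isolation. The `classify_hours` helper and its sample data are illustrative, not part of the store.

```python
def classify_hours(hour_stats):
    """hour -> (successes, total), mirroring the thresholds in
    get_optimal_execution_window."""
    optimal, avoid, samples = [], [], 0
    for hour, (successes, total) in hour_stats.items():
        samples += total
        if total >= 3:  # minimum samples for confidence
            rate = successes / total
            if rate >= 0.7:
                optimal.append(hour)
            elif rate <= 0.3:
                avoid.append(hour)
    return {
        "optimal_hours": sorted(optimal),
        "avoid_hours": sorted(avoid),
        "confidence": min(samples / 50.0, 1.0),
        "sample_count": samples,
    }

# Hour 3 always recovers, hour 14 rarely does, hour 20 has too few samples.
window = classify_hours({3: (5, 5), 14: (1, 4), 20: (1, 2)})
```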
get_workspace_cluster
get_workspace_cluster(workspace_hash)

Get the cluster ID for a workspace.

Learning Activation: Supports workspace similarity by grouping workspaces with similar patterns into clusters.

Parameters:

Name Type Description Default
workspace_hash str

Hash of the workspace to query.

required

Returns:

Type Description
str | None

Cluster ID if assigned, None otherwise.

Source code in src/marianne/learning/store/executions.py
def get_workspace_cluster(
    self,
    workspace_hash: str,
) -> str | None:
    """Get the cluster ID for a workspace.

    Learning Activation: Supports workspace similarity by grouping
    workspaces with similar patterns into clusters.

    Args:
        workspace_hash: Hash of the workspace to query.

    Returns:
        Cluster ID if assigned, None otherwise.
    """
    with self._get_connection() as conn:
        cursor = conn.execute(
            "SELECT cluster_id FROM workspace_clusters WHERE workspace_hash = ?",
            (workspace_hash,),
        )
        row = cursor.fetchone()
        return row["cluster_id"] if row else None
assign_workspace_cluster
assign_workspace_cluster(workspace_hash, cluster_id)

Assign a workspace to a cluster.

Learning Activation: Groups workspaces with similar execution patterns for targeted pattern recommendations.

Parameters:

Name Type Description Default
workspace_hash str

Hash of the workspace.

required
cluster_id str

ID of the cluster to assign to.

required
Source code in src/marianne/learning/store/executions.py
def assign_workspace_cluster(
    self,
    workspace_hash: str,
    cluster_id: str,
) -> None:
    """Assign a workspace to a cluster.

    Learning Activation: Groups workspaces with similar execution
    patterns for targeted pattern recommendations.

    Args:
        workspace_hash: Hash of the workspace.
        cluster_id: ID of the cluster to assign to.
    """
    now = datetime.now().isoformat()

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT OR REPLACE INTO workspace_clusters
            (workspace_hash, cluster_id, assigned_at)
            VALUES (?, ?, ?)
            """,
            (workspace_hash, cluster_id, now),
        )
get_similar_workspaces
get_similar_workspaces(cluster_id, limit=10)

Get workspace hashes in the same cluster.

Learning Activation: Enables cross-workspace learning by identifying workspaces with similar patterns.

Parameters:

Name Type Description Default
cluster_id str

Cluster ID to query.

required
limit int

Maximum number of workspace hashes to return.

10

Returns:

Type Description
list[str]

List of workspace hashes in the cluster.

Source code in src/marianne/learning/store/executions.py
def get_similar_workspaces(
    self,
    cluster_id: str,
    limit: int = 10,
) -> list[str]:
    """Get workspace hashes in the same cluster.

    Learning Activation: Enables cross-workspace learning by identifying
    workspaces with similar patterns.

    Args:
        cluster_id: Cluster ID to query.
        limit: Maximum number of workspace hashes to return.

    Returns:
        List of workspace hashes in the cluster.
    """
    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT workspace_hash FROM workspace_clusters
            WHERE cluster_id = ?
            ORDER BY assigned_at DESC
            LIMIT ?
            """,
            (cluster_id, limit),
        )
        return [row["workspace_hash"] for row in cursor.fetchall()]
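The three clustering methods above revolve around one small table. A self-contained sketch with an in-memory SQLite database (schema assumed from the queries shown, not the store's actual migration) illustrates why `INSERT OR REPLACE` makes reassignment idempotent:

```python
import sqlite3
from datetime import datetime

# In-memory stand-in for the workspace_clusters table implied by the queries above.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    """CREATE TABLE workspace_clusters (
        workspace_hash TEXT PRIMARY KEY,
        cluster_id TEXT NOT NULL,
        assigned_at TEXT NOT NULL
    )"""
)

def assign(workspace_hash: str, cluster_id: str) -> None:
    # INSERT OR REPLACE: re-assigning a workspace overwrites its previous cluster
    conn.execute(
        "INSERT OR REPLACE INTO workspace_clusters VALUES (?, ?, ?)",
        (workspace_hash, cluster_id, datetime.now().isoformat()),
    )

def similar(cluster_id: str, limit: int = 10) -> list[str]:
    cur = conn.execute(
        "SELECT workspace_hash FROM workspace_clusters "
        "WHERE cluster_id = ? ORDER BY assigned_at DESC LIMIT ?",
        (cluster_id, limit),
    )
    return [row["workspace_hash"] for row in cur.fetchall()]

assign("ws-a", "cluster-1")
assign("ws-b", "cluster-1")
assign("ws-a", "cluster-2")  # ws-a leaves cluster-1; only ws-b remains there
```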
record_error_recovery
record_error_recovery(error_code, suggested_wait, actual_wait, recovery_success, model=None)

Record an error recovery for learning adaptive wait times.

Parameters:

Name Type Description Default
error_code str

The error code (e.g., 'E103').

required
suggested_wait float

The initially suggested wait time in seconds.

required
actual_wait float

The actual wait time used in seconds.

required
recovery_success bool

Whether recovery after waiting succeeded.

required
model str | None

Optional model name.

None

Returns:

Type Description
str

The recovery record ID.

Source code in src/marianne/learning/store/executions.py
def record_error_recovery(
    self,
    error_code: str,
    suggested_wait: float,
    actual_wait: float,
    recovery_success: bool,
    model: str | None = None,
) -> str:
    """Record an error recovery for learning adaptive wait times.

    Args:
        error_code: The error code (e.g., 'E103').
        suggested_wait: The initially suggested wait time in seconds.
        actual_wait: The actual wait time used in seconds.
        recovery_success: Whether recovery after waiting succeeded.
        model: Optional model name.

    Returns:
        The recovery record ID.
    """
    record_id = str(uuid.uuid4())
    now = datetime.now()

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT INTO error_recoveries (
                id, error_code, suggested_wait, actual_wait,
                recovery_success, recorded_at, model, time_of_day
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                record_id,
                error_code,
                suggested_wait,
                actual_wait,
                recovery_success,
                now.isoformat(),
                model,
                now.hour,
            ),
        )

    return record_id
get_learned_wait_time
get_learned_wait_time(error_code, model=None, min_samples=3)

Get the learned optimal wait time for an error code.

Analyzes past error recoveries to suggest an adaptive wait time.

Parameters:

Name Type Description Default
error_code str

The error code to look up.

required
model str | None

Optional model to filter by.

None
min_samples int

Minimum samples required before learning.

3

Returns:

Type Description
float | None

Suggested wait time in seconds, or None if not enough data.

Source code in src/marianne/learning/store/executions.py
def get_learned_wait_time(
    self,
    error_code: str,
    model: str | None = None,
    min_samples: int = 3,
) -> float | None:
    """Get the learned optimal wait time for an error code.

    Analyzes past error recoveries to suggest an adaptive wait time.

    Args:
        error_code: The error code to look up.
        model: Optional model to filter by.
        min_samples: Minimum samples required before learning.

    Returns:
        Suggested wait time in seconds, or None if not enough data.
    """
    with self._get_connection() as conn:
        if model:
            cursor = conn.execute(
                """
                SELECT actual_wait, recovery_success
                FROM error_recoveries
                WHERE error_code = ? AND model = ? AND recovery_success = 1
                ORDER BY recorded_at DESC
                LIMIT 20
                """,
                (error_code, model),
            )
        else:
            cursor = conn.execute(
                """
                SELECT actual_wait, recovery_success
                FROM error_recoveries
                WHERE error_code = ? AND recovery_success = 1
                ORDER BY recorded_at DESC
                LIMIT 20
                """,
                (error_code,),
            )

        rows = cursor.fetchall()
        if len(rows) < min_samples:
            return None

        # Favor shorter successful waits
        waits: list[float] = [float(row["actual_wait"]) for row in rows]
        # Average the mean with the minimum to bias toward the shortest wait that worked
        avg_wait = sum(waits) / len(waits)
        min_successful = min(waits)

        # Blend toward shorter waits that still work
        return (avg_wait + min_successful) / 2
get_learned_wait_time_with_fallback
get_learned_wait_time_with_fallback(error_code, static_delay, model=None, min_samples=3, min_confidence=0.7)

Get learned wait time with fallback to static delay and confidence.

This method bridges the global learning store with retry strategies. It returns a delay value along with a confidence score indicating how much to trust the learned value.

Evolution #3: Learned Wait Time Injection - provides the bridge between global_store's cross-workspace learned delays and retry_strategy's blend_historical_delay() method.

Parameters:

Name Type Description Default
error_code str

The error code to look up (e.g., 'E101').

required
static_delay float

Fallback static delay if no learned data available.

required
model str | None

Optional model to filter by.

None
min_samples int

Minimum samples required for learning.

3
min_confidence float

Minimum confidence threshold for using learned delay.

0.7

Returns:

Type Description
tuple[float, float, str]

Tuple of (delay_seconds, confidence, strategy_name).
  • delay_seconds: The recommended delay (learned or static).
  • confidence: Confidence in the recommendation (0.0-1.0). High confidence (>=0.7) means the learned delay is reliable; low confidence (<0.7) means it should be blended with the static delay.
  • strategy_name: "global_learned" | "global_learned_blend" | "static_fallback"
Source code in src/marianne/learning/store/executions.py
def get_learned_wait_time_with_fallback(
    self,
    error_code: str,
    static_delay: float,
    model: str | None = None,
    min_samples: int = 3,
    min_confidence: float = 0.7,
) -> tuple[float, float, str]:
    """Get learned wait time with fallback to static delay and confidence.

    This method bridges the global learning store with retry strategies.
    It returns a delay value along with a confidence score indicating
    how much to trust the learned value.

    Evolution #3: Learned Wait Time Injection - provides the bridge between
    global_store's cross-workspace learned delays and retry_strategy's
    blend_historical_delay() method.

    Args:
        error_code: The error code to look up (e.g., 'E101').
        static_delay: Fallback static delay if no learned data available.
        model: Optional model to filter by.
        min_samples: Minimum samples required for learning.
        min_confidence: Minimum confidence threshold for using learned delay.

    Returns:
        Tuple of (delay_seconds, confidence, strategy_name).
        - delay_seconds: The recommended delay (learned or static).
        - confidence: Confidence in the recommendation (0.0-1.0).
          High confidence (>=0.7) means learned delay is reliable.
          Low confidence (<0.7) means learned delay should be blended with static.
        - strategy_name: "global_learned" | "global_learned_blend" | "static_fallback"
    """
    # Query learned wait time from global store
    learned_wait = self.get_learned_wait_time(
        error_code=error_code,
        model=model,
        min_samples=min_samples,
    )

    if learned_wait is None:
        # No learned data - fall back to static
        return static_delay, 0.0, "static_fallback"

    # Calculate confidence based on sample count
    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT COUNT(*) as count
            FROM error_recoveries
            WHERE error_code = ? AND recovery_success = 1
            """,
            (error_code,),
        )
        sample_count = cursor.fetchone()["count"]

    # Confidence scales with sample count: 10 samples = 1.0 confidence
    confidence = min(sample_count / 10.0, 1.0)

    # Apply floor: learned delay shouldn't be less than 50% of static
    # This prevents overly aggressive waits that might fail
    delay_floor = static_delay * 0.5

    if confidence >= min_confidence:
        # High confidence: use learned delay (with floor)
        final_delay = max(learned_wait, delay_floor)
        return final_delay, confidence, "global_learned"
    else:
        # Low confidence: blend learned with static
        # weight = confidence / min_confidence (0.0 to 1.0)
        weight = confidence / min_confidence
        blended = weight * max(learned_wait, delay_floor) + (1 - weight) * static_delay
        return blended, confidence, "global_learned_blend"
get_error_recovery_sample_count
get_error_recovery_sample_count(error_code)

Get the number of successful recovery samples for an error code.

Parameters:

Name Type Description Default
error_code str

The error code to query.

required

Returns:

Type Description
int

Number of successful recovery samples.

Source code in src/marianne/learning/store/executions.py
def get_error_recovery_sample_count(self, error_code: str) -> int:
    """Get the number of successful recovery samples for an error code.

    Args:
        error_code: The error code to query.

    Returns:
        Number of successful recovery samples.
    """
    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT COUNT(*) as count
            FROM error_recoveries
            WHERE error_code = ? AND recovery_success = 1
            """,
            (error_code,),
        )
        result: int = cursor.fetchone()["count"]
        return result

ExecutionRecord dataclass

ExecutionRecord(id, workspace_hash, job_hash, sheet_num, started_at, completed_at, duration_seconds, status, retry_count, success_without_retry, validation_pass_rate, confidence_score, model, error_codes=list())

A record of a sheet execution stored in the global database.

Functions
__post_init__
__post_init__()

Clamp fields to valid ranges.

Source code in src/marianne/learning/store/models.py
def __post_init__(self) -> None:
    """Clamp fields to valid ranges."""
    self.duration_seconds = max(0.0, self.duration_seconds)
    self.retry_count = max(0, self.retry_count)
    self.validation_pass_rate = max(0.0, min(1.0, self.validation_pass_rate))
    self.confidence_score = max(0.0, min(1.0, self.confidence_score))
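The clamp-on-init pattern used by the record dataclasses can be demonstrated in isolation (a toy dataclass, not `ExecutionRecord` itself):

```python
from dataclasses import dataclass

@dataclass
class Scored:
    """Toy record showing __post_init__ clamping to valid ranges."""
    duration_seconds: float
    validation_pass_rate: float

    def __post_init__(self) -> None:
        # Out-of-range inputs are silently clamped rather than rejected
        self.duration_seconds = max(0.0, self.duration_seconds)
        self.validation_pass_rate = max(0.0, min(1.0, self.validation_pass_rate))

record = Scored(duration_seconds=-5.0, validation_pass_rate=1.7)
# duration clamps to 0.0, pass rate clamps to 1.0
```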

ExplorationBudgetRecord dataclass

ExplorationBudgetRecord(id, job_hash, recorded_at, budget_value, entropy_at_time, adjustment_type, adjustment_reason=None)

A record of exploration budget state over time.

v23 Evolution: Exploration Budget Maintenance - tracks the dynamic exploration budget to prevent convergence to zero. The budget adjusts based on pattern entropy observations.

Attributes
id instance-attribute
id

Unique identifier for this budget record.

job_hash instance-attribute
job_hash

Hash of the job this budget adjustment belongs to.

recorded_at instance-attribute
recorded_at

When this budget state was recorded.

budget_value instance-attribute
budget_value

Current budget value (0.0-1.0).

entropy_at_time instance-attribute
entropy_at_time

Pattern entropy at time of recording (if measured).

adjustment_type instance-attribute
adjustment_type

Type of adjustment: 'initial', 'decay', 'boost', 'floor_enforced'.

adjustment_reason class-attribute instance-attribute
adjustment_reason = None

Human-readable reason for this adjustment.

GlobalLearningStore

GlobalLearningStore(db_path=None)

Bases: PatternMixin, ExecutionMixin, RateLimitMixin, DriftMixin, EscalationMixin, BudgetMixin, PatternLifecycleMixin, GlobalLearningStoreBase

Global learning store combining all mixins.

This is the primary interface for Marianne's cross-workspace learning system. It provides persistent storage for execution outcomes, detected patterns, error recovery data, and learning metrics across all Marianne workspaces.

The class is composed from multiple mixins, each providing domain-specific functionality. The base class (listed last for proper MRO) provides: - SQLite connection management with WAL mode for concurrent access - Schema creation and version migration - Hashing utilities for workspace and job identification

Mixin Capabilities

PatternMixin: - record_pattern(), get_patterns(), get_pattern_by_id() - record_pattern_application(), update_pattern_effectiveness() - quarantine lifecycle: quarantine_pattern(), validate_pattern() - trust scoring, success factor analysis - pattern discovery broadcasting

ExecutionMixin: - record_outcome() for sheet execution outcomes - get_execution_stats(), get_recent_executions() - get_similar_executions() for learning activation - workspace clustering for cross-workspace correlation

RateLimitMixin: - record_rate_limit_event() for cross-workspace coordination - get_recent_rate_limits() to check before API calls - Enables parallel jobs to avoid hitting same limits

DriftMixin: - calculate_drift_metrics() for effectiveness drift - detect_epistemic_drift() for belief-level monitoring - auto_retire_drifting_patterns() for lifecycle management - get_pattern_evolution_trajectory() for historical analysis

EscalationMixin: - record_escalation_decision() when handlers respond - get_similar_escalation() for pattern-based suggestions - Closes the learning loop for escalation handling

BudgetMixin: - get_exploration_budget(), update_exploration_budget() - record_entropy_response() for diversity injection - Dynamic budget with floor/ceiling to prevent over-convergence

Example

from marianne.learning.store import GlobalLearningStore
store = GlobalLearningStore()

Record a pattern

store.record_pattern(
    pattern_type="rate_limit_recovery",
    pattern_content={"action": "exponential_backoff"},
    context={"error_code": "E101"},
    source_job="job-123",
)

Query execution statistics

stats = store.get_execution_stats()
print(f"Total executions: {stats['total_executions']}")

Attributes:

Name Type Description
db_path

Path to the SQLite database file.

_logger MarianneLogger

Module logger instance for consistent logging.

Note

The database uses WAL mode for safe concurrent access from multiple Marianne jobs. Schema migrations are applied automatically when the store is initialized.

Source code in src/marianne/learning/store/base.py
def __init__(self, db_path: Path | None = None) -> None:
    """Initialize the global learning store.

    Creates the database directory if needed, establishes the connection,
    and runs any necessary migrations.

    Args:
        db_path: Path to the SQLite database file.
                Defaults to ~/.marianne/global-learning.db
    """
    self.db_path = db_path or DEFAULT_GLOBAL_STORE_PATH
    self._logger = _logger
    # ContextVar scopes the batch connection per-asyncio-task, preventing
    # a race where Task A's batch_connection() leaks into Task B's
    # _get_connection() calls.  Each task sees its own (or no) batch conn.
    self._batch_conn: contextvars.ContextVar[sqlite3.Connection | None] = (
        contextvars.ContextVar("_batch_conn", default=None)
    )
    self._ensure_db_exists()
    self._migrate_if_needed()

GlobalLearningStoreBase

GlobalLearningStoreBase(db_path=None)

SQLite-based global learning store base class.

Provides persistent storage infrastructure for execution outcomes, detected patterns, and error recovery data across all Marianne workspaces. Uses WAL mode for safe concurrent access.

This base class handles: - Database connection lifecycle - Schema version management - Migration and schema creation - Hashing utilities

Subclasses (via mixins) add domain-specific methods for patterns, executions, rate limits, drift detection, escalation, and budget management.

Attributes:

Name Type Description
db_path

Path to the SQLite database file.

_logger

Module logger instance for consistent logging.

Initialize the global learning store.

Creates the database directory if needed, establishes the connection, and runs any necessary migrations.

Parameters:

Name Type Description Default
db_path Path | None

Path to the SQLite database file. Defaults to ~/.marianne/global-learning.db

None
Source code in src/marianne/learning/store/base.py
def __init__(self, db_path: Path | None = None) -> None:
    """Initialize the global learning store.

    Creates the database directory if needed, establishes the connection,
    and runs any necessary migrations.

    Args:
        db_path: Path to the SQLite database file.
                Defaults to ~/.marianne/global-learning.db
    """
    self.db_path = db_path or DEFAULT_GLOBAL_STORE_PATH
    self._logger = _logger
    # ContextVar scopes the batch connection per-asyncio-task, preventing
    # a race where Task A's batch_connection() leaks into Task B's
    # _get_connection() calls.  Each task sees its own (or no) batch conn.
    self._batch_conn: contextvars.ContextVar[sqlite3.Connection | None] = (
        contextvars.ContextVar("_batch_conn", default=None)
    )
    self._ensure_db_exists()
    self._migrate_if_needed()
Functions
batch_connection
batch_connection()

Reuse a single connection across multiple operations.

While this context manager is active, all _get_connection() calls will reuse the same connection, avoiding repeated open/close overhead. The connection is committed once on successful exit or rolled back on error.

Example::

with store.batch_connection():
    patterns = store.get_patterns(min_priority=0.01)
    for p in patterns:
        store.update_trust_score(p.pattern_id, ...)

Yields:

Type Description
Connection

The shared sqlite3.Connection instance.

Source code in src/marianne/learning/store/base.py
@contextmanager
def batch_connection(self) -> Generator[sqlite3.Connection, None, None]:
    """Reuse a single connection across multiple operations.

    While this context manager is active, all ``_get_connection()`` calls
    will reuse the same connection, avoiding repeated open/close overhead.
    The connection is committed once on successful exit or rolled back on error.

    Example::

        with store.batch_connection():
            patterns = store.get_patterns(min_priority=0.01)
            for p in patterns:
                store.update_trust_score(p.pattern_id, ...)

    Yields:
        The shared sqlite3.Connection instance.
    """
    conn = sqlite3.connect(str(self.db_path), timeout=30.0)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    conn.execute("PRAGMA busy_timeout=30000")
    conn.row_factory = sqlite3.Row
    token = self._batch_conn.set(conn)
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        _logger.warning(
            f"Batch operation failed on {self.db_path}: {type(e).__name__}: {e}"
        )
        raise
    finally:
        self._batch_conn.reset(token)
        conn.close()
close
close()

Close any persistent resources.

No-op: connections are managed per-operation via _get_connection(). This method exists for API compatibility so callers can unconditionally call store.close() without checking the backend type.

Source code in src/marianne/learning/store/base.py
def close(self) -> None:  # noqa: B027 — intentional concrete no-op default
    """Close any persistent resources.

    No-op: connections are managed per-operation via _get_connection().
    This method exists for API compatibility so callers can unconditionally
    call ``store.close()`` without checking the backend type.
    """
hash_workspace staticmethod
hash_workspace(workspace_path)

Generate a stable hash for a workspace path.

Creates a reproducible 16-character hex hash from the resolved absolute path. This allows pattern matching across sessions while preserving privacy (paths are not stored directly).

Parameters:

Name Type Description Default
workspace_path Path

The absolute path to the workspace.

required

Returns:

Type Description
str

A hex string hash of the workspace path (16 characters).

Source code in src/marianne/learning/store/base.py
@staticmethod
def hash_workspace(workspace_path: Path) -> str:
    """Generate a stable hash for a workspace path.

    Creates a reproducible 16-character hex hash from the resolved
    absolute path. This allows pattern matching across sessions
    while preserving privacy (paths are not stored directly).

    Args:
        workspace_path: The absolute path to the workspace.

    Returns:
        A hex string hash of the workspace path (16 characters).
    """
    normalized = str(workspace_path.resolve())
    return hashlib.sha256(normalized.encode()).hexdigest()[:16]
hash_job staticmethod
hash_job(job_name, config_hash=None)

Generate a stable hash for a job.

Creates a reproducible 16-character hex hash from the job name and optional config hash. The config hash enables version-awareness: the same job with different configs will have different hashes.

Parameters:

Name Type Description Default
job_name str

The job name.

required
config_hash str | None

Optional hash of the job config for versioning.

None

Returns:

Type Description
str

A hex string hash of the job (16 characters).

Source code in src/marianne/learning/store/base.py
@staticmethod
def hash_job(job_name: str, config_hash: str | None = None) -> str:
    """Generate a stable hash for a job.

    Creates a reproducible 16-character hex hash from the job name
    and optional config hash. The config hash enables version-awareness:
    the same job with different configs will have different hashes.

    Args:
        job_name: The job name.
        config_hash: Optional hash of the job config for versioning.

    Returns:
        A hex string hash of the job (16 characters).
    """
    combined = f"{job_name}:{config_hash or ''}"
    return hashlib.sha256(combined.encode()).hexdigest()[:16]
clear_all
clear_all()

Clear all data from the global store.

WARNING: This is destructive and should only be used for testing.

Source code in src/marianne/learning/store/base.py
def clear_all(self) -> None:
    """Clear all data from the global store.

    WARNING: This is destructive and should only be used for testing.
    """
    with self._get_connection() as conn:
        for table in self._DATA_TABLES:
            conn.execute(f"DELETE FROM {table}")  # noqa: S608 -- table names are hardcoded constants

    _logger.warning("Cleared all data from global learning store")

PatternDiscoveryEvent dataclass

PatternDiscoveryEvent(id, pattern_id, pattern_name, pattern_type, source_job_hash, recorded_at, expires_at, effectiveness_score, context_tags=list())

A pattern discovery event for cross-job broadcasting.

v14 Evolution: Real-time Pattern Broadcasting - enables jobs to share newly discovered patterns with other concurrent jobs, so knowledge propagates across the ecosystem without waiting for aggregation.

Attributes
id instance-attribute
id

Unique identifier for this discovery event.

pattern_id instance-attribute
pattern_id

ID of the pattern that was discovered.

pattern_name instance-attribute
pattern_name

Human-readable name of the pattern.

pattern_type instance-attribute
pattern_type

Type of pattern (validation_failure, retry_pattern, etc.).

source_job_hash instance-attribute
source_job_hash

Hash of the job that discovered the pattern.

recorded_at instance-attribute
recorded_at

When the discovery was recorded.

expires_at instance-attribute
expires_at

When this broadcast expires (TTL-based).

effectiveness_score instance-attribute
effectiveness_score

Effectiveness score at time of discovery.

context_tags class-attribute instance-attribute
context_tags = field(default_factory=list)

Context tags for pattern matching.

PatternMixin

Bases: PatternCrudMixin, PatternQueryMixin, PatternQuarantineMixin, PatternTrustMixin, PatternSuccessFactorsMixin, PatternBroadcastMixin, PatternLifecycleMixin

Mixin providing all pattern-related methods for GlobalLearningStore.

This mixin requires that the composed class provides: - _get_connection(): Context manager yielding sqlite3.Connection - _logger: Logger instance for logging

Composed from focused sub-mixins: - PatternQueryMixin: get_patterns, get_pattern_by_id, get_pattern_provenance - PatternCrudMixin: record_pattern, record_pattern_application, effectiveness - PatternQuarantineMixin: quarantine_pattern, validate_pattern, retire_pattern - PatternTrustMixin: calculate_trust_score, get_high/low_trust_patterns - PatternSuccessFactorsMixin: update_success_factors, analyze_pattern_why - PatternBroadcastMixin: record_pattern_discovery, check_recent_pattern_discoveries - PatternLifecycleMixin: promote_ready_patterns, update_quarantine_status (v25)

PatternRecord dataclass

PatternRecord(id, pattern_type, pattern_name, description, occurrence_count, first_seen, last_seen, last_confirmed, led_to_success_count, led_to_failure_count, effectiveness_score, variance, suggested_action, context_tags, priority_score, quarantine_status=PENDING, provenance_job_hash=None, provenance_sheet_num=None, quarantined_at=None, validated_at=None, quarantine_reason=None, trust_score=0.5, trust_calculation_date=None, success_factors=None, success_factors_updated_at=None, active=True, content_hash=None, instrument_name=None)

A pattern record stored in the global database.

v19 Evolution: Extended with quarantine_status, provenance, and trust_score fields to support the Pattern Quarantine & Provenance and Pattern Trust Scoring evolutions.

Attributes
quarantine_status class-attribute instance-attribute
quarantine_status = PENDING

Current status in the quarantine lifecycle.

provenance_job_hash class-attribute instance-attribute
provenance_job_hash = None

Hash of the job that first created this pattern.

provenance_sheet_num class-attribute instance-attribute
provenance_sheet_num = None

Sheet number where this pattern was first observed.

quarantined_at class-attribute instance-attribute
quarantined_at = None

When the pattern was moved to QUARANTINED status.

validated_at class-attribute instance-attribute
validated_at = None

When the pattern was moved to VALIDATED status.

quarantine_reason class-attribute instance-attribute
quarantine_reason = None

Reason for quarantine (if quarantined).

trust_score class-attribute instance-attribute
trust_score = 0.5

Trust score (0.0-1.0). 0.5 is neutral, >0.7 is high trust.

trust_calculation_date class-attribute instance-attribute
trust_calculation_date = None

When trust_score was last calculated.

success_factors class-attribute instance-attribute
success_factors = None

WHY this pattern succeeds - captured context conditions and factors.

success_factors_updated_at class-attribute instance-attribute
success_factors_updated_at = None

When success_factors were last updated.

active class-attribute instance-attribute
active = True

Whether this pattern is active (False = soft-deleted).

content_hash class-attribute instance-attribute
content_hash = None

SHA-256 hash of pattern content for cross-name deduplication.

instrument_name class-attribute instance-attribute
instrument_name = None

Backend instrument that produced this pattern (e.g., 'claude_cli').

Functions
__post_init__
__post_init__()

Clamp scored fields to valid ranges.

Source code in src/marianne/learning/store/models.py
def __post_init__(self) -> None:
    """Clamp scored fields to valid ranges."""
    self.trust_score = max(0.0, min(1.0, self.trust_score))
    self.effectiveness_score = max(0.0, min(1.0, self.effectiveness_score))
    self.variance = max(0.0, self.variance)
    self.priority_score = max(0.0, self.priority_score)

QuarantineStatus

Bases: str, Enum

Status of a pattern in the quarantine lifecycle.

v19 Evolution: Pattern Quarantine & Provenance - patterns transition through these states as they are validated through successful applications:

  • PENDING: New patterns start here, awaiting initial validation
  • QUARANTINED: Explicitly marked for review due to concerns
  • VALIDATED: Proven effective through repeated successful applications
  • RETIRED: No longer active, kept for historical reference
Attributes
PENDING class-attribute instance-attribute
PENDING = 'pending'

New pattern awaiting validation through application.

QUARANTINED class-attribute instance-attribute
QUARANTINED = 'quarantined'

Pattern under review - may have caused issues or needs investigation.

VALIDATED class-attribute instance-attribute
VALIDATED = 'validated'

Pattern has proven effective and is trusted for autonomous application.

RETIRED class-attribute instance-attribute
RETIRED = 'retired'

Pattern no longer in active use, retained for history.

RateLimitEvent dataclass

RateLimitEvent(id, error_code, model, recorded_at, expires_at, source_job_hash, duration_seconds)

A rate limit event for cross-workspace coordination.

Evolution #8: Tracks rate limit events across workspaces so that parallel jobs can coordinate and avoid hitting the same rate limits.

RateLimitMixin

Mixin providing rate limit event functionality.

This mixin provides methods for recording and querying rate limit events across workspaces. When one job hits a rate limit, it records the event so that other parallel jobs can check and avoid hitting the same limit.

Requires the following from the composed class
  • _get_connection() -> context manager yielding sqlite3.Connection
  • hash_job(job_id: str) -> str (static method)
Functions
record_rate_limit_event
record_rate_limit_event(error_code, duration_seconds, job_id, model=None)

Record a rate limit event for cross-workspace coordination.

When one job hits a rate limit, it records the event so that other parallel jobs can check and avoid hitting the same limit.

Evolution #8: Cross-Workspace Circuit Breaker - enables jobs running in different workspaces to share rate limit awareness.

Parameters:

Name Type Description Default
error_code str

The error code (e.g., 'E101', 'E102').

required
duration_seconds float

Expected rate limit duration in seconds.

required
job_id str

ID of the job that encountered the rate limit.

required
model str | None

Optional model name that triggered the limit.

None

Returns:

Type Description
str

The rate limit event record ID.

Source code in src/marianne/learning/store/rate_limits.py
def record_rate_limit_event(
    self,
    error_code: str,
    duration_seconds: float,
    job_id: str,
    model: str | None = None,
) -> str:
    """Record a rate limit event for cross-workspace coordination.

    When one job hits a rate limit, it records the event so that other
    parallel jobs can check and avoid hitting the same limit.

    Evolution #8: Cross-Workspace Circuit Breaker - enables jobs running
    in different workspaces to share rate limit awareness.

    Args:
        error_code: The error code (e.g., 'E101', 'E102').
        duration_seconds: Expected rate limit duration in seconds.
        job_id: ID of the job that encountered the rate limit.
        model: Optional model name that triggered the limit.

    Returns:
        The rate limit event record ID.
    """
    from datetime import timedelta

    record_id = str(uuid.uuid4())
    now = datetime.now(UTC)
    job_hash = self.hash_job(job_id)

    # Use 80% of expected duration as expiry (conservative TTL)
    # This avoids waiting too long while still being safe
    expires_at = now + timedelta(seconds=duration_seconds * 0.8)

    with self._get_connection() as conn:
        conn.execute(
            """
            INSERT INTO rate_limit_events (
                id, error_code, model, recorded_at, expires_at,
                source_job_hash, duration_seconds
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                record_id,
                error_code,
                model,
                now.isoformat(),
                expires_at.isoformat(),
                job_hash,
                duration_seconds,
            ),
        )

    _logger.info(
        f"Recorded rate limit event {record_id}: {error_code} "
        f"expires in {duration_seconds * 0.8:.0f}s"
    )
    return record_id


is_rate_limited
is_rate_limited(error_code=None, model=None)

Check if there's an active rate limit from another job.

Queries the rate_limit_events table to see if any unexpired rate limit events exist that would affect this job.

Evolution #8: Cross-Workspace Circuit Breaker - allows jobs to check if another job has already hit a rate limit.

Parameters:

Name Type Description Default
error_code str | None

Optional error code to filter by. If None, checks any.

None
model str | None

Optional model to filter by. If None, checks any.

None

Returns:

Type Description
tuple[bool, float | None]

Tuple of (is_limited, seconds_until_expiry). If is_limited is True, seconds_until_expiry indicates when the limit clears.

Source code in src/marianne/learning/store/rate_limits.py
def is_rate_limited(
    self,
    error_code: str | None = None,
    model: str | None = None,
) -> tuple[bool, float | None]:
    """Check if there's an active rate limit from another job.

    Queries the rate_limit_events table to see if any unexpired
    rate limit events exist that would affect this job.

    Evolution #8: Cross-Workspace Circuit Breaker - allows jobs to
    check if another job has already hit a rate limit.

    Args:
        error_code: Optional error code to filter by. If None, checks any.
        model: Optional model to filter by. If None, checks any.

    Returns:
        Tuple of (is_limited: bool, seconds_until_expiry: float | None).
        If is_limited is True, seconds_until_expiry indicates when it clears.
    """
    now = datetime.now(UTC)

    with self._get_connection() as conn:
        # Build query based on filters
        query = """
            SELECT expires_at, error_code, model, duration_seconds
            FROM rate_limit_events
            WHERE expires_at > ?
        """
        params: list[str] = [now.isoformat()]

        if error_code is not None:
            query += " AND error_code = ?"
            params.append(error_code)

        if model is not None:
            query += " AND model = ?"
            params.append(model)

        query += " ORDER BY expires_at DESC LIMIT 1"

        cursor = conn.execute(query, params)
        row = cursor.fetchone()

        if row is None:
            return False, None

        # Calculate time until expiry
        expires_at = datetime.fromisoformat(row["expires_at"])
        seconds_until_expiry = (expires_at - now).total_seconds()

        if seconds_until_expiry <= 0:
            return False, None

        _logger.debug(
            f"Rate limit active: {row['error_code']} "
            f"(expires in {seconds_until_expiry:.0f}s)"
        )
        return True, seconds_until_expiry
get_active_rate_limits
get_active_rate_limits(model=None)

Get all active (unexpired) rate limit events.

Parameters:

Name Type Description Default
model str | None

Optional model to filter by.

None

Returns:

Type Description
list[RateLimitEvent]

List of RateLimitEvent objects that haven't expired yet.

Source code in src/marianne/learning/store/rate_limits.py
def get_active_rate_limits(
    self,
    model: str | None = None,
) -> list[RateLimitEvent]:
    """Get all active (unexpired) rate limit events.

    Args:
        model: Optional model to filter by.

    Returns:
        List of RateLimitEvent objects that haven't expired yet.
    """
    now = datetime.now(UTC)

    with self._get_connection() as conn:
        if model:
            cursor = conn.execute(
                """
                SELECT * FROM rate_limit_events
                WHERE expires_at > ? AND model = ?
                ORDER BY expires_at DESC
                """,
                (now.isoformat(), model),
            )
        else:
            cursor = conn.execute(
                """
                SELECT * FROM rate_limit_events
                WHERE expires_at > ?
                ORDER BY expires_at DESC
                """,
                (now.isoformat(),),
            )

        events = []
        for row in cursor.fetchall():
            events.append(
                RateLimitEvent(
                    id=row["id"],
                    error_code=row["error_code"],
                    model=row["model"],
                    recorded_at=datetime.fromisoformat(row["recorded_at"]),
                    expires_at=datetime.fromisoformat(row["expires_at"]),
                    source_job_hash=row["source_job_hash"],
                    duration_seconds=row["duration_seconds"],
                )
            )

        return events
cleanup_expired_rate_limits
cleanup_expired_rate_limits()

Remove expired rate limit events from the database.

This is a housekeeping method that can be called periodically to prevent the rate_limit_events table from growing unbounded.

Returns:

Type Description
int

Number of expired records deleted.

Source code in src/marianne/learning/store/rate_limits.py
def cleanup_expired_rate_limits(self) -> int:
    """Remove expired rate limit events from the database.

    This is a housekeeping method that can be called periodically
    to prevent the rate_limit_events table from growing unbounded.

    Returns:
        Number of expired records deleted.
    """
    now = datetime.now(UTC)

    with self._get_connection() as conn:
        cursor = conn.execute(
            "DELETE FROM rate_limit_events WHERE expires_at <= ?",
            (now.isoformat(),),
        )
        deleted_count = cursor.rowcount

    if deleted_count > 0:
        _logger.debug(f"Cleaned up {deleted_count} expired rate limit events")

    return deleted_count

SuccessFactors dataclass

SuccessFactors(validation_types=list(), error_categories=list(), prior_sheet_status=None, time_of_day_bucket=None, retry_iteration=0, escalation_was_pending=False, grounding_confidence=None, occurrence_count=1, success_rate=1.0)

Captures WHY a pattern succeeded - the context conditions and factors.

v22 Evolution: Metacognitive Pattern Reflection - patterns now capture not just WHAT happened but WHY it worked. This enables better pattern selection by understanding causality, not just correlation.

Success factors include:

  • Context conditions: validation types, error categories, execution phase
  • Timing factors: time of day, retry iteration, prior sheet outcomes
  • Prerequisite states: prior sheet completion, escalation status

Attributes
validation_types class-attribute instance-attribute
validation_types = field(default_factory=list)

Validation types that were active: file, regex, artifact, etc.

error_categories class-attribute instance-attribute
error_categories = field(default_factory=list)

Error categories present in the execution: rate_limit, auth, validation, etc.

prior_sheet_status class-attribute instance-attribute
prior_sheet_status = None

Status of the immediately prior sheet: completed, failed, skipped.

time_of_day_bucket class-attribute instance-attribute
time_of_day_bucket = None

Time bucket: morning, afternoon, evening, night.

retry_iteration class-attribute instance-attribute
retry_iteration = 0

Which retry attempt this success occurred on (0 = first attempt).

escalation_was_pending class-attribute instance-attribute
escalation_was_pending = False

Whether an escalation was pending when pattern succeeded.

grounding_confidence class-attribute instance-attribute
grounding_confidence = None

Grounding confidence score if external validation was present.

occurrence_count class-attribute instance-attribute
occurrence_count = 1

How often this factor combination has been observed.

success_rate class-attribute instance-attribute
success_rate = 1.0

Success rate when these factors are present (0.0-1.0).

Functions
__post_init__
__post_init__()

Clamp success_rate and occurrence_count to valid bounds.

Source code in src/marianne/learning/store/models.py
def __post_init__(self) -> None:
    """Clamp success_rate and occurrence_count to valid bounds."""
    self.success_rate = max(0.0, min(1.0, self.success_rate))
    self.occurrence_count = max(1, self.occurrence_count)
to_dict
to_dict()

Serialize to dictionary for JSON storage.

Source code in src/marianne/learning/store/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to dictionary for JSON storage."""
    return {
        "validation_types": self.validation_types,
        "error_categories": self.error_categories,
        "prior_sheet_status": self.prior_sheet_status,
        "time_of_day_bucket": self.time_of_day_bucket,
        "retry_iteration": self.retry_iteration,
        "escalation_was_pending": self.escalation_was_pending,
        "grounding_confidence": self.grounding_confidence,
        "occurrence_count": self.occurrence_count,
        "success_rate": self.success_rate,
    }
from_dict classmethod
from_dict(data)

Deserialize from dictionary.

Source code in src/marianne/learning/store/models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "SuccessFactors":
    """Deserialize from dictionary."""
    return cls(
        validation_types=data.get("validation_types", []),
        error_categories=data.get("error_categories", []),
        prior_sheet_status=data.get("prior_sheet_status"),
        time_of_day_bucket=data.get("time_of_day_bucket"),
        retry_iteration=data.get("retry_iteration", 0),
        escalation_was_pending=data.get("escalation_was_pending", False),
        grounding_confidence=data.get("grounding_confidence"),
        occurrence_count=data.get("occurrence_count", 1),
        success_rate=data.get("success_rate", 1.0),
    )
get_time_bucket staticmethod
get_time_bucket(hour)

Get time bucket for an hour (0-23).

Source code in src/marianne/learning/store/models.py
@staticmethod
def get_time_bucket(hour: int) -> str:
    """Get time bucket for an hour (0-23)."""
    if 5 <= hour < 12:
        return "morning"
    elif 12 <= hour < 17:
        return "afternoon"
    elif 17 <= hour < 21:
        return "evening"
    else:
        return "night"
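The bucketing logic above can be verified directly. The sketch below is a standalone copy of get_time_bucket (not an import of the real module), exercised across all four buckets, including the wrap-around night bucket that covers both late evening and early morning hours:

```python
def get_time_bucket(hour: int) -> str:
    """Standalone copy of SuccessFactors.get_time_bucket documented above."""
    if 5 <= hour < 12:
        return "morning"
    elif 12 <= hour < 17:
        return "afternoon"
    elif 17 <= hour < 21:
        return "evening"
    else:
        return "night"


assert get_time_bucket(9) == "morning"
assert get_time_bucket(13) == "afternoon"
assert get_time_bucket(18) == "evening"
assert get_time_bucket(23) == "night"  # late night
assert get_time_bucket(3) == "night"   # early morning wraps into night
```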

Functions

get_global_store

get_global_store(db_path=None)

Get or create the global learning store singleton.

This function provides a convenient singleton accessor for the GlobalLearningStore. It ensures only one store instance exists per database path, avoiding the overhead of creating multiple connections to the same SQLite database.

Parameters:

Name Type Description Default
db_path Path | None

Optional custom database path. If None, uses the default path at ~/.marianne/global-learning.db.

None

Returns:

Type Description
GlobalLearningStore

The GlobalLearningStore singleton instance.

Example

store = get_global_store()  # Uses default path
store = get_global_store(Path("/custom/path.db"))  # Custom path

Source code in src/marianne/learning/store/__init__.py
def get_global_store(
    db_path: Path | None = None,
) -> GlobalLearningStore:
    """Get or create the global learning store singleton.

    This function provides a convenient singleton accessor for the GlobalLearningStore.
    It ensures only one store instance exists per database path, avoiding the overhead
    of creating multiple connections to the same SQLite database.

    Args:
        db_path: Optional custom database path. If None, uses the default
            path at ~/.marianne/global-learning.db.

    Returns:
        The GlobalLearningStore singleton instance.

    Example:
        >>> store = get_global_store()  # Uses default path
        >>> store = get_global_store(Path("/custom/path.db"))  # Custom path
    """
    global _global_store

    with _global_store_lock:
        if _global_store is None or (
            db_path is not None and _global_store.db_path != db_path
        ):
            _global_store = GlobalLearningStore(db_path)

    return _global_store