Skip to content

patterns_crud

patterns_crud

Pattern CRUD and effectiveness mixin for GlobalLearningStore.

Provides methods for creating, updating, and managing pattern effectiveness: - record_pattern: Create or update a pattern - record_pattern_application: Record pattern usage and update effectiveness - _calculate_effectiveness: Bayesian moving average with decay - _calculate_priority_score: Priority from effectiveness and frequency - update_pattern_effectiveness: Manual effectiveness recalculation - recalculate_all_pattern_priorities: Batch priority recalculation

Classes

PatternProvenance dataclass

PatternProvenance(job_hash=None, sheet_num=None)

Provenance information for where a pattern was discovered.

Groups the job_hash and sheet_num that always travel together when recording pattern origins.

PatternCrudMixin

Mixin providing pattern CRUD and effectiveness methods.

This mixin requires that the composed class provides: - _get_connection(): Context manager yielding sqlite3.Connection - update_pattern_effectiveness(): For batch recalculation (self-referential)

Functions
record_pattern
record_pattern(pattern_type, pattern_name, description=None, context_tags=None, suggested_action=None, provenance=None, provenance_job_hash=None, provenance_sheet_num=None, instrument_name=None)

Record or update a pattern in the global store.

Resolution order: 1. If a pattern with the same type+name ID exists, upsert it (existing behavior). 2. If no type+name match but a content_hash match exists, merge into the highest-priority existing pattern (incrementing its count, updating last_seen). Soft-deleted matches are reactivated. 3. Otherwise, insert a new pattern.

Parameters:

Name Type Description Default
pattern_type str

The type of pattern (e.g., 'validation_failure').

required
pattern_name str

A unique name for this pattern.

required
description str | None

Human-readable description.

None
context_tags list[str] | None

Tags for matching context.

None
suggested_action str | None

Recommended action for this pattern.

None
provenance PatternProvenance | None

Grouped provenance info (job_hash + sheet_num).

None
provenance_job_hash str | None

Deprecated — use provenance instead.

None
provenance_sheet_num int | None

Deprecated — use provenance instead.

None
instrument_name str | None

Backend instrument that produced this pattern.

None

Returns:

Type Description
str

The pattern ID (may be a merged-to existing ID).

Source code in src/marianne/learning/store/patterns_crud.py
def record_pattern(
    self,
    pattern_type: str,
    pattern_name: str,
    description: str | None = None,
    context_tags: list[str] | None = None,
    suggested_action: str | None = None,
    provenance: PatternProvenance | None = None,
    # Deprecated individual params — use `provenance` instead
    provenance_job_hash: str | None = None,
    provenance_sheet_num: int | None = None,
    instrument_name: str | None = None,
) -> str:
    """Record or update a pattern in the global store.

    Resolution order:
    1. If a pattern with the same type+name ID exists, upsert it (existing behavior).
    2. If no type+name match but a content_hash match exists, merge into the
       highest-priority existing pattern (incrementing its count, updating last_seen).
       Soft-deleted matches are reactivated.
    3. Otherwise, insert a new pattern.

    Args:
        pattern_type: The type of pattern (e.g., 'validation_failure').
        pattern_name: A unique name for this pattern.
        description: Human-readable description.
        context_tags: Tags for matching context.
        suggested_action: Recommended action for this pattern.
        provenance: Grouped provenance info (job_hash + sheet_num).
        provenance_job_hash: Deprecated — use provenance instead.
        provenance_sheet_num: Deprecated — use provenance instead.
        instrument_name: Backend instrument that produced this pattern.

    Returns:
        The pattern ID (may be a merged-to existing ID).
    """
    # Resolve provenance: prefer grouped param, fall back to individual params
    if provenance is not None:
        job_hash = provenance.job_hash
        sheet_num = provenance.sheet_num
    else:
        job_hash = provenance_job_hash
        sheet_num = provenance_sheet_num

    now = datetime.now().isoformat()
    # Normalize for consistent dedup: "File Not Created" -> "file not created"
    normalized_name = " ".join(pattern_name.lower().split())
    pattern_id = hashlib.sha256(
        f"{pattern_type}:{normalized_name}".encode()
    ).hexdigest()[:16]

    content_hash = self._compute_content_hash(
        pattern_type, normalized_name, description,
    )

    with self._get_connection() as conn:
        # Step 1: Try type+name upsert first (existing behavior, highest priority).
        cursor = conn.execute(
            "SELECT id FROM patterns WHERE id = ?", (pattern_id,),
        )
        existing_by_id = cursor.fetchone()

        if existing_by_id:
            # Type+name match — upsert as before, also update content_hash
            # and reactivate if soft-deleted.
            conn.execute(
                """
                UPDATE patterns SET
                    occurrence_count = occurrence_count + 1,
                    last_seen = ?,
                    description = COALESCE(?, description),
                    suggested_action = COALESCE(?, suggested_action),
                    context_tags = ?,
                    content_hash = ?,
                    active = 1
                WHERE id = ?
                """,
                (
                    now,
                    description,
                    suggested_action,
                    json.dumps(context_tags or []),
                    content_hash,
                    pattern_id,
                ),
            )
            return pattern_id

        # Step 2: No type+name match. Check for content_hash merge.
        hash_match = conn.execute(
            """
            SELECT id, COALESCE(active, 1) as active
            FROM patterns
            WHERE content_hash = ?
            ORDER BY priority_score DESC
            LIMIT 1
            """,
            (content_hash,),
        ).fetchone()

        if hash_match:
            merged_id = hash_match["id"]
            is_active = hash_match["active"]

            # Merge into existing pattern: increment count, update last_seen,
            # reactivate if soft-deleted. Preserve original instrument_name.
            conn.execute(
                """
                UPDATE patterns SET
                    occurrence_count = occurrence_count + 1,
                    last_seen = ?,
                    active = 1
                WHERE id = ?
                """,
                (now, merged_id),
            )

            if not is_active:
                _logger.info(
                    "pattern_reactivated_via_hash_merge",
                    merged_id=merged_id,
                    new_type=pattern_type,
                    new_name=pattern_name,
                )
            else:
                _logger.debug(
                    "pattern_merged_via_content_hash",
                    merged_id=merged_id,
                    new_type=pattern_type,
                    new_name=pattern_name,
                    content_hash=content_hash,
                )

            return cast(str, merged_id)

        # Step 3: No match at all — insert new pattern.
        conn.execute(
            """
            INSERT INTO patterns (
                id, pattern_type, pattern_name, description,
                occurrence_count, first_seen, last_seen, last_confirmed,
                led_to_success_count, led_to_failure_count,
                effectiveness_score, variance, suggested_action,
                context_tags, priority_score,
                quarantine_status, provenance_job_hash, provenance_sheet_num,
                trust_score, trust_calculation_date,
                content_hash, instrument_name, active
            ) VALUES (?, ?, ?, ?, 1, ?, ?, ?, 0, 0, 0.5, 0.0, ?, ?, 0.5,
                      ?, ?, ?, 0.5, ?,
                      ?, ?, 1)
            """,
            (
                pattern_id,
                pattern_type,
                pattern_name,
                description,
                now,
                now,
                now,
                suggested_action,
                json.dumps(context_tags or []),
                QuarantineStatus.PENDING.value,
                job_hash,
                sheet_num,
                now,
                content_hash,
                instrument_name,
            ),
        )

    return pattern_id
record_pattern_application
record_pattern_application(pattern_id, execution_id, pattern_led_to_success, retry_count_before=0, retry_count_after=0, application_mode='exploitation', validation_passed=None, grounding_confidence=None)

Record that a pattern was applied to an execution.

Creates the feedback loop for effectiveness tracking. After recording the application, automatically updates effectiveness_score and priority_score.

Parameters:

Name Type Description Default
pattern_id str

The pattern that was applied.

required
execution_id str

The execution it was applied to.

required
pattern_led_to_success bool

Whether applying this pattern led to execution success (validation passed on first attempt).

required
retry_count_before int

Retry count before pattern applied.

0
retry_count_after int

Retry count after pattern applied.

0
application_mode str

'exploration' or 'exploitation'.

'exploitation'
validation_passed bool | None

Whether validation passed on first attempt.

None
grounding_confidence float | None

Grounding confidence (0.0-1.0).

None

Returns:

Type Description
str

The application record ID.

Source code in src/marianne/learning/store/patterns_crud.py
def record_pattern_application(
    self,
    pattern_id: str,
    execution_id: str,
    pattern_led_to_success: bool,
    retry_count_before: int = 0,
    retry_count_after: int = 0,
    application_mode: str = "exploitation",
    validation_passed: bool | None = None,
    grounding_confidence: float | None = None,
) -> str:
    """Record that a pattern was applied to an execution.

    Creates the feedback loop for effectiveness tracking. After recording
    the application, automatically updates effectiveness_score and priority_score.

    Args:
        pattern_id: The pattern that was applied.
        execution_id: The execution it was applied to.
        pattern_led_to_success: Whether applying this pattern led to
            execution success (validation passed on first attempt).
        retry_count_before: Retry count before pattern applied.
        retry_count_after: Retry count after pattern applied.
        application_mode: 'exploration' or 'exploitation'.
        validation_passed: Whether validation passed on first attempt.
        grounding_confidence: Grounding confidence (0.0-1.0).

    Returns:
        The application record ID.
    """
    _ = validation_passed  # Accepted for API compatibility, not yet stored
    app_id = str(uuid.uuid4())
    now = datetime.now()
    now_iso = now.isoformat()

    with self._get_connection() as conn:
        # Guard: verify pattern exists before recording application
        exists = conn.execute(
            "SELECT 1 FROM patterns WHERE id = ?", (pattern_id,)
        ).fetchone()
        if not exists:
            _logger.warning(
                "pattern_application_skipped",
                pattern_id=pattern_id,
                reason="pattern_not_found",
            )
            return app_id

        try:
            conn.execute(
                """
                INSERT INTO pattern_applications (
                    id, pattern_id, execution_id, applied_at,
                    pattern_led_to_success, retry_count_before,
                    retry_count_after, grounding_confidence
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    app_id,
                    pattern_id,
                    execution_id,
                    now_iso,
                    pattern_led_to_success,
                    retry_count_before,
                    retry_count_after,
                    grounding_confidence,
                ),
            )
        except sqlite3.IntegrityError as e:
            # Legacy databases may have FK constraints on
            # pattern_applications that reference executions(id).
            # The v15 migration removes these, but if it hasn't
            # run yet, we catch and log instead of propagating.
            _logger.warning(
                "pattern_application_insert_fk_error",
                pattern_id=pattern_id,
                execution_id=execution_id,
                error=str(e),
            )
            return app_id

        if pattern_led_to_success:
            conn.execute(
                """
                UPDATE patterns SET
                    led_to_success_count = led_to_success_count + 1,
                    last_confirmed = ?
                WHERE id = ?
                """,
                (now_iso, pattern_id),
            )
        else:
            conn.execute(
                """
                UPDATE patterns SET
                    led_to_failure_count = led_to_failure_count + 1
                WHERE id = ?
                """,
                (pattern_id,),
            )

        cursor = conn.execute(
            """
            SELECT led_to_success_count, led_to_failure_count, last_confirmed,
                   occurrence_count, variance
            FROM patterns WHERE id = ?
            """,
            (pattern_id,),
        )
        row = cursor.fetchone()

        if row:
            last_confirmed_raw = row["last_confirmed"]
            last_confirmed = (
                datetime.fromisoformat(last_confirmed_raw)
                if last_confirmed_raw
                else now
            )
            new_effectiveness = self._calculate_effectiveness(
                pattern_id=pattern_id,
                led_to_success_count=row["led_to_success_count"],
                led_to_failure_count=row["led_to_failure_count"],
                last_confirmed=last_confirmed,
                now=now,
                conn=conn,
            )

            new_priority = self._calculate_priority_score(
                effectiveness=new_effectiveness,
                occurrence_count=row["occurrence_count"],
                variance=row["variance"],
            )

            conn.execute(
                """
                UPDATE patterns SET
                    effectiveness_score = ?,
                    priority_score = ?
                WHERE id = ?
                """,
                (new_effectiveness, new_priority, pattern_id),
            )

            _logger.debug(
                "pattern_effectiveness_updated",
                pattern_id=pattern_id,
                effectiveness=round(new_effectiveness, 3),
                priority=round(new_priority, 3),
                mode=application_mode,
            )

    return app_id
soft_delete_pattern
soft_delete_pattern(pattern_id)

Soft-delete a pattern by setting active=0.

Preserves FK integrity — the row remains in the database so pattern_applications referencing it don't violate constraints. Re-recording a soft-deleted pattern reactivates it.

Parameters:

Name Type Description Default
pattern_id str

The pattern to soft-delete.

required

Returns:

Type Description
bool

True if the pattern was found and deactivated, False if not found.

Source code in src/marianne/learning/store/patterns_crud.py
def soft_delete_pattern(self, pattern_id: str) -> bool:
    """Soft-delete a pattern by setting active=0.

    Preserves FK integrity — the row remains in the database so
    pattern_applications referencing it don't violate constraints.
    Re-recording a soft-deleted pattern reactivates it.

    Args:
        pattern_id: The pattern to soft-delete.

    Returns:
        True if the pattern was found and deactivated, False if not found.
    """
    with self._get_connection() as conn:
        cursor = conn.execute(
            "UPDATE patterns SET active = 0 WHERE id = ? AND COALESCE(active, 1) = 1",
            (pattern_id,),
        )
        if cursor.rowcount > 0:
            _logger.info(
                "pattern_soft_deleted",
                pattern_id=pattern_id,
            )
            return True
        return False
update_pattern_effectiveness
update_pattern_effectiveness(pattern_id)

Manually recalculate and update a pattern's effectiveness.

Parameters:

Name Type Description Default
pattern_id str

The pattern to update.

required

Returns:

Type Description
float | None

New effectiveness score, or None if pattern not found.

Source code in src/marianne/learning/store/patterns_crud.py
def update_pattern_effectiveness(
    self,
    pattern_id: str,
) -> float | None:
    """Manually recalculate and update a pattern's effectiveness.

    Args:
        pattern_id: The pattern to update.

    Returns:
        New effectiveness score, or None if pattern not found.
    """
    with self._get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT led_to_success_count, led_to_failure_count, last_confirmed,
                   occurrence_count, variance
            FROM patterns WHERE id = ?
            """,
            (pattern_id,),
        )
        row = cursor.fetchone()

        if not row:
            return None

        now = datetime.now()
        last_confirmed_raw = row["last_confirmed"]
        last_confirmed = (
            datetime.fromisoformat(last_confirmed_raw)
            if last_confirmed_raw
            else now
        )
        new_effectiveness = self._calculate_effectiveness(
            pattern_id=pattern_id,
            led_to_success_count=row["led_to_success_count"],
            led_to_failure_count=row["led_to_failure_count"],
            last_confirmed=last_confirmed,
            now=now,
            conn=conn,
        )

        new_priority = self._calculate_priority_score(
            effectiveness=new_effectiveness,
            occurrence_count=row["occurrence_count"],
            variance=row["variance"],
        )

        conn.execute(
            """
            UPDATE patterns SET
                effectiveness_score = ?,
                priority_score = ?
            WHERE id = ?
            """,
            (new_effectiveness, new_priority, pattern_id),
        )

        _logger.debug(
            "pattern_effectiveness_manual_update",
            pattern_id=pattern_id,
            effectiveness=round(new_effectiveness, 3),
            priority=round(new_priority, 3),
        )
        return new_effectiveness
recalculate_all_pattern_priorities
recalculate_all_pattern_priorities()

Recalculate priorities for all patterns.

Uses batch_connection() to reuse a single SQLite connection across all pattern updates, avoiding N+1 connection overhead.

Returns:

Type Description
int

Number of patterns updated.

Source code in src/marianne/learning/store/patterns_crud.py
def recalculate_all_pattern_priorities(self) -> int:
    """Recalculate priorities for all patterns.

    Uses batch_connection() to reuse a single SQLite connection across
    all pattern updates, avoiding N+1 connection overhead.

    Returns:
        Number of patterns updated.
    """
    with self.batch_connection():
        with self._get_connection() as conn:
            cursor = conn.execute(
                "SELECT id FROM patterns"
            )
            pattern_ids = [row["id"] for row in cursor.fetchall()]

        updated = 0
        for pattern_id in pattern_ids:
            result = self.update_pattern_effectiveness(pattern_id)
            if result is not None:
                updated += 1

    _logger.info("priorities_recalculated", count=updated)
    return updated