base

Base class for GlobalLearningStore with connection and schema management.

This module provides the foundational GlobalLearningStoreBase class that handles:

- SQLite database connection management with WAL mode
- Schema creation and migration
- Static hashing utilities for workspace and job identification

Extracted from global_store.py as part of the modularization effort. Mixins inherit from this base to add domain-specific functionality.

Classes

WhereBuilder

WhereBuilder()

Accumulates SQL WHERE clauses and their bound parameters.

Provides a consistent pattern for building dynamic SQL queries across learning store mixins. Clauses are joined with AND.

Usage::

wb = WhereBuilder()
wb.add("status = ?", status)
wb.add("score >= ?", min_score)
where_sql, params = wb.build()
conn.execute(f"SELECT * FROM t WHERE {where_sql}", params)
Source code in src/marianne/learning/store/base.py
def __init__(self) -> None:
    self._clauses: list[str] = []
    self._params: list[SQLParam] = []
Functions
add
add(clause, *params)

Append a WHERE clause with its bound parameters.

Source code in src/marianne/learning/store/base.py
def add(self, clause: str, *params: SQLParam) -> None:
    """Append a WHERE clause with its bound parameters."""
    self._clauses.append(clause)
    self._params.extend(params)
build
build()

Return the combined WHERE fragment and parameter tuple.

Returns ("1=1", ()) when no clauses have been added.

Source code in src/marianne/learning/store/base.py
def build(self) -> tuple[str, tuple[SQLParam, ...]]:
    """Return the combined WHERE fragment and parameter tuple.

    Returns ``("1=1", ())`` when no clauses have been added.
    """
    if not self._clauses:
        return "1=1", ()
    return " AND ".join(self._clauses), tuple(self._params)

GlobalLearningStoreBase

GlobalLearningStoreBase(db_path=None)

SQLite-based global learning store base class.

Provides persistent storage infrastructure for execution outcomes, detected patterns, and error recovery data across all Marianne workspaces. Uses WAL mode for safe concurrent access.

This base class handles:

- Database connection lifecycle
- Schema version management
- Migration and schema creation
- Hashing utilities

Subclasses (via mixins) add domain-specific methods for patterns, executions, rate limits, drift detection, escalation, and budget management.

Attributes:

- db_path: Path to the SQLite database file.
- _logger: Module logger instance for consistent logging.

Initialize the global learning store.

Creates the database directory if needed, establishes the connection, and runs any necessary migrations.

Parameters:

- db_path (Path | None, default None): Path to the SQLite database file. Defaults to ~/.marianne/global-learning.db.
Source code in src/marianne/learning/store/base.py
def __init__(self, db_path: Path | None = None) -> None:
    """Initialize the global learning store.

    Creates the database directory if needed, establishes the connection,
    and runs any necessary migrations.

    Args:
        db_path: Path to the SQLite database file.
                Defaults to ~/.marianne/global-learning.db
    """
    self.db_path = db_path or DEFAULT_GLOBAL_STORE_PATH
    self._logger = _logger
    # ContextVar scopes the batch connection per-asyncio-task, preventing
    # a race where Task A's batch_connection() leaks into Task B's
    # _get_connection() calls.  Each task sees its own (or no) batch conn.
    self._batch_conn: contextvars.ContextVar[sqlite3.Connection | None] = (
        contextvars.ContextVar("_batch_conn", default=None)
    )
    self._ensure_db_exists()
    self._migrate_if_needed()
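The comment in the source describes how _get_connection() is expected to consult the per-task ContextVar. _get_connection() itself is not documented on this page, so the following is only a sketch of that interplay under assumed semantics (reuse the active batch connection if one is set, otherwise open and close a fresh one per operation):

```python
import contextvars
import os
import sqlite3
import tempfile
from collections.abc import Generator
from contextlib import contextmanager


class ConnDemo:
    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        # One slot per asyncio task: a batch connection set in Task A is
        # invisible to Task B.
        self._batch_conn: contextvars.ContextVar[sqlite3.Connection | None] = (
            contextvars.ContextVar("_batch_conn", default=None)
        )

    @contextmanager
    def _get_connection(self) -> Generator[sqlite3.Connection, None, None]:
        # Assumed shape: reuse an active batch connection instead of opening
        # a new one; batch_connection() owns its commit and close.
        batch = self._batch_conn.get()
        if batch is not None:
            yield batch
            return
        conn = sqlite3.connect(self.db_path, timeout=30.0)
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    @contextmanager
    def batch_connection(self) -> Generator[sqlite3.Connection, None, None]:
        conn = sqlite3.connect(self.db_path, timeout=30.0)
        token = self._batch_conn.set(conn)
        try:
            yield conn
            conn.commit()
        finally:
            self._batch_conn.reset(token)
            conn.close()


store = ConnDemo(os.path.join(tempfile.mkdtemp(), "demo.db"))
with store.batch_connection() as outer:
    with store._get_connection() as inner:
        reused = inner is outer  # inside a batch: same connection object
with store._get_connection() as conn:
    fresh = conn is not outer  # outside a batch: a new connection
```

`ContextVar.set()` returns a token, so `reset(token)` restores whatever value the enclosing scope had, which keeps nested or re-entrant batch scopes well-behaved.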
Functions
batch_connection
batch_connection()

Reuse a single connection across multiple operations.

While this context manager is active, all _get_connection() calls will reuse the same connection, avoiding repeated open/close overhead. The connection is committed once on successful exit or rolled back on error.

Example::

with store.batch_connection():
    patterns = store.get_patterns(min_priority=0.01)
    for p in patterns:
        store.update_trust_score(p.pattern_id, ...)

Yields:

- Connection: The shared sqlite3.Connection instance.

Source code in src/marianne/learning/store/base.py
@contextmanager
def batch_connection(self) -> Generator[sqlite3.Connection, None, None]:
    """Reuse a single connection across multiple operations.

    While this context manager is active, all ``_get_connection()`` calls
    will reuse the same connection, avoiding repeated open/close overhead.
    The connection is committed once on successful exit or rolled back on error.

    Example::

        with store.batch_connection():
            patterns = store.get_patterns(min_priority=0.01)
            for p in patterns:
                store.update_trust_score(p.pattern_id, ...)

    Yields:
        The shared sqlite3.Connection instance.
    """
    conn = sqlite3.connect(str(self.db_path), timeout=30.0)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    conn.execute("PRAGMA busy_timeout=30000")
    conn.row_factory = sqlite3.Row
    token = self._batch_conn.set(conn)
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        _logger.warning(
            f"Batch operation failed on {self.db_path}: {type(e).__name__}: {e}"
        )
        raise
    finally:
        self._batch_conn.reset(token)
        conn.close()
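The commit-on-success / rollback-on-error contract can be demonstrated in isolation. This standalone sketch copies the connection setup from the source above into a bare context manager (the table, values, and temp path are illustrative):

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager

DB = os.path.join(tempfile.mkdtemp(), "demo.db")


@contextmanager
def batch_connection(db_path: str):
    # Same setup as the source: WAL journal, FK enforcement, 30 s busy timeout.
    conn = sqlite3.connect(db_path, timeout=30.0)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    conn.execute("PRAGMA busy_timeout=30000")
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


with batch_connection(DB) as conn:
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t VALUES (1)")  # committed on clean exit

try:
    with batch_connection(DB) as conn:
        conn.execute("INSERT INTO t VALUES (2)")
        raise RuntimeError("boom")  # triggers rollback; row 2 is discarded
except RuntimeError:
    pass

rows = [r[0] for r in sqlite3.connect(DB).execute("SELECT x FROM t")]
# Only the committed row survives: rows == [1]
```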
close
close()

Close any persistent resources.

No-op: connections are managed per-operation via _get_connection(). This method exists for API compatibility so callers can unconditionally call store.close() without checking the backend type.

Source code in src/marianne/learning/store/base.py
def close(self) -> None:  # noqa: B027 — intentional concrete no-op default
    """Close any persistent resources.

    No-op: connections are managed per-operation via _get_connection().
    This method exists for API compatibility so callers can unconditionally
    call ``store.close()`` without checking the backend type.
    """
hash_workspace staticmethod
hash_workspace(workspace_path)

Generate a stable hash for a workspace path.

Creates a reproducible 16-character hex hash from the resolved absolute path. This allows pattern matching across sessions while preserving privacy (paths are not stored directly).

Parameters:

- workspace_path (Path, required): The absolute path to the workspace.

Returns:

- str: A hex string hash of the workspace path (16 characters).

Source code in src/marianne/learning/store/base.py
@staticmethod
def hash_workspace(workspace_path: Path) -> str:
    """Generate a stable hash for a workspace path.

    Creates a reproducible 16-character hex hash from the resolved
    absolute path. This allows pattern matching across sessions
    while preserving privacy (paths are not stored directly).

    Args:
        workspace_path: The absolute path to the workspace.

    Returns:
        A hex string hash of the workspace path (16 characters).
    """
    normalized = str(workspace_path.resolve())
    return hashlib.sha256(normalized.encode()).hexdigest()[:16]
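The scheme is plain SHA-256 over the resolved path, truncated to 16 hex characters, so two spellings of the same directory hash identically:

```python
import hashlib
from pathlib import Path


def hash_workspace(workspace_path: Path) -> str:
    # Same body as the source above.
    normalized = str(workspace_path.resolve())
    return hashlib.sha256(normalized.encode()).hexdigest()[:16]


h1 = hash_workspace(Path("/opt/project"))
h2 = hash_workspace(Path("/opt/../opt/project"))  # resolves to the same path
# h1 == h2, and both are 16 lowercase hex characters
```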
hash_job staticmethod
hash_job(job_name, config_hash=None)

Generate a stable hash for a job.

Creates a reproducible 16-character hex hash from the job name and optional config hash. The config hash enables version-awareness: the same job with different configs will have different hashes.

Parameters:

- job_name (str, required): The job name.
- config_hash (str | None, default None): Optional hash of the job config for versioning.

Returns:

- str: A hex string hash of the job (16 characters).

Source code in src/marianne/learning/store/base.py
@staticmethod
def hash_job(job_name: str, config_hash: str | None = None) -> str:
    """Generate a stable hash for a job.

    Creates a reproducible 16-character hex hash from the job name
    and optional config hash. The config hash enables version-awareness:
    the same job with different configs will have different hashes.

    Args:
        job_name: The job name.
        config_hash: Optional hash of the job config for versioning.

    Returns:
        A hex string hash of the job (16 characters).
    """
    combined = f"{job_name}:{config_hash or ''}"
    return hashlib.sha256(combined.encode()).hexdigest()[:16]
clear_all
clear_all()

Clear all data from the global store.

WARNING: This is destructive and should only be used for testing.

Source code in src/marianne/learning/store/base.py
def clear_all(self) -> None:
    """Clear all data from the global store.

    WARNING: This is destructive and should only be used for testing.
    """
    with self._get_connection() as conn:
        for table in self._DATA_TABLES:
            conn.execute(f"DELETE FROM {table}")  # noqa: S608 -- table names are hardcoded constants

    _logger.warning("Cleared all data from global learning store")
