learning_hub

Centralized learning hub — single store shared across all daemon jobs.

Instead of each job opening its own connection to the global learning store, the daemon maintains a single GlobalLearningStore instance and passes it to every JobService. This provides three benefits:

  1. Instant cross-job learning. Pattern discoveries in Job A are immediately visible to Job B because they share the same in-memory store backed by one SQLite connection.

  2. No locking contention. Multiple jobs writing through the same store instance serialise at the Python level rather than fighting over SQLite WAL locks across threads/processes.

  3. Lifecycle ownership. The daemon controls when the store is created (start) and when a final persist happens (stop), ensuring clean shutdown semantics.
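The first benefit can be illustrated with a minimal sketch. The classes here (`InMemoryStore`, `JobServiceSketch`) are hypothetical stand-ins, not the real `GlobalLearningStore` or `JobService`. They only show the idea that jobs receive one shared store instance instead of creating their own.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class InMemoryStore:
    """Hypothetical stand-in for GlobalLearningStore (illustration only)."""

    patterns: list[str] = field(default_factory=list)

    def record_pattern(self, name: str) -> None:
        self.patterns.append(name)


class JobServiceSketch:
    """Hypothetical stand-in for JobService: it is handed a store, it never opens one."""

    def __init__(self, job_id: str, store: InMemoryStore) -> None:
        self.job_id = job_id
        self.store = store

    def discover(self, pattern: str) -> None:
        # Writes go through the shared instance.
        self.store.record_pattern(f"{self.job_id}:{pattern}")

    def known_patterns(self) -> list[str]:
        return list(self.store.patterns)


# The daemon creates the store once and passes the same object to every job.
shared = InMemoryStore()
job_a = JobServiceSketch("job-a", shared)
job_b = JobServiceSketch("job-b", shared)

job_a.discover("retry-after-timeout")
print(job_b.known_patterns())  # job B sees job A's discovery without reopening anything
```

Because both jobs hold a reference to the same object, there is no synchronisation step between a write in one job and a read in another.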

Classes

LearningHub

LearningHub(db_path=None)

Centralized learning for daemon mode.

Instead of each job opening its own SQLite connection to ~/.marianne/global-learning.db, the daemon maintains a single GlobalLearningStore instance shared across all jobs.

Benefits:

- Pattern discoveries in Job A immediately available to Job B
- No cross-process SQLite locking contention
- Periodic persistence instead of per-write persistence
- Cross-job effectiveness tracking

Source code in src/marianne/daemon/learning_hub.py
def __init__(self, db_path: Path | None = None) -> None:
    self._db_path = db_path or DEFAULT_GLOBAL_STORE_PATH
    self._store: GlobalLearningStore | None = None
    self._heartbeat_interval = 60.0  # Persist every 60 seconds
    self._heartbeat_task: asyncio.Task[None] | None = None
Attributes
store property
store

Get the shared learning store.

Raises:

RuntimeError: If the hub has not been started yet.

is_running property
is_running

Whether the hub has been started and has an active store.
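The source of the two properties is not shown on this page. The following is a sketch of how they could behave given the descriptions above, assuming both are derived from the `_store` attribute set in `__init__`. The class name `HubSketch` and the error message are illustrative assumptions.

```python
from __future__ import annotations


class HubSketch:
    """Illustrative stand-in that mirrors only the documented property behaviour."""

    def __init__(self) -> None:
        self._store: object | None = None  # set by start(), cleared by stop()

    @property
    def store(self) -> object:
        # Assumed guard: fail loudly rather than hand callers a None store.
        if self._store is None:
            raise RuntimeError("learning hub has not been started")
        return self._store

    @property
    def is_running(self) -> bool:
        # Assumed definition: running means a store is currently held.
        return self._store is not None
```

Callers that cannot be sure the hub is up can check `is_running` first instead of catching `RuntimeError`.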

Functions
start async
start()

Initialize store and start persistence loop.

Source code in src/marianne/daemon/learning_hub.py
async def start(self) -> None:
    """Initialize store and start persistence loop."""
    self._store = GlobalLearningStore(self._db_path)
    self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
    self._heartbeat_task.add_done_callback(self._on_heartbeat_done)
    _logger.info("learning_hub.started", db_path=str(self._db_path))
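`start()` refers to `_heartbeat_loop` and `_on_heartbeat_done`, whose bodies are not shown here. A minimal sketch of the general pattern follows, assuming the loop sleeps for the heartbeat interval and the callback records unexpected failures. `HeartbeatSketch`, `ticks`, and `failure` are hypothetical names used only for illustration.

```python
from __future__ import annotations

import asyncio


class HeartbeatSketch:
    """Illustrative background-task pattern, not the real LearningHub."""

    def __init__(self, interval: float = 60.0) -> None:
        self._interval = interval
        self._task: asyncio.Task[None] | None = None
        self.ticks = 0
        self.failure: BaseException | None = None

    async def _heartbeat_loop(self) -> None:
        while True:
            await asyncio.sleep(self._interval)
            self.ticks += 1  # a real hub might persist or log here

    def _on_heartbeat_done(self, task: asyncio.Task[None]) -> None:
        # Cancellation is the normal shutdown path, so it is not an error.
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self.failure = exc  # a real hub would likely log this

    async def start(self) -> None:
        self._task = asyncio.create_task(self._heartbeat_loop())
        self._task.add_done_callback(self._on_heartbeat_done)

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
```

The done callback matters because an exception raised inside a background task is otherwise only surfaced when the task is awaited or garbage collected.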
stop async
stop()

Persist final state and stop.

Source code in src/marianne/daemon/learning_hub.py
async def stop(self) -> None:
    """Persist final state and stop."""
    if self._heartbeat_task:
        self._heartbeat_task.cancel()
        try:
            await self._heartbeat_task
        except asyncio.CancelledError:
            pass
        self._heartbeat_task = None
    if self._store:
        # Final persist — the store commits on each _get_connection()
        # exit, so no explicit flush is needed, but we log for
        # observability.
        _logger.info("learning_hub.final_persist")
        self._store.close()
        self._store = None
    _logger.info("learning_hub.stopped")
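Because `stop()` guards both the task and the store and resets each to `None`, a second call has nothing left to close. The sketch below shows one way a daemon might wrap the lifecycle in `try`/`finally`. `StoreStub`, `HubStub`, and `run_daemon` are hypothetical stand-ins that copy only the guard structure from the listing above.

```python
from __future__ import annotations

import asyncio


class StoreStub:
    """Hypothetical store that counts how often close() is called."""

    def __init__(self) -> None:
        self.closed = 0

    def close(self) -> None:
        self.closed += 1


class HubStub:
    """Hypothetical hub with the same guard-and-reset shape as stop() above."""

    def __init__(self) -> None:
        self._store: StoreStub | None = None
        self.last_store: StoreStub | None = None  # kept only so the example can inspect it

    async def start(self) -> None:
        self._store = StoreStub()
        self.last_store = self._store

    async def stop(self) -> None:
        if self._store is not None:
            self._store.close()
            self._store = None


async def run_daemon(hub: HubStub) -> None:
    await hub.start()
    try:
        await asyncio.sleep(0)  # placeholder for running jobs
    finally:
        await hub.stop()
        await hub.stop()  # second call finds no store and does nothing
```

Running `asyncio.run(run_daemon(HubStub()))` closes the store exactly once, even though `stop()` is called twice.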

Functions