Skip to content

semantic_analyzer

semantic_analyzer

Semantic analyzer — LLM-based analysis of sheet completions.

Subscribes to EventBus sheet events, sends completion context to an LLM, and stores resulting insights as patterns in the global learning store.

The analyzer operates independently of job execution: analysis failures never affect running jobs. Concurrency is limited by a semaphore to control API costs.

Attributes

Classes

SemanticAnalyzer

SemanticAnalyzer(config, backend, learning_hub, live_states)

Analyzes sheet completions via LLM to produce learning insights.

Lifecycle

backend = create_backend_from_config(config.backend)
analyzer = SemanticAnalyzer(config, backend, learning_hub, live_states)
await analyzer.start(event_bus)

... events flow, analyses run ...

await analyzer.stop()

Source code in src/marianne/daemon/semantic_analyzer.py
def __init__(
    self,
    config: SemanticLearningConfig,
    backend: Backend,
    learning_hub: LearningHub,
    live_states: dict[str, CheckpointState],
) -> None:
    """Store collaborators and set up bookkeeping for analysis runs.

    Args:
        config: Semantic-learning settings; ``max_concurrent_analyses``
            sizes the concurrency semaphore.
        backend: Backend object kept for later use by the analyzer.
        learning_hub: Learning hub kept for later use by the analyzer.
        live_states: Mapping of string keys to checkpoint states; the
            reference is stored as-is (not copied).
    """
    max_parallel = config.max_concurrent_analyses

    # Collaborators handed in by the caller.
    self._config, self._backend = config, backend
    self._learning_hub, self._live_states = learning_hub, live_states

    # Caps how many analyses may hold the semaphore at once.
    self._semaphore = asyncio.Semaphore(max_parallel)

    # Event-bus subscription handles; None while not subscribed.
    self._sub_id: str | None = None
    self._anomaly_sub_id: str | None = None

    # Analysis tasks that have been spawned and are still tracked.
    self._pending_tasks: set[asyncio.Task[None]] = set()
Functions
start async
start(event_bus)

Subscribe to sheet events and anomaly events on the event bus.

Source code in src/marianne/daemon/semantic_analyzer.py
async def start(self, event_bus: EventBus) -> None:
    """Register the sheet-completion and anomaly handlers on *event_bus*.

    Does nothing except log ``semantic_analyzer.disabled`` when the
    configuration has ``enabled`` set to a falsy value.

    Args:
        event_bus: Bus on which both subscriptions are created. The
            returned subscription ids are remembered on the instance.
    """
    if not self._config.enabled:
        _logger.info("semantic_analyzer.disabled")
        return

    def _is_completion(evt):
        # Accept only events whose "event" field names a completion event.
        return evt.get("event", "") in _COMPLETION_EVENTS

    def _is_anomaly(evt):
        # Accept only events whose "event" field is the anomaly event name.
        return evt.get("event", "") == _ANOMALY_EVENT

    self._sub_id = event_bus.subscribe(
        callback=self._on_sheet_event,
        event_filter=_is_completion,
    )

    # monitor.anomaly events are stored directly as RESOURCE_ANOMALY
    # patterns, so they get their own handler and need no LLM call.
    self._anomaly_sub_id = event_bus.subscribe(
        callback=self._on_anomaly_event,
        event_filter=_is_anomaly,
    )

    startup_fields = {
        "backend": self._backend.name,
        "analyze_on": self._config.analyze_on,
        "max_concurrent": self._config.max_concurrent_analyses,
    }
    _logger.info("semantic_analyzer.started", **startup_fields)
stop async
stop(event_bus=None)

Unsubscribe and wait for pending analyses to drain.

Source code in src/marianne/daemon/semantic_analyzer.py
async def stop(self, event_bus: EventBus | None = None) -> None:
    """Unsubscribe, drain pending analyses, and close the backend.

    Args:
        event_bus: Bus the analyzer was subscribed on. When ``None``,
            existing subscriptions are left in place and only the
            drain and backend shutdown are performed.

    In-flight analysis tasks are given up to
    ``config.backend.timeout_seconds`` to finish. Tasks still running
    after that are cancelled and then awaited, so none of them is still
    executing when the backend is closed. Errors raised by the tasks
    themselves are collected and discarded, never propagated.
    """
    if event_bus is not None:
        if self._sub_id is not None:
            event_bus.unsubscribe(self._sub_id)
            self._sub_id = None

        if self._anomaly_sub_id is not None:
            event_bus.unsubscribe(self._anomaly_sub_id)
            self._anomaly_sub_id = None

    # Wait for pending analysis tasks to complete
    if self._pending_tasks:
        _logger.info(
            "semantic_analyzer.draining",
            pending=len(self._pending_tasks),
        )
        done, _ = await asyncio.wait(
            self._pending_tasks,
            timeout=self._config.backend.timeout_seconds,
        )
        # Cancel any tasks that didn't finish in time
        stragglers = self._pending_tasks - done
        for task in stragglers:
            task.cancel()
        if stragglers:
            # cancel() only *requests* cancellation. Await the tasks so
            # they finish unwinding before the backend goes away and
            # before their references are dropped below.
            # return_exceptions=True keeps CancelledError (and any task
            # failure) from escaping stop().
            await asyncio.gather(*stragglers, return_exceptions=True)

    self._pending_tasks.clear()

    # Close the backend (best effort: a failing close must not break
    # shutdown, but it should leave a trace in the logs).
    try:
        await self._backend.close()
    except (OSError, RuntimeError) as exc:
        _logger.warning(
            "semantic_analyzer.backend_close_failed",
            error=str(exc),
        )

    _logger.info("semantic_analyzer.stopped")

Functions