Semantic analyzer — LLM-based analysis of sheet completions.
Subscribes to EventBus sheet events, sends completion context to an LLM,
and stores resulting insights as patterns in the global learning store.
The analyzer operates independently of job execution: analysis failures
never affect running jobs. Concurrency is limited by a semaphore to
control API costs.
Attributes
Classes
SemanticAnalyzer
SemanticAnalyzer(config, backend, learning_hub, live_states)
Analyzes sheet completions via LLM to produce learning insights.
Lifecycle
backend = create_backend_from_config(config.backend)
analyzer = SemanticAnalyzer(config, backend, learning_hub, live_states)
await analyzer.start(event_bus)
... events flow, analyses run ...
await analyzer.stop()
Source code in src/marianne/daemon/semantic_analyzer.py
def __init__(
    self,
    config: SemanticLearningConfig,
    backend: Backend,
    learning_hub: LearningHub,
    live_states: dict[str, CheckpointState],
) -> None:
    """Wire up collaborators and initialize idle subscription state.

    The analyzer starts inert: no subscriptions exist until ``start``
    is called, and no analysis tasks are in flight.
    """
    # Injected collaborators.
    self._config = config
    self._backend = backend
    self._learning_hub = learning_hub
    self._live_states = live_states
    # Caps simultaneous LLM analyses to control API costs.
    self._semaphore = asyncio.Semaphore(config.max_concurrent_analyses)
    # EventBus subscription ids; set by start(), cleared by stop().
    self._sub_id: str | None = None
    self._anomaly_sub_id: str | None = None
    # In-flight analysis tasks, tracked so stop() can drain them.
    self._pending_tasks: set[asyncio.Task[None]] = set()
Functions
start
async
Subscribe to sheet events and anomaly events on the event bus.
Source code in src/marianne/daemon/semantic_analyzer.py
async def start(self, event_bus: EventBus) -> None:
    """Subscribe to sheet events and anomaly events on the event bus.

    Does nothing (beyond an info log) when semantic learning is
    disabled in the configuration.
    """
    cfg = self._config
    if not cfg.enabled:
        _logger.info("semantic_analyzer.disabled")
        return

    # Named predicates instead of inline lambdas, for readability.
    def _is_completion(event):
        return event.get("event", "") in _COMPLETION_EVENTS

    def _is_anomaly(event):
        return event.get("event", "") == _ANOMALY_EVENT

    # Sheet-completion events feed the LLM analysis pipeline.
    self._sub_id = event_bus.subscribe(
        callback=self._on_sheet_event,
        event_filter=_is_completion,
    )
    # monitor.anomaly events are stored directly as RESOURCE_ANOMALY
    # patterns without LLM calls.
    self._anomaly_sub_id = event_bus.subscribe(
        callback=self._on_anomaly_event,
        event_filter=_is_anomaly,
    )
    _logger.info(
        "semantic_analyzer.started",
        backend=self._backend.name,
        analyze_on=cfg.analyze_on,
        max_concurrent=cfg.max_concurrent_analyses,
    )
stop
async
Unsubscribe and wait for pending analyses to drain.
Source code in src/marianne/daemon/semantic_analyzer.py
async def stop(self, event_bus: EventBus | None = None) -> None:
    """Unsubscribe and wait for pending analyses to drain.

    Parameters
    ----------
    event_bus:
        The bus that ``start`` subscribed to. When ``None``, the
        subscriptions cannot be removed, but draining and backend
        shutdown still proceed.
    """
    # Remove subscriptions first so no new analyses get scheduled
    # while we drain.
    if self._sub_id is not None and event_bus is not None:
        event_bus.unsubscribe(self._sub_id)
        self._sub_id = None
    if self._anomaly_sub_id is not None and event_bus is not None:
        event_bus.unsubscribe(self._anomaly_sub_id)
        self._anomaly_sub_id = None
    # Give in-flight analyses one backend-timeout window to finish.
    if self._pending_tasks:
        _logger.info(
            "semantic_analyzer.draining",
            pending=len(self._pending_tasks),
        )
        done, pending = await asyncio.wait(
            self._pending_tasks,
            timeout=self._config.backend.timeout_seconds,
        )
        # Cancel stragglers, then await them so cancellation actually
        # completes: finally-blocks inside the tasks run, and Python
        # emits no "Task was destroyed but it is pending" warnings.
        # (The original cancelled without awaiting.)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        self._pending_tasks.clear()
    # Best-effort backend shutdown: close failures must never
    # propagate out of daemon teardown.
    try:
        await self._backend.close()
    except (OSError, RuntimeError):
        pass
    _logger.info("semantic_analyzer.stopped")
Functions