# observer_recorder

Observer event recorder — persists per-job observer events to JSONL.

Subscribes to observer.* events on the EventBus, applies path exclusion and coalescing, then writes to per-job JSONL files inside each job's workspace. Also maintains an in-memory ring buffer for real-time TUI consumption via IPC.

Classes

ObserverRecorder

ObserverRecorder(config)

EventBus subscriber that persists observer events to per-job JSONL files.

Thread-safety: This class is NOT thread-safe. All methods must be called from the same asyncio event loop. The coalesce buffer and file handles are safe from concurrent access because asyncio is cooperative — the EventBus drain loop and explicit flush() calls cannot interleave within synchronous code sections. Do NOT add await statements inside _write_event() or _flush_state() without re-evaluating concurrency safety.

Lifecycle follows the SemanticAnalyzer pattern::

recorder = ObserverRecorder(config=config)
await recorder.start(event_bus)
# ... events flow ...
await recorder.stop(event_bus)
Source code in src/marianne/daemon/observer_recorder.py
def __init__(self, config: ObserverConfig) -> None:
    """Store config and prepare empty runtime state; start() does the wiring."""
    self._config = config
    # Per-job recording state, keyed by job ID; populated via register_job().
    self._jobs: dict[str, _JobRecorderState] = {}
    # Set by start(), cleared by stop().
    self._event_bus: EventBus | None = None
    self._sub_id: str | None = None
    self._flush_task: asyncio.Task[None] | None = None
Functions
start async
start(event_bus)

Subscribe to observer.* events and start the coalesce flush timer.

Parameters:

Name Type Description Default
event_bus EventBus

The EventBus instance to subscribe to.

required
Source code in src/marianne/daemon/observer_recorder.py
async def start(self, event_bus: EventBus) -> None:
    """Subscribe to observer.* events and start the coalesce flush timer.

    Args:
        event_bus: The EventBus instance to subscribe to.
    """
    # A disabled recorder is a no-op: no subscription, no flush timer.
    if not self._config.enabled:
        _logger.info("observer_recorder.disabled")
        return

    def _observer_only(event):
        # Only events in the observer.* namespace concern this recorder.
        return event.get("event", "").startswith("observer.")

    self._event_bus = event_bus
    self._sub_id = event_bus.subscribe(
        callback=self._handle_event,
        event_filter=_observer_only,
    )

    # A non-positive window disables coalescing; no periodic flush needed.
    window = self._config.coalesce_window_seconds
    if window > 0:
        self._flush_task = asyncio.create_task(
            self._periodic_flush(),
            name="observer-recorder-flush",
        )

    _logger.info(
        "observer_recorder.started",
        persist_events=self._config.persist_events,
        coalesce_window=window,
    )
stop async
stop(event_bus=None)

Unsubscribe, cancel flush timer, flush all, close all file handles.

Parameters:

Name Type Description Default
event_bus EventBus | None

The EventBus instance to unsubscribe from. Falls back to the bus stored from start().

None
Source code in src/marianne/daemon/observer_recorder.py
async def stop(self, event_bus: EventBus | None = None) -> None:
    """Unsubscribe, cancel flush timer, flush all, close all file handles.

    Args:
        event_bus: The EventBus instance to unsubscribe from.
            Falls back to the bus stored from ``start()``.
    """
    bus = event_bus or self._event_bus

    # Drop the subscription first so no new events arrive mid-shutdown.
    if self._sub_id is not None and bus is not None:
        bus.unsubscribe(self._sub_id)
        self._sub_id = None

    # Swap the task out before awaiting so the attribute is never stale.
    task, self._flush_task = self._flush_task, None
    if task is not None:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    # REVIEW FIX 6: Close all remaining job states to prevent file handle leaks.
    # Snapshot the keys because unregister_job() mutates self._jobs.
    for remaining_job in list(self._jobs):
        self.unregister_job(remaining_job)

    self._event_bus = None
    _logger.info("observer_recorder.stopped")
register_job
register_job(job_id, workspace)

Start recording events for a job.

Source code in src/marianne/daemon/observer_recorder.py
def register_job(self, job_id: str, workspace: Path) -> None:
    """Start recording events for a job."""
    # Idempotent: a second registration keeps the existing state and handle.
    if job_id in self._jobs:
        return

    state = _JobRecorderState(job_id, workspace)
    if self._config.persist_events:
        events_path = workspace / ".marianne-observer.jsonl"
        try:
            # Long-lived append handle, closed in unregister_job().
            state.file_handle = open(events_path, "a+", encoding="utf-8")  # noqa: SIM115
        except OSError:
            # Persistence is best-effort: keep recording in memory even if
            # the JSONL file cannot be opened.
            _logger.warning(
                "observer_recorder.open_failed",
                job_id=job_id,
                path=str(events_path),
                exc_info=True,
            )

    self._jobs[job_id] = state
    _logger.info("observer_recorder.registered", job_id=job_id)
unregister_job
unregister_job(job_id)

Stop recording events for a job, flush and close file.

Source code in src/marianne/daemon/observer_recorder.py
def unregister_job(self, job_id: str) -> None:
    """Stop recording events for a job, flush and close file."""
    removed = self._jobs.pop(job_id, None)
    if removed is None:
        # Unknown or already-unregistered job: nothing to do.
        return
    # Drain coalesced events before releasing the file handle.
    self._flush_state(removed)
    self._close_state(removed)
    _logger.info("observer_recorder.unregistered", job_id=job_id)
flush
flush(job_id)

Flush coalesce buffer and file handle for a job.

Source code in src/marianne/daemon/observer_recorder.py
def flush(self, job_id: str) -> None:
    """Flush coalesce buffer and file handle for a job."""
    # Unknown jobs are silently ignored.
    if (state := self._jobs.get(job_id)) is not None:
        self._flush_state(state)
get_recent_events
get_recent_events(job_id, *, limit=50)

Return recent events from the in-memory ring buffer.

Parameters:

Name Type Description Default
job_id str | None

Specific job ID, or None to aggregate all jobs.

required
limit int

Maximum number of events to return.

50

Returns:

Type Description
list[ObserverEvent]

List of events, newest first.

Source code in src/marianne/daemon/observer_recorder.py
def get_recent_events(
    self, job_id: str | None, *, limit: int = 50,
) -> list[ObserverEvent]:
    """Return recent events from the in-memory ring buffer.

    Args:
        job_id: Specific job ID, or ``None`` to aggregate all jobs.
        limit: Maximum number of events to return.

    Returns:
        List of events, newest first.
    """
    if job_id is None:
        # Merge every job's buffer, then order by timestamp descending.
        merged = [
            event
            for state in self._jobs.values()
            for event in state.recent_events
        ]
        merged.sort(key=lambda e: e.get("timestamp", 0), reverse=True)
        return merged[:limit]

    state = self._jobs.get(job_id)
    if state is None:
        return []
    # The ring buffer is append-ordered, so the tail holds the newest
    # events; reverse the tail slice for newest-first output.
    tail = list(state.recent_events)[-limit:]
    return tail[::-1]

Functions