observer_recorder
Observer event recorder — persists per-job observer events to JSONL.
Subscribes to observer.* events on the EventBus, applies path
exclusion and coalescing, then writes to per-job JSONL files inside
each job's workspace. Also maintains an in-memory ring buffer for
real-time TUI consumption via IPC.
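The pipeline described above (exclude, coalesce, append to JSONL) can be sketched roughly as follows. This is a minimal illustration, not the module's actual implementation: the exclusion patterns, the event field names (`event`, `path`, `seq`), the coalescing key, and the file name `observer.jsonl` are all assumptions made for the example.

```python
from __future__ import annotations

import fnmatch
import json
import tempfile
from pathlib import Path

# Hypothetical exclusion globs; the real recorder's configuration is not shown here.
EXCLUDE_PATTERNS = ["*.pyc", ".git/*"]


def is_excluded(path: str) -> bool:
    """Return True when the path matches any exclusion glob."""
    return any(fnmatch.fnmatch(path, pattern) for pattern in EXCLUDE_PATTERNS)


def coalesce(events: list[dict]) -> list[dict]:
    """Keep only the latest event for each (event name, path) pair."""
    latest: dict[tuple[str, str], dict] = {}
    for event in events:
        key = (event["event"], event["path"])
        latest.pop(key, None)  # re-insert so ordering follows the latest occurrence
        latest[key] = event
    return list(latest.values())


def append_jsonl(target: Path, events: list[dict]) -> None:
    """Append one JSON object per line to the target file."""
    with target.open("a", encoding="utf-8") as handle:
        for event in events:
            handle.write(json.dumps(event) + "\n")


def record(target: Path, raw_events: list[dict]) -> list[dict]:
    """Apply exclusion, then coalescing, then persist what remains."""
    kept = [event for event in raw_events if not is_excluded(event["path"])]
    merged = coalesce(kept)
    append_jsonl(target, merged)
    return merged


def demo() -> tuple[list[dict], int]:
    """Run the pipeline against a temporary stand-in for a job workspace."""
    raw = [
        {"event": "observer.file_modified", "path": "src/a.py", "seq": 1},
        {"event": "observer.file_modified", "path": "build/a.pyc", "seq": 2},
        {"event": "observer.file_modified", "path": "src/a.py", "seq": 3},
    ]
    with tempfile.TemporaryDirectory() as workspace:
        target = Path(workspace) / "observer.jsonl"  # hypothetical file name
        merged = record(target, raw)
        line_count = len(target.read_text(encoding="utf-8").splitlines())
    return merged, line_count
```

In this sketch the two modifications of `src/a.py` collapse into one record and the `.pyc` event is dropped, so a single line reaches the file.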
Classes
ObserverRecorder
EventBus subscriber that persists observer events to per-job JSONL files.
Thread-safety: This class is NOT thread-safe. All methods must be called
from the same asyncio event loop. The coalesce buffer and file handles
are safe from concurrent access because asyncio is cooperative — the
EventBus drain loop and explicit flush() calls cannot interleave within
synchronous code sections. Do NOT add await statements inside
_write_event() or _flush_state() without re-evaluating
concurrency safety.
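The cooperative-scheduling argument can be illustrated with a small stand-in. `TinyRecorder`, `drain_loop`, and `periodic_flush` below are hypothetical names for this sketch and are not part of the real class; the point is only that a synchronous method body runs to completion before the event loop can switch tasks.

```python
import asyncio


class TinyRecorder:
    """Hypothetical stand-in for the recorder's buffer handling."""

    def __init__(self) -> None:
        self.pending: list[str] = []
        self.written: list[str] = []

    def write_event(self, event: str) -> None:
        # Synchronous: contains no await, so no other task runs mid-call.
        self.pending.append(event)

    def flush_state(self) -> None:
        # Synchronous: copy-then-clear cannot be interrupted by write_event().
        self.written.extend(self.pending)
        self.pending.clear()


async def drain_loop(recorder: TinyRecorder, events: list[str]) -> None:
    for event in events:
        recorder.write_event(event)
        await asyncio.sleep(0)  # the only point where another task may run


async def periodic_flush(recorder: TinyRecorder, rounds: int) -> None:
    for _ in range(rounds):
        recorder.flush_state()
        await asyncio.sleep(0)


async def run_demo() -> TinyRecorder:
    recorder = TinyRecorder()
    events = [f"event-{i}" for i in range(10)]
    await asyncio.gather(drain_loop(recorder, events), periodic_flush(recorder, 4))
    recorder.flush_state()  # final flush picks up whatever is still pending
    return recorder
```

If `flush_state()` contained an `await` between the copy and the clear, `write_event()` could run in that gap and the newly appended event would be cleared without being written, which is the hazard the warning above describes.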
Lifecycle follows the SemanticAnalyzer pattern::

    recorder = ObserverRecorder(config=config)
    await recorder.start(event_bus)
    # ... events flow ...
    await recorder.stop(event_bus)
Source code in src/marianne/daemon/observer_recorder.py
Functions
start (async)
Subscribe to observer.* events and start the coalesce flush timer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_bus | EventBus | The EventBus instance to subscribe to. | required |
stop (async)
Unsubscribe, cancel flush timer, flush all, close all file handles.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_bus | EventBus \| None | The EventBus instance to unsubscribe from. Falls back to the bus stored from | None |
register_job
Start recording events for a job.
unregister_job
Stop recording events for a job, flush and close file.
flush
Flush coalesce buffer and file handle for a job.
get_recent_events
Return recent events from the in-memory ring buffer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str \| None | Specific job ID, or | required |
| limit | int | Maximum number of events to return. | 50 |
Returns:
| Type | Description |
|---|---|
| list[ObserverEvent] | List of events, newest first. |
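A bounded, newest-first lookup like the one documented here is commonly built on `collections.deque` with `maxlen`. The sketch below is one possible shape, not the module's code: `Event` and `RingBuffer` are hypothetical names, and treating `job_id=None` as "all jobs" is an assumption of this example rather than something the documentation above states.

```python
from __future__ import annotations

import itertools
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for ObserverEvent."""

    job_id: str
    name: str


class RingBuffer:
    """Fixed-capacity buffer that silently drops the oldest entries."""

    def __init__(self, capacity: int) -> None:
        # deque(maxlen=...) evicts from the left once capacity is reached.
        self._events: deque[Event] = deque(maxlen=capacity)

    def append(self, event: Event) -> None:
        self._events.append(event)

    def recent(self, job_id: str | None, limit: int = 50) -> list[Event]:
        # Assumption for this sketch: None selects events from every job.
        matches = (
            event
            for event in reversed(self._events)  # newest first
            if job_id is None or event.job_id == job_id
        )
        return list(itertools.islice(matches, limit))
```

Iterating in reverse and slicing lazily means at most `limit` matching events are materialized, regardless of buffer size.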