Skip to content

observer

observer

Job observer — filesystem and process tree monitoring co-task.

Watches each running job's workspace for file changes (via watchfiles) and monitors the process tree (via psutil) to produce ObserverEvents independently of the runner's self-reports. Events are published to the EventBus for downstream consumers (SSE dashboard, learning hub).

Classes

JobObserver

JobObserver(job_id, workspace, pid, event_bus, *, watch_interval=2.0)

Async co-task that monitors a job's workspace and process tree.

Lifecycle

observer = JobObserver(job_id, workspace, pid, event_bus) await observer.start()

... runs until job completes ...

await observer.stop()

Produces events
  • observer.file_created / observer.file_modified / observer.file_deleted
  • observer.process_spawned / observer.process_exited
Source code in src/marianne/daemon/observer.py
def __init__(
    self,
    job_id: str,
    workspace: Path,
    pid: int,
    event_bus: EventBus,
    *,
    watch_interval: float = 2.0,
) -> None:
    self._job_id = job_id
    self._workspace = workspace
    self._pid = pid
    self._event_bus = event_bus
    self._watch_interval = watch_interval
    self._stop_event = asyncio.Event()
    self._fs_task: asyncio.Task[None] | None = None
    self._proc_task: asyncio.Task[None] | None = None
    self._running = False
    self._start_lock = asyncio.Lock()
Attributes
running property
running

Whether the observer is currently running.

Functions
start async
start()

Start filesystem and process monitoring tasks.

Source code in src/marianne/daemon/observer.py
async def start(self) -> None:
    """Start filesystem and process monitoring tasks."""
    async with self._start_lock:
        if self._running:
            return
        self._running = True
        self._stop_event.clear()

    self._fs_task = asyncio.create_task(
        self._watch_filesystem(),
        name=f"observer-fs-{self._job_id}",
    )
    self._proc_task = asyncio.create_task(
        self._watch_processes(),
        name=f"observer-proc-{self._job_id}",
    )
    _logger.info(
        "observer.started",
        job_id=self._job_id,
        workspace=str(self._workspace),
        pid=self._pid,
    )
stop async
stop()

Stop both monitoring tasks gracefully.

Source code in src/marianne/daemon/observer.py
async def stop(self) -> None:
    """Stop both monitoring tasks gracefully."""
    if not self._running:
        return
    self._running = False
    self._stop_event.set()

    tasks = [t for t in (self._fs_task, self._proc_task) if t is not None]
    for task in tasks:
        task.cancel()

    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)

    self._fs_task = None
    self._proc_task = None
    _logger.info("observer.stopped", job_id=self._job_id)

Functions