
correlation

Periodic correlation analysis — resource usage vs. job outcomes.

Runs on a configurable interval (default 30 min), queries completed jobs from MonitorStorage, cross-references with outcomes from the learning store, and generates RESOURCE_CORRELATION patterns for statistical relationships with confidence > 0.6.

No LLM calls — pure statistical analysis. Minimum sample size of 5 jobs before any correlations are generated.
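The interval and sample-size thresholds come from `CorrelationConfig`. A minimal sketch of such a config, assuming a dataclass with the two fields referenced by the `start()` log call (`interval_minutes`, `min_sample_size`) and defaults taken from the prose above; the real class may carry additional options:

```python
from dataclasses import dataclass


@dataclass
class CorrelationConfig:
    """Sketch of the analyzer config.

    Field names are taken from the start() log call below; the
    defaults (30 min interval, 5-job minimum) are assumed from the
    module description.
    """

    interval_minutes: int = 30  # how often the analysis loop runs
    min_sample_size: int = 5    # jobs required before correlating


# Passing config=None to CorrelationAnalyzer falls back to these defaults.
config = CorrelationConfig(interval_minutes=60)
```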

Classes

CorrelationAnalyzer

CorrelationAnalyzer(storage, learning_hub, config=None)

Periodic statistical analysis of resource usage vs. job outcomes.

Cross-references profiler snapshots (peak memory, CPU, syscall distributions, anomalies) with job success/failure outcomes from the learning store to identify predictive patterns.

Lifecycle:

analyzer = CorrelationAnalyzer(storage, learning_hub, config)
await analyzer.start(event_bus)
# ... periodic analysis runs automatically ...
await analyzer.stop()
Source code in src/marianne/daemon/profiler/correlation.py
def __init__(
    self,
    storage: MonitorStorage,
    learning_hub: LearningHub,
    config: CorrelationConfig | None = None,
) -> None:
    self._storage = storage
    self._learning_hub = learning_hub
    self._config = config or CorrelationConfig()
    self._running = False
    self._loop_task: asyncio.Task[None] | None = None
Functions
start async
start(event_bus)

Start the periodic analysis loop.

The event_bus parameter is accepted for interface consistency with other daemon components but is not currently used by the correlation analyzer (it reads from storage, not events).

Source code in src/marianne/daemon/profiler/correlation.py
async def start(self, event_bus: EventBus) -> None:
    """Start the periodic analysis loop.

    The event_bus parameter is accepted for interface consistency
    with other daemon components but is not currently used by the
    correlation analyzer (it reads from storage, not events).
    """
    if self._running:
        return

    self._running = True
    self._loop_task = asyncio.create_task(
        self._analysis_loop(), name="correlation-analysis-loop"
    )
    _logger.info(
        "correlation_analyzer.started",
        interval_minutes=self._config.interval_minutes,
        min_sample_size=self._config.min_sample_size,
    )
stop async
stop()

Stop the periodic analysis loop.

Source code in src/marianne/daemon/profiler/correlation.py
async def stop(self) -> None:
    """Stop the periodic analysis loop."""
    self._running = False

    if self._loop_task is not None:
        self._loop_task.cancel()
        try:
            await self._loop_task
        except asyncio.CancelledError:
            pass
        self._loop_task = None

    _logger.info("correlation_analyzer.stopped")
analyze async
analyze()

Run correlation analysis on completed jobs.

Steps:

1. Query completed jobs from storage (last 7 days)
2. For each job: get peak memory, total CPU, syscall distribution
3. Cross-reference with job outcomes from learning store
4. Statistical analysis:
   - Memory vs failure rate (binned histogram)
   - Syscall hotspots vs failure rate
   - Anomaly presence vs failure rate
   - Execution duration vs failure rate
5. Generate RESOURCE_CORRELATION patterns for confidence > 0.6
6. Store in LearningHub

Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, Any]]` | List of generated correlation dicts (for testing/logging). |

Source code in src/marianne/daemon/profiler/correlation.py
async def analyze(self) -> list[dict[str, Any]]:
    """Run correlation analysis on completed jobs.

    Steps:
    1. Query completed jobs from storage (last 7 days)
    2. For each job: get peak memory, total CPU, syscall distribution
    3. Cross-reference with job outcomes from learning store
    4. Statistical analysis:
       - Memory vs failure rate (binned histogram)
       - Syscall hotspots vs failure rate
       - Anomaly presence vs failure rate
       - Execution duration vs failure rate
    5. Generate RESOURCE_CORRELATION patterns for confidence > 0.6
    6. Store in LearningHub

    Returns:
        List of generated correlation dicts (for testing/logging).
    """
    if not self._learning_hub.is_running:
        _logger.debug("correlation_analyzer.learning_hub_not_running")
        return []

    # 1. Get completed job profiles from storage
    since = time.time() - _LOOKBACK_SECONDS
    job_profiles = await self._get_job_profiles(since)

    if len(job_profiles) < self._config.min_sample_size:
        _logger.debug(
            "correlation_analyzer.insufficient_samples",
            sample_count=len(job_profiles),
            min_required=self._config.min_sample_size,
        )
        return []

    # 2. Cross-reference with outcomes from learning store
    enriched = self._enrich_with_outcomes(job_profiles)

    if not enriched:
        _logger.debug("correlation_analyzer.no_enriched_profiles")
        return []

    # 3. Run statistical analyses
    correlations: list[dict[str, Any]] = []
    correlations.extend(self._analyze_memory_vs_failure(enriched))
    correlations.extend(self._analyze_syscall_vs_failure(enriched))
    correlations.extend(self._analyze_duration_vs_failure(enriched))

    # 4. Filter by confidence and store
    stored = 0
    for corr in correlations:
        if corr["confidence"] >= _MIN_CONFIDENCE:
            self._store_correlation(corr)
            stored += 1

    if stored > 0:
        _logger.info(
            "correlation_analyzer.patterns_generated",
            total_analyzed=len(enriched),
            correlations_found=len(correlations),
            patterns_stored=stored,
        )

    return correlations
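The per-metric helpers (`_analyze_memory_vs_failure` and friends) are not shown above. A hypothetical sketch of the binned-histogram idea from step 4, using made-up bin widths, field names, and a naive in-bin failure proportion as the confidence score; the actual helpers may compute confidence differently:

```python
from collections import defaultdict
from typing import Any


def memory_vs_failure(
    jobs: list[dict[str, Any]], bin_mb: int = 512
) -> list[dict[str, Any]]:
    """Bin jobs by peak memory and report failure rate per bin.

    Each job dict is assumed (for this sketch) to carry
    'peak_memory_mb' (float) and 'failed' (bool). The 'confidence'
    here is just the in-bin failure proportion, a stand-in for
    whatever statistic the real analyzer uses.
    """
    bins: dict[int, list[bool]] = defaultdict(list)
    for job in jobs:
        bins[int(job["peak_memory_mb"] // bin_mb)].append(job["failed"])

    correlations: list[dict[str, Any]] = []
    for idx, outcomes in sorted(bins.items()):
        correlations.append(
            {
                "metric": "peak_memory_mb",
                "bin_low_mb": idx * bin_mb,
                "bin_high_mb": (idx + 1) * bin_mb,
                "sample_count": len(outcomes),
                "confidence": sum(outcomes) / len(outcomes),
            }
        )
    return correlations
```

As in `analyze()`, the caller would then keep only entries whose `confidence` clears the minimum threshold before storing them.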

Functions