Periodic correlation analysis — resource usage vs. job outcomes.
Runs on a configurable interval (default 30 min), queries completed jobs
from MonitorStorage, cross-references with outcomes from the learning
store, and generates RESOURCE_CORRELATION patterns for statistical
relationships with confidence > 0.6.
No LLM calls — pure statistical analysis. Minimum sample size of 5 jobs
before any correlations are generated.
Classes
CorrelationAnalyzer
CorrelationAnalyzer(storage, learning_hub, config=None)
Periodic statistical analysis of resource usage vs. job outcomes.
Cross-references profiler snapshots (peak memory, CPU, syscall
distributions, anomalies) with job success/failure outcomes from
the learning store to identify predictive patterns.
Lifecycle::
analyzer = CorrelationAnalyzer(storage, learning_hub, config)
await analyzer.start(event_bus)
# ... periodic analysis runs automatically ...
await analyzer.stop()
Source code in src/marianne/daemon/profiler/correlation.py
def __init__(
    self,
    storage: MonitorStorage,
    learning_hub: LearningHub,
    config: CorrelationConfig | None = None,
) -> None:
    """Create the analyzer in a stopped state.

    Args:
        storage: Monitor storage queried for completed job profiles.
        learning_hub: Destination store for generated correlation patterns.
        config: Analysis settings; a default ``CorrelationConfig`` is
            used when omitted.
    """
    # No background task exists until start() is called.
    self._loop_task: asyncio.Task[None] | None = None
    self._running = False
    self._config = config or CorrelationConfig()
    self._learning_hub = learning_hub
    self._storage = storage
Functions
start
async
Start the periodic analysis loop.
The event_bus parameter is accepted for interface consistency
with other daemon components but is not currently used by the
correlation analyzer (it reads from storage, not events).
Source code in src/marianne/daemon/profiler/correlation.py
async def start(self, event_bus: EventBus) -> None:
    """Launch the background analysis loop.

    Calling this while already running is a no-op. The ``event_bus``
    argument is accepted only so this component matches the start
    signature of other daemon components; the analyzer reads from
    storage rather than from events, so it is not used.
    """
    if not self._running:
        self._running = True
        loop_task = asyncio.create_task(
            self._analysis_loop(), name="correlation-analysis-loop"
        )
        self._loop_task = loop_task
        _logger.info(
            "correlation_analyzer.started",
            interval_minutes=self._config.interval_minutes,
            min_sample_size=self._config.min_sample_size,
        )
stop
async
Stop the periodic analysis loop.
Source code in src/marianne/daemon/profiler/correlation.py
async def stop(self) -> None:
    """Stop the background analysis loop and wait for it to finish."""
    self._running = False
    task = self._loop_task
    if task is not None:
        task.cancel()
        try:
            # Wait for the loop task to actually unwind before returning.
            await task
        except asyncio.CancelledError:
            pass  # expected outcome of cancel(); nothing to report
        self._loop_task = None
    _logger.info("correlation_analyzer.stopped")
analyze
async
Run correlation analysis on completed jobs.
Steps:
1. Query completed jobs from storage (last 7 days)
2. For each job: get peak memory, total CPU, syscall distribution
3. Cross-reference with job outcomes from learning store
4. Statistical analysis:
- Memory vs failure rate (binned histogram)
- Syscall hotspots vs failure rate
- Anomaly presence vs failure rate
- Execution duration vs failure rate
5. Generate RESOURCE_CORRELATION patterns for confidence > 0.6
6. Store in LearningHub
Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, Any]]` | List of generated correlation dicts (for testing/logging). |
Source code in src/marianne/daemon/profiler/correlation.py
async def analyze(self) -> list[dict[str, Any]]:
    """Correlate recent job resource profiles with their outcomes.

    Pipeline:

    1. Pull completed-job profiles from storage (last 7 days).
    2. Join each profile with its outcome from the learning store.
    3. Run the statistical analyses: memory, syscall distribution,
       and execution duration, each against failure rate.
    4. Store every correlation whose confidence clears the threshold
       as a RESOURCE_CORRELATION pattern in the LearningHub.

    Returns:
        All correlation dicts produced — including those below the
        storage threshold — for testing/logging.
    """
    if not self._learning_hub.is_running:
        # Without the hub there are no outcomes to join and nowhere
        # to store patterns.
        _logger.debug("correlation_analyzer.learning_hub_not_running")
        return []

    # 1. Completed-job profiles from storage, limited to the lookback window.
    cutoff = time.time() - _LOOKBACK_SECONDS
    profiles = await self._get_job_profiles(cutoff)
    if len(profiles) < self._config.min_sample_size:
        _logger.debug(
            "correlation_analyzer.insufficient_samples",
            sample_count=len(profiles),
            min_required=self._config.min_sample_size,
        )
        return []

    # 2. Join with outcomes recorded in the learning store.
    samples = self._enrich_with_outcomes(profiles)
    if not samples:
        _logger.debug("correlation_analyzer.no_enriched_profiles")
        return []

    # 3. Run each statistical analysis over the enriched samples.
    findings: list[dict[str, Any]] = []
    for analysis in (
        self._analyze_memory_vs_failure,
        self._analyze_syscall_vs_failure,
        self._analyze_duration_vs_failure,
    ):
        findings.extend(analysis(samples))

    # 4. Persist only the findings that clear the confidence threshold.
    confident = [f for f in findings if f["confidence"] >= _MIN_CONFIDENCE]
    for finding in confident:
        self._store_correlation(finding)
    if confident:
        _logger.info(
            "correlation_analyzer.patterns_generated",
            total_analyzed=len(samples),
            correlations_found=len(findings),
            patterns_stored=len(confident),
        )
    return findings
Functions