aggregator
Pattern aggregation for merging outcomes into global patterns.
This module implements the pattern aggregation strategy designed in Movement III:
- Immediate aggregation on job completion (CV 0.83)
- Conflict resolution for merging patterns
- Integration with PatternDetector and GlobalLearningStore
The aggregator runs after each job completion to detect new patterns and merge them with existing patterns in the global store.
Classes
AggregationResult
PatternAggregator
Aggregates patterns from executions into the global store.
Implements the aggregation strategy defined in Movement III:
1. Record all sheet outcomes to executions table
2. Run PatternDetector.detect_all() on new outcomes
3. Merge detected patterns with existing patterns in global store
4. Update priority_score for all affected patterns
5. Record pattern_applications for any patterns that were applied
6. Update error_recoveries for any error learning

Conflict resolution strategy:
- Same pattern: merge by increasing counts and updating timestamps
- Different suggested_actions: keep action with higher effectiveness
Initialize the pattern aggregator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| global_store | GlobalLearningStore | The global learning store for persistence. | required |
| weighter | PatternWeighter \| None | Pattern weighter for priority calculation. | None |
Source code in src/marianne/learning/aggregator.py
Functions
aggregate_outcomes
Aggregate a batch of outcomes into the global store.
This is the main entry point called after job completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| outcomes | list[SheetOutcome] | List of sheet outcomes from the completed job. | required |
| workspace_path | Path | Path to the workspace for hashing. | required |
| model | str \| None | Optional model name used for execution. | None |
| instrument_name | str \| None | Optional instrument/backend type for pattern scoping. | None |
Returns:
| Type | Description |
|---|---|
| AggregationResult | AggregationResult with statistics about the aggregation. |
merge_with_conflict_resolution
Merge a new pattern with an existing one using conflict resolution.
Resolution strategy from design document:
- occurrence_count: sum
- effectiveness_score: weighted average by occurrence_count
- last_seen: max
- last_confirmed: max
- suggested_action: keep action with higher effectiveness
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| existing | PatternRecord | The existing pattern record. | required |
| new | DetectedPattern | The newly detected pattern. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, object] | Dict of updated field values. |
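The resolution rules listed for merge_with_conflict_resolution can be illustrated with a minimal, self-contained sketch. `PatternSketch` and `merge_sketch` are hypothetical stand-ins, not the real PatternRecord, DetectedPattern, or method. The sketch assumes both sides expose the five fields named in the rules, that the action is chosen by comparing the two effectiveness scores directly, and that ties keep the existing action. None of these assumptions is confirmed by this page.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class PatternSketch:
    """Hypothetical stand-in for PatternRecord / DetectedPattern."""

    occurrence_count: int
    effectiveness_score: float
    last_seen: datetime
    last_confirmed: datetime
    suggested_action: str


def merge_sketch(existing: PatternSketch, new: PatternSketch) -> dict[str, object]:
    """Apply the documented resolution rules and return the updated fields."""
    total = existing.occurrence_count + new.occurrence_count

    # effectiveness_score: weighted average by occurrence_count.
    if total > 0:
        effectiveness = (
            existing.effectiveness_score * existing.occurrence_count
            + new.effectiveness_score * new.occurrence_count
        ) / total
    else:
        # No observations on either side: keep the existing score (assumption).
        effectiveness = existing.effectiveness_score

    # suggested_action: keep the action with higher effectiveness.
    # Ties keep the existing action (assumption).
    if new.effectiveness_score > existing.effectiveness_score:
        action = new.suggested_action
    else:
        action = existing.suggested_action

    return {
        "occurrence_count": total,  # sum
        "effectiveness_score": effectiveness,
        "last_seen": max(existing.last_seen, new.last_seen),
        "last_confirmed": max(existing.last_confirmed, new.last_confirmed),
        "suggested_action": action,
    }
```

Returning only the changed fields as a dict mirrors the documented return type, so a caller could pass the result to whatever update call the store provides.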
prune_deprecated_patterns
Deprecate patterns that fall below the effectiveness threshold.
Patterns are deprecated (not deleted) if:
- They have enough application data (>= 3)
- Their effectiveness is below 0.3
Returns:
| Type | Description |
|---|---|
| int | Number of patterns deprecated. |
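The deprecation criteria can be expressed as a small predicate. This is a sketch only: `should_deprecate`, its parameter names, and the constant names are hypothetical. It also assumes that both conditions must hold at once, which the description implies but does not state outright.

```python
MIN_APPLICATIONS = 3       # "enough application data (>= 3)"
EFFECTIVENESS_FLOOR = 0.3  # "effectiveness is below 0.3"


def should_deprecate(application_count: int, effectiveness_score: float) -> bool:
    """Return True when a pattern meets both documented deprecation criteria."""
    has_enough_data = application_count >= MIN_APPLICATIONS
    is_ineffective = effectiveness_score < EFFECTIVENESS_FLOOR
    return has_enough_data and is_ineffective
```

Requiring a minimum number of applications keeps a pattern from being deprecated on the strength of one or two unlucky runs.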
EnhancedAggregationResult
Bases: AggregationResult
Extended aggregation result including output pattern extraction.
Adds fields for tracking patterns extracted from stdout/stderr output in addition to the standard validation-based patterns.
EnhancedPatternAggregator
Bases: PatternAggregator
Extended aggregator that integrates OutputPatternExtractor.
Combines validation-based pattern detection with stdout/stderr output analysis to provide comprehensive learning data collection.
This aggregator:
1. Runs standard pattern detection (validation, retry, completion patterns)
2. Extracts patterns from stdout_tail/stderr_tail in outcomes
3. Creates DetectedPatterns from output patterns for global store
4. Merges all patterns into unified learning store
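To make step 2 concrete, the following sketch shows one way patterns might be pulled out of output tails. It is not the OutputPatternExtractor implementation, whose rules are not shown on this page. The `RULES` table, the rule names, and `extract_output_patterns` are all hypothetical, and the sketch assumes the tails are plain strings.

```python
import re
from collections import Counter

# Hypothetical rules for illustration; the real extractor's rules may differ.
RULES = {
    "python_traceback": re.compile(r"Traceback \(most recent call last\)"),
    "timeout": re.compile(r"timed? ?out", re.IGNORECASE),
    "permission_denied": re.compile(r"permission denied", re.IGNORECASE),
}


def extract_output_patterns(stdout_tail: str, stderr_tail: str) -> Counter:
    """Count the lines in both output tails that match each illustrative rule."""
    counts: Counter = Counter()
    for text in (stdout_tail, stderr_tail):
        for line in text.splitlines():
            for name, rule in RULES.items():
                if rule.search(line):
                    counts[name] += 1
    return counts
```

Counts like these would then need to be converted into DetectedPattern objects (step 3) before merging. That conversion depends on the real class and is left out here.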
Initialize the enhanced pattern aggregator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| global_store | GlobalLearningStore | The global learning store for persistence. | required |
| weighter | PatternWeighter \| None | Pattern weighter for priority calculation. | None |
Functions
aggregate_with_all_sources
Aggregate outcomes using all pattern sources.
Extends standard aggregation by also extracting patterns from stdout/stderr output in outcomes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| outcomes | list[SheetOutcome] | List of sheet outcomes from the completed job. | required |
| workspace_path | Path | Path to the workspace for hashing. | required |
| model | str \| None | Optional model name used for execution. | None |
Returns:
| Type | Description |
|---|---|
| EnhancedAggregationResult | EnhancedAggregationResult with output pattern statistics. |
Functions
aggregate_job_outcomes
Convenience function to aggregate outcomes after job completion.
This is the main entry point for the aggregation system.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| outcomes | list[SheetOutcome] | List of sheet outcomes from the completed job. | required |
| workspace_path | Path | Path to the workspace for hashing. | required |
| global_store | GlobalLearningStore \| None | Optional global store (uses default if None). | None |
| model | str \| None | Optional model name used for execution. | None |
Returns:
| Type | Description |
|---|---|
| AggregationResult | AggregationResult with statistics. |
aggregate_job_outcomes_enhanced
Enhanced aggregation including output pattern extraction.
Uses EnhancedPatternAggregator to also extract patterns from stdout/stderr output in outcomes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| outcomes | list[SheetOutcome] | List of sheet outcomes from the completed job. | required |
| workspace_path | Path | Path to the workspace for hashing. | required |
| global_store | GlobalLearningStore \| None | Optional global store (uses default if None). | None |
| model | str \| None | Optional model name used for execution. | None |
Returns:
| Type | Description |
|---|---|
| EnhancedAggregationResult | EnhancedAggregationResult with output pattern statistics. |