Skip to content

migration

migration

Migration support for workspace-local outcomes to global store.

This module implements the migration strategy from the Movement III design:

- Import existing .marianne-outcomes.json on first use or explicit command
- Scan common workspace locations
- Run pattern detection on imported data
- Preserve workspace-local files (non-destructive)

Migration Flow:

1. On GlobalLearningStore initialization, check if empty
2. Scan common workspace locations for .marianne-outcomes.json
3. For each found, import outcomes to executions table
4. Run pattern detection on imported data
5. Log migration summary to user

Attributes

Classes

MigrationResult dataclass

MigrationResult(errors=list(), skipped_workspaces=list(), imported_workspaces=list(), workspaces_found=0, outcomes_imported=0, patterns_detected=0)

Result of a migration operation.

Attributes:

Name Type Description
workspaces_found int

Number of workspaces with outcomes.

outcomes_imported int

Total outcomes imported.

patterns_detected int

Patterns detected from imported outcomes.

errors list[str]

Any errors encountered during migration.

skipped_workspaces list[str]

Workspaces skipped (already imported, etc.).

imported_workspaces list[str]

Workspaces successfully imported.

OutcomeMigrator

OutcomeMigrator(global_store, aggregator=None)

Migrates workspace-local outcomes to the global store.

This migrator scans for existing .marianne-outcomes.json files and imports their contents into the global SQLite database, enabling cross-workspace learning from historical data.

Migration is:

- Non-destructive: Original files are preserved
- Idempotent: Already-imported outcomes are skipped
- Pattern-aware: Runs pattern detection after import

Usage

migrator = OutcomeMigrator(global_store)
result = migrator.migrate_all()
print(f"Imported {result.outcomes_imported} outcomes")

Initialize the outcome migrator.

Parameters:

Name Type Description Default
global_store GlobalLearningStore

Global learning store to import into.

required
aggregator PatternAggregator | None

Optional pattern aggregator for pattern detection.

None
Source code in src/marianne/learning/migration.py
def __init__(
    self,
    global_store: "GlobalLearningStore",
    aggregator: "PatternAggregator | None" = None,
) -> None:
    """Initialize the outcome migrator.

    Args:
        global_store: Global learning store to import into.
        aggregator: Optional pattern aggregator for pattern detection.
    """
    # Workspace hashes seen so far; lets repeated runs skip duplicates.
    self._imported_workspace_hashes: set[str] = set()

    self._store = global_store
    self._aggregator = aggregator
Functions
migrate_all
migrate_all(scan_patterns=None, additional_paths=None)

Migrate all discoverable workspace-local outcomes.

Scans standard locations plus any additional paths for .marianne-outcomes.json files and imports them.

Parameters:

Name Type Description Default
scan_patterns list[str] | None

Glob patterns to scan (defaults to standard locations).

None
additional_paths list[Path] | None

Additional specific paths to scan.

None

Returns:

Type Description
MigrationResult

MigrationResult with import statistics.

Source code in src/marianne/learning/migration.py
def migrate_all(
    self,
    scan_patterns: list[str] | None = None,
    additional_paths: list[Path] | None = None,
) -> MigrationResult:
    """Migrate all discoverable workspace-local outcomes.

    Scans standard locations plus any additional paths for
    .marianne-outcomes.json files and imports them.

    Args:
        scan_patterns: Glob patterns to scan (defaults to standard locations).
        additional_paths: Additional specific paths to scan.

    Returns:
        MigrationResult with import statistics.
    """
    result = MigrationResult()
    patterns = scan_patterns or DEFAULT_SCAN_PATTERNS

    # Collect all outcome files to migrate
    outcome_files: list[Path] = []

    # Expand glob patterns
    for pattern in patterns:
        expanded = Path(pattern).expanduser()
        if "*" in str(expanded):
            # It's a glob pattern. Path.glob() only accepts a relative
            # pattern, so split the path at the first wildcard component
            # into a concrete base directory and a relative glob suffix.
            # Using Path.parts (rather than splitting the string on "/")
            # keeps this correct regardless of the platform's separator.
            parts = expanded.parts
            split_at = len(parts)
            for i, part in enumerate(parts):
                if "*" in part:
                    split_at = i
                    break

            base_path = Path(*parts[:split_at]) if split_at else Path(".")
            glob_pattern = "/".join(parts[split_at:])

            if base_path.exists():
                outcome_files.extend(base_path.glob(glob_pattern))
        elif expanded.exists() and expanded.is_file():
            outcome_files.append(expanded)

    # Add additional paths (only existing regular files)
    if additional_paths:
        for path in additional_paths:
            if path.exists() and path.is_file():
                outcome_files.append(path)

    # Deduplicate by resolved path
    unique_files = list({f.resolve() for f in outcome_files})
    result.workspaces_found = len(unique_files)

    _logger.info("migration_files_found", count=len(unique_files))

    # Import each file; one bad file must not abort the whole migration.
    for outcome_file in unique_files:
        try:
            imported = self._migrate_file(outcome_file)
            if imported > 0:
                result.outcomes_imported += imported
                result.imported_workspaces.append(str(outcome_file.parent))
            else:
                # Zero imported means everything was already present.
                result.skipped_workspaces.append(str(outcome_file.parent))
        except Exception as e:
            # Structured event name for consistency with the other log
            # calls in this module; the human-readable message still goes
            # into the result for the caller.
            _logger.warning(
                "file_migration_error",
                file=str(outcome_file),
                error=str(e),
                exc_info=True,
            )
            result.errors.append(f"Error migrating {outcome_file}: {e}")

    # Run pattern detection on imported data if aggregator available
    if self._aggregator and result.outcomes_imported > 0:
        try:
            detected_count: int = self._detect_patterns_from_store()
            result.patterns_detected = detected_count
        except Exception as e:
            _logger.warning("pattern_detection_error", error=str(e), exc_info=True)
            result.errors.append(f"Pattern detection error: {e}")

    _logger.info(
        "migration_complete",
        outcomes=result.outcomes_imported,
        patterns=result.patterns_detected,
    )

    return result
migrate_workspace
migrate_workspace(workspace_path)

Migrate a single workspace's outcomes.

Parameters:

Name Type Description Default
workspace_path Path

Path to the workspace directory.

required

Returns:

Type Description
MigrationResult

MigrationResult for this workspace.

Source code in src/marianne/learning/migration.py
def migrate_workspace(self, workspace_path: Path) -> MigrationResult:
    """Migrate a single workspace's outcomes.

    Args:
        workspace_path: Path to the workspace directory.

    Returns:
        MigrationResult for this workspace.
    """
    report = MigrationResult()

    # The outcome file lives at a fixed name inside the workspace root.
    candidate = workspace_path / ".marianne-outcomes.json"
    if not candidate.exists():
        report.errors.append(f"No outcomes file found in {workspace_path}")
        return report

    report.workspaces_found = 1

    try:
        count = self._migrate_file(candidate)
    except Exception as e:
        _logger.warning(
            "workspace_migration_error",
            workspace=str(workspace_path),
            error=str(e),
            exc_info=True,
        )
        report.errors.append(f"Error: {e}")
        return report

    report.outcomes_imported = count
    # A zero count means the file held nothing new: record it as skipped.
    bucket = report.imported_workspaces if count > 0 else report.skipped_workspaces
    bucket.append(str(workspace_path))
    return report

Functions

migrate_existing_outcomes

migrate_existing_outcomes(global_store, scan_patterns=None, additional_paths=None)

Convenience function to migrate all existing outcomes.

Parameters:

Name Type Description Default
global_store GlobalLearningStore

Global learning store to import into.

required
scan_patterns list[str] | None

Optional custom scan patterns.

None
additional_paths list[Path] | None

Optional additional paths to scan.

None

Returns:

Type Description
MigrationResult

MigrationResult with import statistics.

Source code in src/marianne/learning/migration.py
def migrate_existing_outcomes(
    global_store: "GlobalLearningStore",
    scan_patterns: list[str] | None = None,
    additional_paths: list[Path] | None = None,
) -> MigrationResult:
    """Convenience function to migrate all existing outcomes.

    Builds a throwaway OutcomeMigrator (without a pattern aggregator)
    and delegates straight to its migrate_all.

    Args:
        global_store: Global learning store to import into.
        scan_patterns: Optional custom scan patterns.
        additional_paths: Optional additional paths to scan.

    Returns:
        MigrationResult with import statistics.
    """
    return OutcomeMigrator(global_store).migrate_all(
        scan_patterns=scan_patterns,
        additional_paths=additional_paths,
    )

check_migration_status

check_migration_status(global_store)

Check the current migration status.

Parameters:

Name Type Description Default
global_store GlobalLearningStore

Global learning store to check.

required

Returns:

Type Description
dict[str, Any]

Dictionary with migration status information.

Source code in src/marianne/learning/migration.py
def check_migration_status(global_store: "GlobalLearningStore") -> dict[str, Any]:
    """Check the current migration status.

    Args:
        global_store: Global learning store to check.

    Returns:
        Dictionary with migration status information.
    """
    stats = global_store.get_execution_stats()

    # An empty executions table is the signal that migration hasn't run.
    total_executions = stats.get("total_executions", 0)

    return {
        "total_executions": total_executions,
        "needs_migration": total_executions == 0,
        "total_patterns": stats.get("total_patterns", 0),
        "unique_workspaces": stats.get("unique_workspaces", 0),
        "avg_pattern_effectiveness": stats.get("avg_pattern_effectiveness", 0.0),
    }