Skip to content

Index

notifications

Marianne notification framework.

Provides notification infrastructure for Marianne job events:

- Multiple notification backends (desktop, Slack, webhook)
- Event-based subscription model
- Graceful degradation when backends are unavailable

Usage

from marianne.notifications import ( NotificationEvent, NotificationContext, NotificationManager, DesktopNotifier, SlackNotifier, WebhookNotifier, )

manager = NotificationManager([ DesktopNotifier(events={NotificationEvent.JOB_COMPLETE}), SlackNotifier(webhook_url="...", events={NotificationEvent.JOB_FAILED}), WebhookNotifier(url="...", events={NotificationEvent.JOB_COMPLETE}), ])

await manager.notify_job_complete( job_id="123", job_name="my-job", success_count=10, failure_count=0, duration_seconds=120.5, )

Classes

NotificationContext dataclass

NotificationContext(event, job_id, job_name, timestamp=utc_now(), sheet_num=None, total_sheets=None, success_count=None, failure_count=None, error_message=None, duration_seconds=None, extra=dict())

Context provided to notifiers when sending notifications.

Contains all relevant information about the event that triggered the notification, enabling rich notification messages.

Attributes
event instance-attribute
event

The event type that triggered this notification.

job_id instance-attribute
job_id

Unique identifier for the job.

job_name instance-attribute
job_name

Human-readable job name.

timestamp class-attribute instance-attribute
timestamp = field(default_factory=utc_now)

When the event occurred.

sheet_num class-attribute instance-attribute
sheet_num = None

Sheet number (for sheet-level events).

total_sheets class-attribute instance-attribute
total_sheets = None

Total number of sheets in the job.

success_count class-attribute instance-attribute
success_count = None

Number of successful validations/sheets.

failure_count class-attribute instance-attribute
failure_count = None

Number of failed validations/sheets.

error_message class-attribute instance-attribute
error_message = None

Error message (for failure events).

duration_seconds class-attribute instance-attribute
duration_seconds = None

Duration of the operation in seconds.

extra class-attribute instance-attribute
extra = field(default_factory=dict)

Additional context-specific data.

Functions
format_title
format_title()

Generate a notification title based on the event.

Returns:

Type Description
str

A concise title string suitable for notification headers.

Source code in src/marianne/notifications/base.py
def format_title(self) -> str:
    """Build the short headline shown at the top of a notification.

    Job-level events embed the job name, sheet-level events embed the
    sheet number, and an event without a dedicated template falls back
    to ``"Marianne: <event value>"``.

    Returns:
        A concise title string suitable for notification headers.
    """
    job = self.job_name
    sheet = self.sheet_num

    titles_by_event = {
        NotificationEvent.JOB_START: f"Marianne: Job '{job}' Started",
        NotificationEvent.JOB_COMPLETE: f"Marianne: Job '{job}' Complete ✓",
        NotificationEvent.JOB_FAILED: f"Marianne: Job '{job}' Failed ✗",
        NotificationEvent.JOB_PAUSED: f"Marianne: Job '{job}' Paused",
        NotificationEvent.JOB_RESUMED: f"Marianne: Job '{job}' Resumed",
        NotificationEvent.SHEET_START: f"Marianne: Sheet {sheet} Started",
        NotificationEvent.SHEET_COMPLETE: f"Marianne: Sheet {sheet} Complete",
        NotificationEvent.SHEET_FAILED: f"Marianne: Sheet {sheet} Failed",
        NotificationEvent.RATE_LIMIT_DETECTED: "Marianne: Rate Limit Detected",
    }

    # Events without a dedicated template still get a readable title
    fallback = f"Marianne: {self.event.value}"
    return titles_by_event.get(self.event, fallback)
format_message
format_message()

Generate a notification message body based on context.

Returns:

Type Description
str

A descriptive message string with relevant details.

Source code in src/marianne/notifications/base.py
def format_message(self) -> str:
    """Assemble the notification body from whichever details are set.

    Up to four segments are emitted, in this order and joined with
    ``" | "``: sheet progress, pass/fail counts, elapsed time, and a
    truncated error message. When no segment applies, the raw event
    value is returned instead.

    Returns:
        A descriptive message string with relevant details.
    """
    segments: list[str] = []

    # Progress needs both the current sheet and the total
    if not (self.sheet_num is None or self.total_sheets is None):
        segments.append(f"Sheet {self.sheet_num}/{self.total_sheets}")

    # One known count is enough; the missing one is reported as 0
    if not (self.success_count is None and self.failure_count is None):
        passed = self.success_count if self.success_count else 0
        failed = self.failure_count if self.failure_count else 0
        segments.append(f"{passed} passed, {failed} failed")

    elapsed = self.duration_seconds
    if elapsed is not None:
        # Pick the largest unit that keeps the number readable
        if elapsed < 60:
            shown, unit = elapsed, "s"
        elif elapsed < 3600:
            shown, unit = elapsed / 60, "min"
        else:
            shown, unit = elapsed / 3600, "h"
        segments.append(f"{shown:.1f}{unit}")

    if self.error_message:
        # Keep notifications compact: at most 100 characters of the error
        limit = 100
        clipped = self.error_message[:limit]
        suffix = "..." if len(self.error_message) > limit else ""
        segments.append(f"Error: {clipped}{suffix}")

    if not segments:
        return self.event.value
    return " | ".join(segments)

NotificationEvent

Bases: Enum

Events that can trigger notifications.

These events align with the lifecycle of Marianne job execution and are referenced in NotificationConfig.on_events.

NotificationManager

NotificationManager(notifiers=None)

Coordinates multiple notifiers for Marianne job events.

Central hub for notification delivery:

- Maintains the list of active notifiers
- Routes events to the appropriate notifiers based on their subscriptions
- Handles failures gracefully (logs them but does not interrupt execution)

Example usage

manager = NotificationManager([ DesktopNotifier(events={NotificationEvent.JOB_COMPLETE}), SlackNotifier(webhook_url=..., events={NotificationEvent.JOB_FAILED}), ])

await manager.notify(NotificationContext( event=NotificationEvent.JOB_COMPLETE, job_id="123", job_name="my-job", ))

Initialize the notification manager.

Parameters:

Name Type Description Default
notifiers list[Notifier] | None

List of Notifier implementations to use. If None, starts with an empty list.

None
Source code in src/marianne/notifications/base.py
def __init__(self, notifiers: list[Notifier] | None = None) -> None:
    """Initialize the notification manager.

    The manager keeps its own copy of the supplied list, so later
    ``add_notifier`` / ``remove_notifier`` calls never modify the
    caller's list (and later changes to the caller's list do not
    silently change the manager's registrations).

    Args:
        notifiers: List of Notifier implementations to use.
                   If None, starts with an empty list.
    """
    # Shallow copy: share the notifier objects, not the list itself
    self._notifiers: list[Notifier] = list(notifiers) if notifiers is not None else []
Attributes
notifier_count property
notifier_count

Number of registered notifiers.

Functions
add_notifier
add_notifier(notifier)

Add a notifier to the manager.

Parameters:

Name Type Description Default
notifier Notifier

Notifier implementation to add.

required
Source code in src/marianne/notifications/base.py
def add_notifier(self, notifier: Notifier) -> None:
    """Register an additional notifier with the manager.

    The notifier is appended to the end of the registration list; no
    duplicate check is performed.

    Args:
        notifier: Notifier implementation to add.
    """
    registered = self._notifiers
    registered.append(notifier)
remove_notifier
remove_notifier(notifier)

Remove a notifier from the manager.

Parameters:

Name Type Description Default
notifier Notifier

Notifier to remove.

required

Raises:

Type Description
ValueError

If notifier is not registered.

Source code in src/marianne/notifications/base.py
def remove_notifier(self, notifier: Notifier) -> None:
    """Unregister a notifier from the manager.

    Only the first matching entry is removed.

    Args:
        notifier: Notifier to remove.

    Raises:
        ValueError: If notifier is not registered.
    """
    registered = self._notifiers
    registered.remove(notifier)
notify async
notify(context)

Send notification to all subscribed notifiers.

Iterates through registered notifiers and sends to those that are subscribed to the event type. Failures are logged but don't interrupt other notifications.

Parameters:

Name Type Description Default
context NotificationContext

Notification context with event details.

required

Returns:

Type Description
dict[str, bool]

Dict mapping notifier class name to success status.

dict[str, bool]

Only includes notifiers that were subscribed to this event.

Source code in src/marianne/notifications/base.py
async def notify(self, context: NotificationContext) -> dict[str, bool]:
    """Deliver ``context`` to every notifier subscribed to its event.

    Notifiers are awaited one after another in registration order.
    A notifier that raises is logged and recorded as failed; the
    remaining notifiers are still called.

    Args:
        context: Notification context with event details.

    Returns:
        Dict mapping notifier class name to success status.
        Only includes notifiers that were subscribed to this event.
        Keys are class names, so two notifiers of the same class share
        one entry (the later result wins).
    """
    outcomes: dict[str, bool] = {}

    for backend in self._notifiers:
        if context.event not in backend.subscribed_events:
            continue

        label = type(backend).__name__
        try:
            outcomes[label] = await backend.send(context)
        except Exception as exc:
            # Notifications are best-effort and must never break execution
            _logger.warning("notifier_failed", notifier=label, error=str(exc))
            outcomes[label] = False

    return outcomes
notify_job_start async
notify_job_start(job_id, job_name, total_sheets)

Convenience method for job start notification.

Parameters:

Name Type Description Default
job_id str

Unique job identifier.

required
job_name str

Human-readable job name.

required
total_sheets int

Total number of sheets to process.

required

Returns:

Type Description
dict[str, bool]

Dict of notifier results.

Source code in src/marianne/notifications/base.py
async def notify_job_start(
    self,
    job_id: str,
    job_name: str,
    total_sheets: int,
) -> dict[str, bool]:
    """Send a JOB_START notification.

    Builds the NotificationContext and hands it to ``notify``.

    Args:
        job_id: Unique job identifier.
        job_name: Human-readable job name.
        total_sheets: Total number of sheets to process.

    Returns:
        Dict of notifier results.
    """
    context = NotificationContext(
        event=NotificationEvent.JOB_START,
        job_id=job_id,
        job_name=job_name,
        total_sheets=total_sheets,
    )
    return await self.notify(context)
notify_job_complete async
notify_job_complete(job_id, job_name, success_count, failure_count, duration_seconds)

Convenience method for job completion notification.

Parameters:

Name Type Description Default
job_id str

Unique job identifier.

required
job_name str

Human-readable job name.

required
success_count int

Number of successful sheets.

required
failure_count int

Number of failed sheets.

required
duration_seconds float

Total job duration.

required

Returns:

Type Description
dict[str, bool]

Dict of notifier results.

Source code in src/marianne/notifications/base.py
async def notify_job_complete(
    self,
    job_id: str,
    job_name: str,
    success_count: int,
    failure_count: int,
    duration_seconds: float,
) -> dict[str, bool]:
    """Send a JOB_COMPLETE notification.

    Builds the NotificationContext and hands it to ``notify``.

    Args:
        job_id: Unique job identifier.
        job_name: Human-readable job name.
        success_count: Number of successful sheets.
        failure_count: Number of failed sheets.
        duration_seconds: Total job duration.

    Returns:
        Dict of notifier results.
    """
    context = NotificationContext(
        event=NotificationEvent.JOB_COMPLETE,
        job_id=job_id,
        job_name=job_name,
        success_count=success_count,
        failure_count=failure_count,
        duration_seconds=duration_seconds,
    )
    return await self.notify(context)
notify_job_failed async
notify_job_failed(job_id, job_name, error_message, sheet_num=None)

Convenience method for job failure notification.

Parameters:

Name Type Description Default
job_id str

Unique job identifier.

required
job_name str

Human-readable job name.

required
error_message str

Error that caused the failure.

required
sheet_num int | None

Sheet number where failure occurred (optional).

None

Returns:

Type Description
dict[str, bool]

Dict of notifier results.

Source code in src/marianne/notifications/base.py
async def notify_job_failed(
    self,
    job_id: str,
    job_name: str,
    error_message: str,
    sheet_num: int | None = None,
) -> dict[str, bool]:
    """Send a JOB_FAILED notification.

    Builds the NotificationContext and hands it to ``notify``.

    Args:
        job_id: Unique job identifier.
        job_name: Human-readable job name.
        error_message: Error that caused the failure.
        sheet_num: Sheet number where failure occurred (optional).

    Returns:
        Dict of notifier results.
    """
    context = NotificationContext(
        event=NotificationEvent.JOB_FAILED,
        job_id=job_id,
        job_name=job_name,
        error_message=error_message,
        sheet_num=sheet_num,
    )
    return await self.notify(context)
notify_sheet_complete async
notify_sheet_complete(job_id, job_name, sheet_num, total_sheets, success_count, failure_count)

Convenience method for sheet completion notification.

Parameters:

Name Type Description Default
job_id str

Unique job identifier.

required
job_name str

Human-readable job name.

required
sheet_num int

Completed sheet number.

required
total_sheets int

Total number of sheets.

required
success_count int

Validations passed.

required
failure_count int

Validations failed.

required

Returns:

Type Description
dict[str, bool]

Dict of notifier results.

Source code in src/marianne/notifications/base.py
async def notify_sheet_complete(
    self,
    job_id: str,
    job_name: str,
    sheet_num: int,
    total_sheets: int,
    success_count: int,
    failure_count: int,
) -> dict[str, bool]:
    """Send a SHEET_COMPLETE notification.

    Builds the NotificationContext and hands it to ``notify``.

    Args:
        job_id: Unique job identifier.
        job_name: Human-readable job name.
        sheet_num: Completed sheet number.
        total_sheets: Total number of sheets.
        success_count: Validations passed.
        failure_count: Validations failed.

    Returns:
        Dict of notifier results.
    """
    context = NotificationContext(
        event=NotificationEvent.SHEET_COMPLETE,
        job_id=job_id,
        job_name=job_name,
        sheet_num=sheet_num,
        total_sheets=total_sheets,
        success_count=success_count,
        failure_count=failure_count,
    )
    return await self.notify(context)
notify_rate_limit async
notify_rate_limit(job_id, job_name, sheet_num)

Convenience method for rate limit notification.

Parameters:

Name Type Description Default
job_id str

Unique job identifier.

required
job_name str

Human-readable job name.

required
sheet_num int

Sheet that hit rate limit.

required

Returns:

Type Description
dict[str, bool]

Dict of notifier results.

Source code in src/marianne/notifications/base.py
async def notify_rate_limit(
    self,
    job_id: str,
    job_name: str,
    sheet_num: int,
) -> dict[str, bool]:
    """Send a RATE_LIMIT_DETECTED notification.

    Builds the NotificationContext and hands it to ``notify``.

    Args:
        job_id: Unique job identifier.
        job_name: Human-readable job name.
        sheet_num: Sheet that hit rate limit.

    Returns:
        Dict of notifier results.
    """
    context = NotificationContext(
        event=NotificationEvent.RATE_LIMIT_DETECTED,
        job_id=job_id,
        job_name=job_name,
        sheet_num=sheet_num,
    )
    return await self.notify(context)
close async
close()

Close all registered notifiers.

Should be called when the job completes or the manager is no longer needed. Ignores individual notifier errors.

Source code in src/marianne/notifications/base.py
async def close(self) -> None:
    """Shut down every registered notifier.

    Call this once the job completes or the manager is no longer
    needed. Notifiers are closed in registration order; an error from
    one notifier is logged as a warning and does not stop the rest
    from being closed.
    """
    for backend in self._notifiers:
        try:
            await backend.close()
        except Exception as exc:
            # Best-effort cleanup: record the failure and keep going
            backend_name = type(backend).__name__
            _logger.warning("notifier_close_error", notifier=backend_name, error=str(exc))

Notifier

Bases: Protocol

Protocol for notification backends.

Implementations handle sending notifications through specific channels (desktop, Slack, webhook, etc.). Each notifier:

- Registers for specific event types
- Receives a NotificationContext when events occur
- Handles delivery asynchronously

Following Marianne's Protocol pattern (like OutcomeStore, EscalationHandler).

Attributes
subscribed_events property
subscribed_events

Events this notifier is registered to receive.

Returns:

Type Description
set[NotificationEvent]

Set of NotificationEvent types this notifier handles.

Functions
send async
send(context)

Send a notification for the given context.

Parameters:

Name Type Description Default
context NotificationContext

Full notification context with event details.

required

Returns:

Type Description
bool

True if notification was sent successfully, False otherwise.

bool

Failures should be logged but not raise exceptions.

Source code in src/marianne/notifications/base.py
async def send(self, context: NotificationContext) -> bool:
    """Send a notification for the given context.

    Protocol stub with no behavior of its own; each concrete notifier
    supplies the delivery logic.

    Args:
        context: Full notification context with event details.

    Returns:
        True if notification was sent successfully, False otherwise.
        Failures should be logged but not raise exceptions.
    """
    ...
close async
close()

Clean up any resources held by the notifier.

Called when the NotificationManager is shutting down. Implementations should release connections, close files, etc.

Source code in src/marianne/notifications/base.py
async def close(self) -> None:
    """Clean up any resources held by the notifier.

    Protocol stub with no behavior of its own; each concrete notifier
    supplies its cleanup logic.

    Called when the NotificationManager is shutting down.
    Implementations should release connections, close files, etc.
    """
    ...

DesktopNotifier

DesktopNotifier(events=None, app_name='Marianne AI Compose', timeout=10)

Desktop notification implementation using plyer.

Provides cross-platform desktop notifications on Windows, macOS, and Linux. Gracefully degrades if plyer is not installed - logs warning but doesn't fail.

Example usage

notifier = DesktopNotifier( events={NotificationEvent.JOB_COMPLETE, NotificationEvent.JOB_FAILED}, app_name="Marianne", ) await notifier.send(context)

Configuration from YAML

notifications:
  - type: desktop
    on_events: [job_complete, job_failed]
    config:
      timeout: 10
      app_name: "Marianne AI"

Initialize the desktop notifier.

Parameters:

Name Type Description Default
events set[NotificationEvent] | None

Set of events to subscribe to. Defaults to job-level events.

None
app_name str

Application name shown in notifications.

'Marianne AI Compose'
timeout int

Notification display timeout in seconds (platform-dependent).

10
Source code in src/marianne/notifications/desktop.py
def __init__(
    self,
    events: set[NotificationEvent] | None = None,
    app_name: str = "Marianne AI Compose",
    timeout: int = 10,
) -> None:
    """Set up the desktop notifier.

    Args:
        events: Set of events to subscribe to. When omitted or empty,
            the notifier subscribes to JOB_COMPLETE, JOB_FAILED and
            JOB_PAUSED.
        app_name: Application name shown in notifications.
        timeout: Notification display timeout in seconds (platform-dependent).
    """
    self._app_name = app_name
    self._timeout = timeout
    # Ensures the "plyer missing" warning is logged at most once
    self._warned_unavailable = False

    if events:
        self._events = events
    else:
        self._events = {
            NotificationEvent.JOB_COMPLETE,
            NotificationEvent.JOB_FAILED,
            NotificationEvent.JOB_PAUSED,
        }
Attributes
subscribed_events property
subscribed_events

Events this notifier is registered to receive.

Returns:

Type Description
set[NotificationEvent]

Set of subscribed NotificationEvent types.

Functions
from_config classmethod
from_config(on_events, config=None)

Create DesktopNotifier from YAML configuration.

Parameters:

Name Type Description Default
on_events list[str]

List of event name strings from config.

required
config dict[str, Any] | None

Optional dict with 'app_name' and 'timeout'.

None

Returns:

Type Description
DesktopNotifier

Configured DesktopNotifier instance.

Example

notifier = DesktopNotifier.from_config( on_events=["job_complete", "job_failed"], config={"timeout": 5, "app_name": "My App"}, )

Source code in src/marianne/notifications/desktop.py
@classmethod
def from_config(
    cls,
    on_events: list[str],
    config: dict[str, Any] | None = None,
) -> "DesktopNotifier":
    """Create DesktopNotifier from YAML configuration.

    Args:
        on_events: List of event name strings from config.
        config: Optional dict with 'app_name' and 'timeout'.

    Returns:
        Configured DesktopNotifier instance.

    Example:
        notifier = DesktopNotifier.from_config(
            on_events=["job_complete", "job_failed"],
            config={"timeout": 5, "app_name": "My App"},
        )
    """
    config = config or {}

    # Convert string event names to NotificationEvent enums
    events: set[NotificationEvent] = set()
    for event_name in on_events:
        try:
            # Handle both "job_complete" and "JOB_COMPLETE" formats
            normalized = event_name.upper()
            events.add(NotificationEvent[normalized])
        except KeyError:
            _logger.warning("unknown_notification_event", event_name=event_name)

    return cls(
        events=events if events else None,
        app_name=config.get("app_name", "Marianne AI Compose"),
        timeout=config.get("timeout", 10),
    )
send async
send(context)

Send a desktop notification.

Uses plyer for cross-platform notification support. Fails gracefully if plyer is unavailable or notification fails.

Parameters:

Name Type Description Default
context NotificationContext

Notification context with event details.

required

Returns:

Type Description
bool

True if notification was sent, False if unavailable or failed.

Source code in src/marianne/notifications/desktop.py
async def send(self, context: NotificationContext) -> bool:
    """Show a desktop notification for ``context``.

    Uses plyer for cross-platform notification support and never
    raises: a missing plyer install or a failing notification call is
    logged and reported through the return value.

    Args:
        context: Notification context with event details.

    Returns:
        True if the notification was sent, or if the event is not one
        this notifier subscribes to (nothing to do). False if plyer is
        unavailable or the notification call failed.
    """
    if not _PLYER_AVAILABLE:
        # Warn once per notifier rather than on every event
        if not self._warned_unavailable:
            _logger.warning(
                "Desktop notifications unavailable - install plyer: "
                "pip install plyer"
            )
            self._warned_unavailable = True
        return False

    if context.event not in self._events:
        # Unsubscribed events are skipped, which is not a failure
        return True

    heading = context.format_title()
    body = context.format_message()

    try:
        # The notify call is synchronous; it is only wrapped in the
        # async interface for consistency with other notifiers
        _notification_module.notify(
            title=heading,
            message=body,
            app_name=self._app_name,
            timeout=self._timeout,
        )
        _logger.debug("desktop_notification_sent", title=heading)
    except Exception as exc:
        # Platform-specific errors (e.g. no system notification service)
        _logger.warning("desktop_notification_failed", error=str(exc))
        return False

    return True
close async
close()

Clean up resources.

Desktop notifier has no resources to clean up, but implements the protocol method.

Source code in src/marianne/notifications/desktop.py
async def close(self) -> None:
    """Satisfy the notifier protocol's cleanup hook.

    The desktop notifier holds no resources, so this does nothing.
    """

MockDesktopNotifier

MockDesktopNotifier(events=None)

Mock desktop notifier for testing.

Records all notifications sent without actually displaying them. Useful for testing notification integration.

Initialize mock notifier.

Parameters:

Name Type Description Default
events set[NotificationEvent] | None

Set of events to subscribe to.

None
Source code in src/marianne/notifications/desktop.py
def __init__(
    self,
    events: set[NotificationEvent] | None = None,
) -> None:
    """Set up the mock notifier with an empty recording.

    Args:
        events: Set of events to subscribe to. When omitted or empty,
            JOB_COMPLETE and JOB_FAILED are used.
    """
    self._fail_next = False
    self.sent_notifications: list[NotificationContext] = []

    if events:
        self._events = events
    else:
        self._events = {
            NotificationEvent.JOB_COMPLETE,
            NotificationEvent.JOB_FAILED,
        }
Attributes
subscribed_events property
subscribed_events

Events this notifier handles.

Functions
set_fail_next
set_fail_next(should_fail=True)

Configure the next send() call to fail.

Parameters:

Name Type Description Default
should_fail bool

If True, next send() returns False.

True
Source code in src/marianne/notifications/desktop.py
def set_fail_next(self, should_fail: bool = True) -> None:
    """Configure the next send() call to fail.

    The flag is one-shot: send() clears it after returning False once.

    Args:
        should_fail: If True, next send() returns False. Pass False to
            cancel a failure that was armed earlier.
    """
    self._fail_next = should_fail
send async
send(context)

Record notification without displaying.

Parameters:

Name Type Description Default
context NotificationContext

Notification context.

required

Returns:

Type Description
bool

True unless set_fail_next was called.

Source code in src/marianne/notifications/desktop.py
async def send(self, context: NotificationContext) -> bool:
    """Record ``context`` instead of displaying a notification.

    When a failure has been armed with ``set_fail_next``, this call
    consumes it: the flag is reset, nothing is recorded, and False is
    returned.

    Args:
        context: Notification context.

    Returns:
        True unless set_fail_next was called.
    """
    if not self._fail_next:
        self.sent_notifications.append(context)
        return True

    # Simulated failure applies to exactly one call
    self._fail_next = False
    return False
close async
close()

Clear recorded notifications.

Source code in src/marianne/notifications/desktop.py
async def close(self) -> None:
    """Clear recorded notifications.

    Empties ``sent_notifications`` in place; a failure armed with
    ``set_fail_next`` is left untouched.
    """
    self.sent_notifications.clear()
get_notification_count
get_notification_count()

Get number of recorded notifications.

Source code in src/marianne/notifications/desktop.py
def get_notification_count(self) -> int:
    """Return how many notifications are currently recorded."""
    recorded = self.sent_notifications
    return len(recorded)
get_notifications_for_event
get_notifications_for_event(event)

Get all notifications for a specific event type.

Parameters:

Name Type Description Default
event NotificationEvent

Event type to filter by.

required

Returns:

Type Description
list[NotificationContext]

List of matching notification contexts.

Source code in src/marianne/notifications/desktop.py
def get_notifications_for_event(
    self, event: NotificationEvent
) -> list[NotificationContext]:
    """Return the recorded notifications whose event equals ``event``.

    Args:
        event: Event type to filter by.

    Returns:
        List of matching notification contexts, in the order they were
        recorded. A new list is returned on every call.
    """
    matches = []
    for recorded in self.sent_notifications:
        if recorded.event == event:
            matches.append(recorded)
    return matches

MockSlackNotifier

MockSlackNotifier(events=None)

Mock Slack notifier for testing.

Records all notifications sent without making HTTP calls. Useful for testing notification integration.

Initialize mock notifier.

Parameters:

Name Type Description Default
events set[NotificationEvent] | None

Set of events to subscribe to.

None
Source code in src/marianne/notifications/slack.py
def __init__(
    self,
    events: set[NotificationEvent] | None = None,
) -> None:
    """Set up the mock notifier with empty recordings.

    Args:
        events: Set of events to subscribe to. When omitted or empty,
            JOB_COMPLETE, JOB_FAILED and SHEET_FAILED are used.
    """
    self._fail_next = False
    self.sent_notifications: list[NotificationContext] = []
    self.sent_payloads: list[dict[str, Any]] = []

    if events:
        self._events = events
    else:
        self._events = {
            NotificationEvent.JOB_COMPLETE,
            NotificationEvent.JOB_FAILED,
            NotificationEvent.SHEET_FAILED,
        }
Attributes
subscribed_events property
subscribed_events

Events this notifier handles.

Functions
set_fail_next
set_fail_next(should_fail=True)

Configure the next send() call to fail.

Parameters:

Name Type Description Default
should_fail bool

If True, next send() returns False.

True
Source code in src/marianne/notifications/slack.py
def set_fail_next(self, should_fail: bool = True) -> None:
    """Configure the next send() call to fail.

    The flag is one-shot: send() clears it after returning False once.

    Args:
        should_fail: If True, next send() returns False. Pass False to
            cancel a failure that was armed earlier.
    """
    self._fail_next = should_fail
send async
send(context)

Record notification without making HTTP call.

Parameters:

Name Type Description Default
context NotificationContext

Notification context.

required

Returns:

Type Description
bool

True unless set_fail_next was called.

Source code in src/marianne/notifications/slack.py
async def send(self, context: NotificationContext) -> bool:
    """Record ``context`` and its Slack payload without any HTTP call.

    When a failure has been armed with ``set_fail_next``, this call
    consumes it: the flag is reset, nothing is recorded, and False is
    returned.

    Args:
        context: Notification context.

    Returns:
        True unless set_fail_next was called.
    """
    if not self._fail_next:
        self.sent_notifications.append(context)
        # Reuse the real notifier's payload builder so tests can inspect
        # a payload built the same way; the URL is a placeholder
        payload = SlackNotifier(webhook_url="https://mock")._build_payload(context)
        self.sent_payloads.append(payload)
        return True

    # Simulated failure applies to exactly one call
    self._fail_next = False
    return False
close async
close()

Clear recorded notifications.

Source code in src/marianne/notifications/slack.py
async def close(self) -> None:
    """Clear recorded notifications and their payloads.

    Both lists are emptied in place; a failure armed with
    ``set_fail_next`` is left untouched.
    """
    for recording in (self.sent_notifications, self.sent_payloads):
        recording.clear()
get_notification_count
get_notification_count()

Get number of recorded notifications.

Source code in src/marianne/notifications/slack.py
def get_notification_count(self) -> int:
    """Return how many notifications are currently recorded."""
    recorded = self.sent_notifications
    return len(recorded)
get_notifications_for_event
get_notifications_for_event(event)

Get all notifications for a specific event type.

Parameters:

Name Type Description Default
event NotificationEvent

Event type to filter by.

required

Returns:

Type Description
list[NotificationContext]

List of matching notification contexts.

Source code in src/marianne/notifications/slack.py
def get_notifications_for_event(
    self, event: NotificationEvent
) -> list[NotificationContext]:
    """Return the recorded notifications whose event equals ``event``.

    Args:
        event: Event type to filter by.

    Returns:
        List of matching notification contexts, in the order they were
        recorded. A new list is returned on every call.
    """
    matches = []
    for recorded in self.sent_notifications:
        if recorded.event == event:
            matches.append(recorded)
    return matches

SlackNotifier

SlackNotifier(webhook_url=None, webhook_url_env='SLACK_WEBHOOK_URL', channel=None, username='Marianne AI Compose', icon_emoji=':musical_score:', events=None, timeout=10.0)

Slack notification implementation using webhooks.

Sends notifications to Slack channels via incoming webhooks. Supports rich formatting with Slack Block Kit attachments.

Example usage

notifier = SlackNotifier( webhook_url="https://hooks.slack.com/services/...", channel="#alerts", events={NotificationEvent.JOB_COMPLETE, NotificationEvent.JOB_FAILED}, ) await notifier.send(context)

Configuration from YAML

notifications:
  - type: slack
    on_events: [job_complete, job_failed, sheet_failed]
    config:
      webhook_url_env: SLACK_WEBHOOK_URL
      channel: "#marianne-alerts"
      username: "Marianne Bot"
      timeout: 10

Initialize the Slack notifier.

Parameters:

Name Type Description Default
webhook_url str | None

Direct webhook URL. If not provided, reads from env var.

None
webhook_url_env str

Environment variable containing webhook URL.

'SLACK_WEBHOOK_URL'
channel str | None

Override channel (optional, uses webhook default if not set).

None
username str

Bot username displayed in Slack.

'Marianne AI Compose'
icon_emoji str

Emoji for bot avatar.

':musical_score:'
events set[NotificationEvent] | None

Set of events to subscribe to. Defaults to job-level events.

None
timeout float

HTTP request timeout in seconds.

10.0
Source code in src/marianne/notifications/slack.py
def __init__(
    self,
    webhook_url: str | None = None,
    webhook_url_env: str = "SLACK_WEBHOOK_URL",
    channel: str | None = None,
    username: str = "Marianne AI Compose",
    icon_emoji: str = ":musical_score:",
    events: set[NotificationEvent] | None = None,
    timeout: float = 10.0,
) -> None:
    """Set up a Slack notifier that posts through an incoming webhook.

    Args:
        webhook_url: Webhook URL to post to. When None or empty, the URL is
            read from the environment variable named by ``webhook_url_env``.
        webhook_url_env: Name of the environment variable holding the URL.
        channel: Override channel (optional, uses webhook default if not set).
        username: Bot username displayed in Slack.
        icon_emoji: Emoji for bot avatar.
        events: Events to subscribe to. None or an empty set selects
            JOB_COMPLETE, JOB_FAILED and SHEET_FAILED.
        timeout: HTTP request timeout in seconds.
    """
    # An explicit URL wins; otherwise fall back to the environment, ending
    # up with "" when the variable is not set either.
    if webhook_url:
        self._webhook_url = webhook_url
    else:
        self._webhook_url = os.environ.get(webhook_url_env, "")

    if events:
        self._events = events
    else:
        self._events = {
            NotificationEvent.JOB_COMPLETE,
            NotificationEvent.JOB_FAILED,
            NotificationEvent.SHEET_FAILED,
        }

    self._timeout = timeout
    self._icon_emoji = icon_emoji
    self._username = username
    self._channel = channel

    # No HTTP client exists until one is needed.
    self._client: httpx.AsyncClient | None = None
    # Read by send() so the missing-webhook warning is logged only once.
    self._warned_no_webhook = False
Attributes
subscribed_events property
subscribed_events

Events this notifier is registered to receive.

Functions
from_config classmethod
from_config(on_events, config=None)

Create SlackNotifier from YAML configuration.

Parameters:

Name Type Description Default
on_events list[str]

List of event name strings from config.

required
config dict[str, Any] | None

Optional dict with Slack-specific settings: - webhook_url: Direct webhook URL - webhook_url_env: Env var for webhook URL (default: SLACK_WEBHOOK_URL) - channel: Override channel - username: Bot username - icon_emoji: Bot avatar emoji - timeout: Request timeout in seconds

None

Returns:

Type Description
SlackNotifier

Configured SlackNotifier instance.

Example

notifier = SlackNotifier.from_config( on_events=["job_complete", "job_failed"], config={ "webhook_url_env": "MY_SLACK_WEBHOOK", "channel": "#alerts", }, )

Source code in src/marianne/notifications/slack.py
@classmethod
def from_config(
    cls,
    on_events: list[str],
    config: dict[str, Any] | None = None,
) -> "SlackNotifier":
    """Build a SlackNotifier from a YAML notification entry.

    Args:
        on_events: Event names as written in the config. Each name is
            upper-cased and looked up in NotificationEvent; names with no
            matching member are logged and skipped.
        config: Optional dict with Slack-specific settings:
            - webhook_url: Direct webhook URL
            - webhook_url_env: Env var for webhook URL (default: SLACK_WEBHOOK_URL)
            - channel: Override channel
            - username: Bot username
            - icon_emoji: Bot avatar emoji
            - timeout: Request timeout in seconds

    Returns:
        Configured SlackNotifier instance. When no event name resolves,
        ``events=None`` is passed so the constructor applies its defaults.

    Example:
        notifier = SlackNotifier.from_config(
            on_events=["job_complete", "job_failed"],
            config={
                "webhook_url_env": "MY_SLACK_WEBHOOK",
                "channel": "#alerts",
            },
        )
    """
    settings = config or {}

    # Translate config strings into enum members, tolerating unknown names.
    subscribed: set[NotificationEvent] = set()
    for name in on_events:
        try:
            subscribed.add(NotificationEvent[name.upper()])
        except KeyError:
            _logger.warning("unknown_notification_event", event_name=name)

    return cls(
        webhook_url=settings.get("webhook_url"),
        webhook_url_env=settings.get("webhook_url_env", "SLACK_WEBHOOK_URL"),
        channel=settings.get("channel"),
        username=settings.get("username", "Marianne AI Compose"),
        icon_emoji=settings.get("icon_emoji", ":musical_score:"),
        events=subscribed or None,
        timeout=settings.get("timeout", 10.0),
    )
send async
send(context)

Send a Slack notification.

Posts to the configured Slack webhook with rich formatting. Fails gracefully if webhook is unavailable or request fails.

Parameters:

Name Type Description Default
context NotificationContext

Notification context with event details.

required

Returns:

Type Description
bool

True if the notification was sent, or if the event is not one this notifier subscribes to; False if the webhook URL is not configured or the request failed.

Source code in src/marianne/notifications/slack.py
async def send(self, context: NotificationContext) -> bool:
    """Post a notification to the configured Slack webhook.

    Failures are logged and reported through the return value rather than
    raised.

    Args:
        context: Notification context with event details.

    Returns:
        True if Slack answered with HTTP 200, or if ``context.event`` is not
        in the subscribed set (nothing is posted in that case). False if no
        webhook URL is configured, Slack answered with another status, or
        the request raised.
    """
    webhook_missing = not self._webhook_url
    if webhook_missing and not self._warned_no_webhook:
        # Warn only once per notifier so repeated sends don't flood the log.
        _logger.warning(
            "Slack webhook URL not configured. "
            "Set webhook_url or SLACK_WEBHOOK_URL environment variable."
        )
        self._warned_no_webhook = True
    if webhook_missing:
        return False

    # Events outside the subscription are treated as a successful no-op.
    if context.event not in self._events:
        return True

    try:
        http = await self._get_client()
        message = self._build_payload(context)

        reply = await http.post(
            self._webhook_url,
            json=message,
        )

        if reply.status_code != 200:
            # Keep only the first 200 characters of the response body.
            _logger.warning(
                "slack_webhook_error",
                status_code=reply.status_code,
                body=reply.text[:200],
            )
            return False

        _logger.debug("slack_notification_sent", title=context.format_title())
        return True

    except httpx.TimeoutException:
        _logger.warning("Slack notification timed out")
        return False
    except httpx.RequestError as exc:
        _logger.warning("slack_notification_failed", error=str(exc))
        return False
    except Exception as exc:
        _logger.warning("slack_notification_unexpected_error", error=str(exc), exc_info=True)
        return False
close async
close()

Clean up HTTP client resources.

Called when the NotificationManager is shutting down.

Source code in src/marianne/notifications/slack.py
async def close(self) -> None:
    """Release the HTTP client held by this notifier, if it is still open.

    Called when the NotificationManager is shutting down. Does nothing when
    no client exists or the client is already closed.
    """
    client = self._client
    if client is None or client.is_closed:
        return

    await client.aclose()
    self._client = None

MockWebhookNotifier

MockWebhookNotifier(events=None)

Mock webhook notifier for testing.

Records all notifications sent without making HTTP calls. Useful for testing notification integration.

Initialize mock notifier.

Parameters:

Name Type Description Default
events set[NotificationEvent] | None

Set of events to subscribe to.

None
Source code in src/marianne/notifications/webhook.py
def __init__(
    self,
    events: set[NotificationEvent] | None = None,
) -> None:
    """Create a mock notifier with empty recording state.

    Args:
        events: Set of events to subscribe to. None or an empty set selects
            JOB_COMPLETE and JOB_FAILED.
    """
    if events:
        self._events = events
    else:
        self._events = {
            NotificationEvent.JOB_COMPLETE,
            NotificationEvent.JOB_FAILED,
        }

    # Failure simulation starts disabled: no forced failure, status 200.
    self._simulated_status_code = 200
    self._fail_next = False

    # Recording buffers.
    self.sent_payloads: list[dict[str, Any]] = []
    self.sent_notifications: list[NotificationContext] = []
Attributes
subscribed_events property
subscribed_events

Events this notifier handles.

Functions
set_fail_next
set_fail_next(should_fail=True)

Configure the next send() call to fail.

Parameters:

Name Type Description Default
should_fail bool

If True, next send() returns False.

True
Source code in src/marianne/notifications/webhook.py
def set_fail_next(self, should_fail: bool = True) -> None:
    """Arm or disarm a forced failure for the next send() call.

    Args:
        should_fail: Value stored in the ``_fail_next`` flag; when True the
            next send() returns False.
    """
    self._fail_next = should_fail
simulate_status_code
simulate_status_code(code)

Simulate a specific HTTP status code response.

Parameters:

Name Type Description Default
code int

HTTP status code to simulate.

required
Source code in src/marianne/notifications/webhook.py
def simulate_status_code(self, code: int) -> None:
    """Store the HTTP status code the mock should pretend to receive.

    Args:
        code: HTTP status code to simulate.
    """
    self._simulated_status_code = code
send async
send(context)

Record notification without making HTTP call.

Parameters:

Name Type Description Default
context NotificationContext

Notification context.

required

Returns:

Type Description
bool

True unless set_fail_next was called or simulated error status.

Source code in src/marianne/notifications/webhook.py
async def send(self, context: NotificationContext) -> bool:
    """Record a notification instead of performing an HTTP request.

    Args:
        context: Notification context.

    Returns:
        False, recording nothing, when a failure was armed with
        set_fail_next or the simulated status code is 400 or above.
        True otherwise.
    """
    # A forced failure applies once; the flag is cleared as it is used.
    forced_failure = self._fail_next
    if forced_failure:
        self._fail_next = False
        return False

    # A simulated error status also applies once, then resets to 200.
    error_status = self._simulated_status_code >= 400
    if error_status:
        self._simulated_status_code = 200
        return False

    self.sent_notifications.append(context)
    # Use a real WebhookNotifier to build the payload that gets recorded.
    payload_builder = WebhookNotifier(url="https://mock")
    recorded_payload = payload_builder._build_payload(context)
    self.sent_payloads.append(recorded_payload)
    return True
close async
close()

Clear recorded notifications.

Source code in src/marianne/notifications/webhook.py
async def close(self) -> None:
    """Empty both recording lists (contexts and payloads) in place."""
    for log in (self.sent_notifications, self.sent_payloads):
        log.clear()
get_notification_count
get_notification_count()

Get number of recorded notifications.

Source code in src/marianne/notifications/webhook.py
def get_notification_count(self) -> int:
    """Return how many notification contexts have been recorded."""
    recorded = self.sent_notifications
    return len(recorded)
get_notifications_for_event
get_notifications_for_event(event)

Get all notifications for a specific event type.

Parameters:

Name Type Description Default
event NotificationEvent

Event type to filter by.

required

Returns:

Type Description
list[NotificationContext]

List of matching notification contexts.

Source code in src/marianne/notifications/webhook.py
def get_notifications_for_event(
    self, event: NotificationEvent
) -> list[NotificationContext]:
    """Select the recorded notifications that were sent for ``event``.

    Args:
        event: Event type to filter by.

    Returns:
        Matching notification contexts in recording order; an empty list
        when none match.
    """
    return list(
        filter(lambda recorded: recorded.event == event, self.sent_notifications)
    )

WebhookNotifier

WebhookNotifier(url=None, url_env=None, headers=None, events=None, timeout=30.0, max_retries=2, retry_delay=1.0, include_metadata=True)

Generic HTTP webhook notification implementation.

Posts JSON notifications to configurable HTTP endpoints. Supports custom headers (for auth tokens), retries, and timeouts.

Example usage

notifier = WebhookNotifier( url="https://example.com/webhooks/marianne", headers={"Authorization": "Bearer token123"}, events={NotificationEvent.JOB_COMPLETE, NotificationEvent.JOB_FAILED}, ) await notifier.send(context)

Configuration from YAML

notifications: - type: webhook on_events: [job_complete, job_failed] config: url: https://example.com/webhook url_env: MZT_WEBHOOK_URL # Alternative to url headers: Authorization: "Bearer ${WEBHOOK_TOKEN}" X-Custom-Header: "value" timeout: 30 max_retries: 2 retry_delay: 1.0

Initialize the webhook notifier.

Parameters:

Name Type Description Default
url str | None

Direct webhook URL.

None
url_env str | None

Environment variable containing webhook URL.

None
headers dict[str, str] | None

HTTP headers to include in requests.

None
events set[NotificationEvent] | None

Set of events to subscribe to. Defaults to job-level events.

None
timeout float

HTTP request timeout in seconds.

30.0
max_retries int

Maximum retry attempts on failure (0 = no retries).

2
retry_delay float

Delay between retries in seconds.

1.0
include_metadata bool

Include Marianne metadata in payload (version, source).

True
Source code in src/marianne/notifications/webhook.py
def __init__(
    self,
    url: str | None = None,
    url_env: str | None = None,
    headers: dict[str, str] | None = None,
    events: set[NotificationEvent] | None = None,
    timeout: float = 30.0,
    max_retries: int = 2,
    retry_delay: float = 1.0,
    include_metadata: bool = True,
) -> None:
    """Set up a generic HTTP webhook notifier.

    Args:
        url: Direct webhook URL.
        url_env: Name of an environment variable holding the URL. Consulted
            only when ``url`` is None or empty.
        headers: HTTP headers to include in requests. They are passed
            through ``_expand_env_headers`` before being stored.
        events: Set of events to subscribe to. None or an empty set selects
            JOB_COMPLETE and JOB_FAILED.
        timeout: HTTP request timeout in seconds.
        max_retries: Maximum retry attempts on failure (0 = no retries).
        retry_delay: Delay between retries in seconds.
        include_metadata: Include Marianne metadata in payload (version, source).
    """
    # An explicit URL wins; the environment is only a fallback and yields ""
    # when the named variable is unset.
    resolved_url = url
    if not resolved_url and url_env:
        resolved_url = os.environ.get(url_env, "")
    self._url = resolved_url

    self._headers = self._expand_env_headers(headers or {})

    self._include_metadata = include_metadata
    self._retry_delay = retry_delay
    self._max_retries = max_retries
    self._timeout = timeout

    if events:
        self._events = events
    else:
        self._events = {
            NotificationEvent.JOB_COMPLETE,
            NotificationEvent.JOB_FAILED,
        }

    # No HTTP client exists until one is needed.
    self._client: httpx.AsyncClient | None = None
    # Read by send() so the missing-URL warning is logged only once.
    self._warned_no_url = False
Attributes
subscribed_events property
subscribed_events

Events this notifier is registered to receive.

Returns:

Type Description
set[NotificationEvent]

Set of subscribed NotificationEvent types.

Functions
from_config classmethod
from_config(on_events, config=None)

Create WebhookNotifier from YAML configuration.

Parameters:

Name Type Description Default
on_events list[str]

List of event name strings from config.

required
config dict[str, Any] | None

Optional dict with webhook-specific settings: - url: Direct webhook URL - url_env: Env var for webhook URL - headers: Dict of HTTP headers - timeout: Request timeout in seconds - max_retries: Retry attempts on failure - retry_delay: Delay between retries - include_metadata: Include Marianne metadata

None

Returns:

Type Description
WebhookNotifier

Configured WebhookNotifier instance.

Example

notifier = WebhookNotifier.from_config( on_events=["job_complete", "job_failed"], config={ "url": "https://example.com/webhook", "headers": {"X-API-Key": "secret"}, }, )

Source code in src/marianne/notifications/webhook.py
@classmethod
def from_config(
    cls,
    on_events: list[str],
    config: dict[str, Any] | None = None,
) -> "WebhookNotifier":
    """Build a WebhookNotifier from a YAML notification entry.

    Args:
        on_events: Event names as written in the config. Each name is
            upper-cased and looked up in NotificationEvent; names with no
            matching member are logged and skipped.
        config: Optional dict with webhook-specific settings:
            - url: Direct webhook URL
            - url_env: Env var for webhook URL
            - headers: Dict of HTTP headers
            - timeout: Request timeout in seconds
            - max_retries: Retry attempts on failure
            - retry_delay: Delay between retries
            - include_metadata: Include Marianne metadata

    Returns:
        Configured WebhookNotifier instance. When no event name resolves,
        ``events=None`` is passed so the constructor applies its defaults.

    Example:
        notifier = WebhookNotifier.from_config(
            on_events=["job_complete", "job_failed"],
            config={
                "url": "https://example.com/webhook",
                "headers": {"X-API-Key": "secret"},
            },
        )
    """
    settings = config or {}

    # Translate config strings into enum members, tolerating unknown names.
    subscribed: set[NotificationEvent] = set()
    for name in on_events:
        try:
            subscribed.add(NotificationEvent[name.upper()])
        except KeyError:
            _logger.warning("unknown_notification_event", event_name=name)

    return cls(
        url=settings.get("url"),
        url_env=settings.get("url_env"),
        headers=settings.get("headers"),
        events=subscribed or None,
        timeout=settings.get("timeout", 30.0),
        max_retries=settings.get("max_retries", 2),
        retry_delay=settings.get("retry_delay", 1.0),
        include_metadata=settings.get("include_metadata", True),
    )
send async
send(context)

Send a webhook notification.

Posts JSON payload to the configured URL. Implements retry logic for transient failures.

Parameters:

Name Type Description Default
context NotificationContext

Notification context with event details.

required

Returns:

Type Description
bool

True if the notification was sent, or if the event is not one this notifier subscribes to; False if the URL is not configured or delivery failed.

Source code in src/marianne/notifications/webhook.py
async def send(self, context: NotificationContext) -> bool:
    """Post a JSON notification to the configured webhook URL.

    Delivery is delegated to ``_send_with_retry``. Failures are logged and
    reported through the return value rather than raised.

    Args:
        context: Notification context with event details.

    Returns:
        The success flag from ``_send_with_retry``, or True if
        ``context.event`` is not in the subscribed set (nothing is posted in
        that case). False if no URL is configured or an exception was
        raised while sending.
    """
    url_missing = not self._url
    if url_missing and not self._warned_no_url:
        # Warn only once per notifier so repeated sends don't flood the log.
        _logger.warning(
            "Webhook URL not configured. "
            "Set url or url_env in webhook notification config."
        )
        self._warned_no_url = True
    if url_missing:
        return False

    # Events outside the subscription are treated as a successful no-op.
    if context.event not in self._events:
        return True

    try:
        http = await self._get_client()
        body = self._build_payload(context)

        delivered, failure = await self._send_with_retry(http, body)

        if not delivered:
            _logger.warning("webhook_notification_failed", error=failure)
        else:
            _logger.debug("webhook_notification_sent", title=context.format_title())

        return delivered

    except Exception as exc:
        _logger.warning(
            "webhook_notification_unexpected_error",
            error=str(exc),
            exc_info=True,
        )
        return False
close async
close()

Clean up HTTP client resources.

Called when the NotificationManager is shutting down.

Source code in src/marianne/notifications/webhook.py
async def close(self) -> None:
    """Release the HTTP client held by this notifier, if it is still open.

    Called when the NotificationManager is shutting down. Does nothing when
    no client exists or the client is already closed.
    """
    client = self._client
    if client is None or client.is_closed:
        return

    await client.aclose()
    self._client = None

Functions

is_desktop_notification_available

is_desktop_notification_available()

Check if desktop notifications are available.

Returns:

Type Description
bool

True if plyer is installed and can send notifications.

Source code in src/marianne/notifications/desktop.py
def is_desktop_notification_available() -> bool:
    """Report whether the desktop notification backend can be used.

    Returns:
        The module-level ``_PLYER_AVAILABLE`` flag: True if plyer is
        installed and can send notifications.
    """
    return _PLYER_AVAILABLE

create_notifiers_from_config

create_notifiers_from_config(notification_configs)

Create Notifier instances from notification configuration.

Parameters:

Name Type Description Default
notification_configs list[NotificationConfig]

List of NotificationConfig from job config.

required

Returns:

Type Description
list[Notifier]

List of configured Notifier instances.

Source code in src/marianne/notifications/factory.py
def create_notifiers_from_config(
    notification_configs: list[NotificationConfig],
) -> list[Notifier]:
    """Instantiate one notifier per supported entry in the job's config.

    Args:
        notification_configs: List of NotificationConfig from job config.

    Returns:
        Notifiers built through each backend's ``from_config``, in config
        order. Entries whose ``type`` is not "desktop", "slack" or "webhook"
        are logged and skipped.
    """
    from marianne.notifications.desktop import DesktopNotifier
    from marianne.notifications.slack import SlackNotifier
    from marianne.notifications.webhook import WebhookNotifier

    # Maps the config ``type`` string to the backend class that handles it.
    backends = {
        "desktop": DesktopNotifier,
        "slack": SlackNotifier,
        "webhook": WebhookNotifier,
    }

    created: list[Notifier] = []

    for entry in notification_configs:
        # Copy the event names into a plain list of strings for from_config.
        event_names: list[str] = list(entry.on_events)

        backend = backends.get(entry.type)
        if backend is None:
            _logger.warning("unknown_notification_type", type=entry.type)
            continue

        notifier: Notifier | None = backend.from_config(
            on_events=event_names,
            config=entry.config,
        )
        if notifier:
            created.append(notifier)

    return created