Index
notifications
¶
Marianne notification framework.
Provides notification infrastructure for Marianne job events: - Multiple notification backends (desktop, Slack, webhook) - Event-based subscription model - Graceful degradation when backends unavailable
Usage
from marianne.notifications import ( NotificationEvent, NotificationContext, NotificationManager, DesktopNotifier, SlackNotifier, WebhookNotifier, )
manager = NotificationManager([ DesktopNotifier(events={NotificationEvent.JOB_COMPLETE}), SlackNotifier(webhook_url="...", events={NotificationEvent.JOB_FAILED}), WebhookNotifier(url="...", events={NotificationEvent.JOB_COMPLETE}), ])
await manager.notify_job_complete( job_id="123", job_name="my-job", success_count=10, failure_count=0, duration_seconds=120.5, )
Classes¶
NotificationContext
dataclass
¶
NotificationContext(event, job_id, job_name, timestamp=utc_now(), sheet_num=None, total_sheets=None, success_count=None, failure_count=None, error_message=None, duration_seconds=None, extra=dict())
Context provided to notifiers when sending notifications.
Contains all relevant information about the event that triggered the notification, enabling rich notification messages.
Attributes¶
timestamp
class-attribute
instance-attribute
¶
timestamp = field(default_factory=utc_now)
When the event occurred.
sheet_num
class-attribute
instance-attribute
¶
Sheet number (for sheet-level events).
total_sheets
class-attribute
instance-attribute
¶
Total number of sheets in the job.
success_count
class-attribute
instance-attribute
¶
Number of successful validations/sheets.
failure_count
class-attribute
instance-attribute
¶
Number of failed validations/sheets.
error_message
class-attribute
instance-attribute
¶
Error message (for failure events).
duration_seconds
class-attribute
instance-attribute
¶
Duration of the operation in seconds.
extra
class-attribute
instance-attribute
¶
Additional context-specific data.
Functions¶
format_title
¶
Generate a notification title based on the event.
Returns:
| Type | Description |
|---|---|
str
|
A concise title string suitable for notification headers. |
Source code in src/marianne/notifications/base.py
format_message
¶
Generate a notification message body based on context.
Returns:
| Type | Description |
|---|---|
str
|
A descriptive message string with relevant details. |
Source code in src/marianne/notifications/base.py
NotificationEvent
¶
Bases: Enum
Events that can trigger notifications.
These events align with the lifecycle of Marianne job execution and are referenced in NotificationConfig.on_events.
NotificationManager
¶
Coordinates multiple notifiers for Marianne job events.
Central hub for notification delivery: - Maintains list of active notifiers - Routes events to appropriate notifiers based on subscriptions - Handles failures gracefully (log but don't interrupt execution)
Example usage
manager = NotificationManager([ DesktopNotifier(events={NotificationEvent.JOB_COMPLETE}), SlackNotifier(webhook_url=..., events={NotificationEvent.JOB_FAILED}), ])
await manager.notify(NotificationContext( event=NotificationEvent.JOB_COMPLETE, job_id="123", job_name="my-job", ))
Initialize the notification manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
notifiers
|
list[Notifier] | None
|
List of Notifier implementations to use. If None, starts with an empty list. |
None
|
Source code in src/marianne/notifications/base.py
Attributes¶
Functions¶
add_notifier
¶
Add a notifier to the manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
notifier
|
Notifier
|
Notifier implementation to add. |
required |
remove_notifier
¶
Remove a notifier from the manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
notifier
|
Notifier
|
Notifier to remove. |
required |
Raises:
| Type | Description |
|---|---|
ValueError
|
If notifier is not registered. |
Source code in src/marianne/notifications/base.py
notify
async
¶
Send notification to all subscribed notifiers.
Iterates through registered notifiers and sends to those that are subscribed to the event type. Failures are logged but don't interrupt other notifications.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
NotificationContext
|
Notification context with event details. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, bool]
|
Dict mapping notifier class name to success status. |
dict[str, bool]
|
Only includes notifiers that were subscribed to this event. |
Source code in src/marianne/notifications/base.py
notify_job_start
async
¶
Convenience method for job start notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
Unique job identifier. |
required |
job_name
|
str
|
Human-readable job name. |
required |
total_sheets
|
int
|
Total number of sheets to process. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, bool]
|
Dict of notifier results. |
Source code in src/marianne/notifications/base.py
notify_job_complete
async
¶
Convenience method for job completion notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
Unique job identifier. |
required |
job_name
|
str
|
Human-readable job name. |
required |
success_count
|
int
|
Number of successful sheets. |
required |
failure_count
|
int
|
Number of failed sheets. |
required |
duration_seconds
|
float
|
Total job duration. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, bool]
|
Dict of notifier results. |
Source code in src/marianne/notifications/base.py
notify_job_failed
async
¶
Convenience method for job failure notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
Unique job identifier. |
required |
job_name
|
str
|
Human-readable job name. |
required |
error_message
|
str
|
Error that caused the failure. |
required |
sheet_num
|
int | None
|
Sheet number where failure occurred (optional). |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, bool]
|
Dict of notifier results. |
Source code in src/marianne/notifications/base.py
notify_sheet_complete
async
¶
Convenience method for sheet completion notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
Unique job identifier. |
required |
job_name
|
str
|
Human-readable job name. |
required |
sheet_num
|
int
|
Completed sheet number. |
required |
total_sheets
|
int
|
Total number of sheets. |
required |
success_count
|
int
|
Validations passed. |
required |
failure_count
|
int
|
Validations failed. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, bool]
|
Dict of notifier results. |
Source code in src/marianne/notifications/base.py
notify_rate_limit
async
¶
Convenience method for rate limit notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
Unique job identifier. |
required |
job_name
|
str
|
Human-readable job name. |
required |
sheet_num
|
int
|
Sheet that hit rate limit. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, bool]
|
Dict of notifier results. |
Source code in src/marianne/notifications/base.py
close
async
¶
Close all registered notifiers.
Should be called when the job completes or the manager is no longer needed. Ignores individual notifier errors.
Source code in src/marianne/notifications/base.py
Notifier
¶
Bases: Protocol
Protocol for notification backends.
Implementations handle sending notifications through specific channels (desktop, Slack, webhook, etc.). Each notifier: - Registers for specific event types - Receives NotificationContext when events occur - Handles delivery asynchronously
Following Marianne's Protocol pattern (like OutcomeStore, EscalationHandler).
Attributes¶
subscribed_events
property
¶
Events this notifier is registered to receive.
Returns:
| Type | Description |
|---|---|
set[NotificationEvent]
|
Set of NotificationEvent types this notifier handles. |
Functions¶
send
async
¶
Send a notification for the given context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
NotificationContext
|
Full notification context with event details. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if notification was sent successfully, False otherwise. |
bool
|
Failures should be logged but not raise exceptions. |
Source code in src/marianne/notifications/base.py
close
async
¶
Clean up any resources held by the notifier.
Called when the NotificationManager is shutting down. Implementations should release connections, close files, etc.
DesktopNotifier
¶
Desktop notification implementation using plyer.
Provides cross-platform desktop notifications on Windows, macOS, and Linux. Gracefully degrades if plyer is not installed - logs warning but doesn't fail.
Example usage
notifier = DesktopNotifier( events={NotificationEvent.JOB_COMPLETE, NotificationEvent.JOB_FAILED}, app_name="Marianne", ) await notifier.send(context)
Configuration from YAML
notifications: - type: desktop on_events: [job_complete, job_failed] config: timeout: 10 app_name: "Marianne AI"
Initialize the desktop notifier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
events
|
set[NotificationEvent] | None
|
Set of events to subscribe to. Defaults to job-level events. |
None
|
app_name
|
str
|
Application name shown in notifications. |
'Marianne AI Compose'
|
timeout
|
int
|
Notification display timeout in seconds (platform-dependent). |
10
|
Source code in src/marianne/notifications/desktop.py
Attributes¶
subscribed_events
property
¶
Events this notifier is registered to receive.
Returns:
| Type | Description |
|---|---|
set[NotificationEvent]
|
Set of subscribed NotificationEvent types. |
Functions¶
from_config
classmethod
¶
Create DesktopNotifier from YAML configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_events
|
list[str]
|
List of event name strings from config. |
required |
config
|
dict[str, Any] | None
|
Optional dict with 'app_name' and 'timeout'. |
None
|
Returns:
| Type | Description |
|---|---|
DesktopNotifier
|
Configured DesktopNotifier instance. |
Example
notifier = DesktopNotifier.from_config( on_events=["job_complete", "job_failed"], config={"timeout": 5, "app_name": "My App"}, )
Source code in src/marianne/notifications/desktop.py
send
async
¶
Send a desktop notification.
Uses plyer for cross-platform notification support. Fails gracefully if plyer is unavailable or notification fails.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
NotificationContext
|
Notification context with event details. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if notification was sent, False if unavailable or failed. |
Source code in src/marianne/notifications/desktop.py
close
async
¶
Clean up resources.
Desktop notifier has no resources to clean up, but implements the protocol method.
MockDesktopNotifier
¶
Mock desktop notifier for testing.
Records all notifications sent without actually displaying them. Useful for testing notification integration.
Initialize mock notifier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
events
|
set[NotificationEvent] | None
|
Set of events to subscribe to. |
None
|
Source code in src/marianne/notifications/desktop.py
Attributes¶
Functions¶
set_fail_next
¶
Configure the next send() call to fail.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
should_fail
|
bool
|
If True, next send() returns False. |
True
|
send
async
¶
Record notification without displaying.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
NotificationContext
|
Notification context. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True unless set_fail_next was called. |
Source code in src/marianne/notifications/desktop.py
close
async
¶
get_notification_count
¶
get_notifications_for_event
¶
Get all notifications for a specific event type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
NotificationEvent
|
Event type to filter by. |
required |
Returns:
| Type | Description |
|---|---|
list[NotificationContext]
|
List of matching notification contexts. |
Source code in src/marianne/notifications/desktop.py
MockSlackNotifier
¶
Mock Slack notifier for testing.
Records all notifications sent without making HTTP calls. Useful for testing notification integration.
Initialize mock notifier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
events
|
set[NotificationEvent] | None
|
Set of events to subscribe to. |
None
|
Source code in src/marianne/notifications/slack.py
Attributes¶
Functions¶
set_fail_next
¶
Configure the next send() call to fail.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
should_fail
|
bool
|
If True, next send() returns False. |
True
|
send
async
¶
Record notification without making HTTP call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
NotificationContext
|
Notification context. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True unless set_fail_next was called. |
Source code in src/marianne/notifications/slack.py
close
async
¶
get_notification_count
¶
get_notifications_for_event
¶
Get all notifications for a specific event type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
NotificationEvent
|
Event type to filter by. |
required |
Returns:
| Type | Description |
|---|---|
list[NotificationContext]
|
List of matching notification contexts. |
Source code in src/marianne/notifications/slack.py
SlackNotifier
¶
SlackNotifier(webhook_url=None, webhook_url_env='SLACK_WEBHOOK_URL', channel=None, username='Marianne AI Compose', icon_emoji=':musical_score:', events=None, timeout=10.0)
Slack notification implementation using webhooks.
Sends notifications to Slack channels via incoming webhooks. Supports rich formatting with Slack Block Kit attachments.
Example usage
notifier = SlackNotifier( webhook_url="https://hooks.slack.com/services/...", channel="#alerts", events={NotificationEvent.JOB_COMPLETE, NotificationEvent.JOB_FAILED}, ) await notifier.send(context)
Configuration from YAML
notifications: - type: slack on_events: [job_complete, job_failed, sheet_failed] config: webhook_url_env: SLACK_WEBHOOK_URL channel: "#marianne-alerts" username: "Marianne Bot" timeout: 10
Initialize the Slack notifier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
webhook_url
|
str | None
|
Direct webhook URL. If not provided, reads from env var. |
None
|
webhook_url_env
|
str
|
Environment variable containing webhook URL. |
'SLACK_WEBHOOK_URL'
|
channel
|
str | None
|
Override channel (optional, uses webhook default if not set). |
None
|
username
|
str
|
Bot username displayed in Slack. |
'Marianne AI Compose'
|
icon_emoji
|
str
|
Emoji for bot avatar. |
':musical_score:'
|
events
|
set[NotificationEvent] | None
|
Set of events to subscribe to. Defaults to job-level events. |
None
|
timeout
|
float
|
HTTP request timeout in seconds. |
10.0
|
Source code in src/marianne/notifications/slack.py
Attributes¶
Functions¶
from_config
classmethod
¶
Create SlackNotifier from YAML configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_events
|
list[str]
|
List of event name strings from config. |
required |
config
|
dict[str, Any] | None
|
Optional dict with Slack-specific settings: - webhook_url: Direct webhook URL - webhook_url_env: Env var for webhook URL (default: SLACK_WEBHOOK_URL) - channel: Override channel - username: Bot username - icon_emoji: Bot avatar emoji - timeout: Request timeout in seconds |
None
|
Returns:
| Type | Description |
|---|---|
SlackNotifier
|
Configured SlackNotifier instance. |
Example
notifier = SlackNotifier.from_config( on_events=["job_complete", "job_failed"], config={ "webhook_url_env": "MY_SLACK_WEBHOOK", "channel": "#alerts", }, )
Source code in src/marianne/notifications/slack.py
send
async
¶
Send a Slack notification.
Posts to the configured Slack webhook with rich formatting. Fails gracefully if webhook is unavailable or request fails.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
NotificationContext
|
Notification context with event details. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if notification was sent, False if unavailable or failed. |
Source code in src/marianne/notifications/slack.py
close
async
¶
Clean up HTTP client resources.
Called when the NotificationManager is shutting down.
Source code in src/marianne/notifications/slack.py
MockWebhookNotifier
¶
Mock webhook notifier for testing.
Records all notifications sent without making HTTP calls. Useful for testing notification integration.
Initialize mock notifier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
events
|
set[NotificationEvent] | None
|
Set of events to subscribe to. |
None
|
Source code in src/marianne/notifications/webhook.py
Attributes¶
Functions¶
set_fail_next
¶
Configure the next send() call to fail.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
should_fail
|
bool
|
If True, next send() returns False. |
True
|
simulate_status_code
¶
Simulate a specific HTTP status code response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
code
|
int
|
HTTP status code to simulate. |
required |
send
async
¶
Record notification without making HTTP call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
NotificationContext
|
Notification context. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True unless set_fail_next was called or simulated error status. |
Source code in src/marianne/notifications/webhook.py
close
async
¶
get_notification_count
¶
get_notifications_for_event
¶
Get all notifications for a specific event type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
NotificationEvent
|
Event type to filter by. |
required |
Returns:
| Type | Description |
|---|---|
list[NotificationContext]
|
List of matching notification contexts. |
Source code in src/marianne/notifications/webhook.py
WebhookNotifier
¶
WebhookNotifier(url=None, url_env=None, headers=None, events=None, timeout=30.0, max_retries=2, retry_delay=1.0, include_metadata=True)
Generic HTTP webhook notification implementation.
Posts JSON notifications to configurable HTTP endpoints. Supports custom headers (for auth tokens), retries, and timeouts.
Example usage
notifier = WebhookNotifier( url="https://example.com/webhooks/marianne", headers={"Authorization": "Bearer token123"}, events={NotificationEvent.JOB_COMPLETE, NotificationEvent.JOB_FAILED}, ) await notifier.send(context)
Configuration from YAML
notifications: - type: webhook on_events: [job_complete, job_failed] config: url: https://example.com/webhook url_env: MZT_WEBHOOK_URL # Alternative to url headers: Authorization: "Bearer ${WEBHOOK_TOKEN}" X-Custom-Header: "value" timeout: 30 max_retries: 2 retry_delay: 1.0
Initialize the webhook notifier.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str | None
|
Direct webhook URL. |
None
|
url_env
|
str | None
|
Environment variable containing webhook URL. |
None
|
headers
|
dict[str, str] | None
|
HTTP headers to include in requests. |
None
|
events
|
set[NotificationEvent] | None
|
Set of events to subscribe to. Defaults to job-level events. |
None
|
timeout
|
float
|
HTTP request timeout in seconds. |
30.0
|
max_retries
|
int
|
Maximum retry attempts on failure (0 = no retries). |
2
|
retry_delay
|
float
|
Delay between retries in seconds. |
1.0
|
include_metadata
|
bool
|
Include Marianne metadata in payload (version, source). |
True
|
Source code in src/marianne/notifications/webhook.py
Attributes¶
subscribed_events
property
¶
Events this notifier is registered to receive.
Returns:
| Type | Description |
|---|---|
set[NotificationEvent]
|
Set of subscribed NotificationEvent types. |
Functions¶
from_config
classmethod
¶
Create WebhookNotifier from YAML configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_events
|
list[str]
|
List of event name strings from config. |
required |
config
|
dict[str, Any] | None
|
Optional dict with webhook-specific settings: - url: Direct webhook URL - url_env: Env var for webhook URL - headers: Dict of HTTP headers - timeout: Request timeout in seconds - max_retries: Retry attempts on failure - retry_delay: Delay between retries - include_metadata: Include Marianne metadata |
None
|
Returns:
| Type | Description |
|---|---|
WebhookNotifier
|
Configured WebhookNotifier instance. |
Example
notifier = WebhookNotifier.from_config( on_events=["job_complete", "job_failed"], config={ "url": "https://example.com/webhook", "headers": {"X-API-Key": "secret"}, }, )
Source code in src/marianne/notifications/webhook.py
send
async
¶
Send a webhook notification.
Posts JSON payload to the configured URL. Implements retry logic for transient failures.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
NotificationContext
|
Notification context with event details. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if notification was sent, False if unavailable or failed. |
Source code in src/marianne/notifications/webhook.py
close
async
¶
Clean up HTTP client resources.
Called when the NotificationManager is shutting down.
Source code in src/marianne/notifications/webhook.py
Functions¶
is_desktop_notification_available
¶
Check if desktop notifications are available.
Returns:
| Type | Description |
|---|---|
bool
|
True if plyer is installed and can send notifications. |
create_notifiers_from_config
¶
Create Notifier instances from notification configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
notification_configs
|
list[NotificationConfig]
|
List of NotificationConfig from job config. |
required |
Returns:
| Type | Description |
|---|---|
list[Notifier]
|
List of configured Notifier instances. |