base
Notification framework base types and protocols.
Provides the core notification infrastructure for Marianne:
- NotificationEvent enum for event types
- Notifier protocol for notification backends
- NotificationManager for coordinating multiple notifiers
Phase 5 of Marianne implementation: Missing README features.
Classes
NotificationEvent
Bases: Enum
Events that can trigger notifications.
These events align with the lifecycle of Marianne job execution and are referenced in NotificationConfig.on_events.
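To make the relationship between configured event names and enum members concrete, here is a minimal sketch. Only `JOB_COMPLETE` and `JOB_FAILED` appear in this page's usage example; the other member names, all string values, and the `parse_events` helper are assumptions for illustration, not the actual definition in `base.py`.

```python
from enum import Enum


class NotificationEvent(Enum):
    """Stand-in enum; member values here are assumptions."""

    JOB_COMPLETE = "job_complete"      # name taken from the usage example
    JOB_FAILED = "job_failed"          # name taken from the usage example
    JOB_START = "job_start"            # hypothetical member
    SHEET_COMPLETE = "sheet_complete"  # hypothetical member
    RATE_LIMIT = "rate_limit"          # hypothetical member


def parse_events(names):
    """Hypothetical helper: map configured strings to enum members.

    Enum lookup by value raises ValueError for unknown names.
    """
    return {NotificationEvent(name) for name in names}


# Something like an on_events list could be converted this way.
subscribed = parse_events(["job_complete", "job_failed"])
print(sorted(event.name for event in subscribed))
```

Looking members up by value means a typo in configuration surfaces as a `ValueError` rather than a silently ignored subscription.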
NotificationContext
dataclass
NotificationContext(event, job_id, job_name, timestamp=utc_now(), sheet_num=None, total_sheets=None, success_count=None, failure_count=None, error_message=None, duration_seconds=None, extra=dict())
Context provided to notifiers when sending notifications.
Contains all relevant information about the event that triggered the notification, enabling rich notification messages.
Attributes
timestamp
class-attribute
instance-attribute
timestamp = field(default_factory=utc_now)
When the event occurred.
sheet_num
class-attribute
instance-attribute
Sheet number (for sheet-level events).
total_sheets
class-attribute
instance-attribute
Total number of sheets in the job.
success_count
class-attribute
instance-attribute
Number of successful validations/sheets.
failure_count
class-attribute
instance-attribute
Number of failed validations/sheets.
error_message
class-attribute
instance-attribute
Error message (for failure events).
duration_seconds
class-attribute
instance-attribute
Duration of the operation in seconds.
extra
class-attribute
instance-attribute
Additional context-specific data.
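The constructor signature above suggests a dataclass whose `timestamp` and `extra` fields use default factories. The sketch below reconstructs that shape under stated assumptions: the field types are inferred from the parameter descriptions elsewhere on this page, `utc_now` is re-implemented as a stand-in, and the enum is trimmed to a single member with an assumed value.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional


class NotificationEvent(Enum):
    JOB_FAILED = "job_failed"  # value is an assumption


def utc_now() -> datetime:
    """Stand-in for the utc_now helper named in the signature."""
    return datetime.now(timezone.utc)


@dataclass
class NotificationContext:
    # Field types are assumptions; only names and defaults come from the signature.
    event: NotificationEvent
    job_id: str
    job_name: str
    timestamp: datetime = field(default_factory=utc_now)
    sheet_num: Optional[int] = None
    total_sheets: Optional[int] = None
    success_count: Optional[int] = None
    failure_count: Optional[int] = None
    error_message: Optional[str] = None
    duration_seconds: Optional[float] = None
    extra: dict[str, Any] = field(default_factory=dict)


first = NotificationContext(NotificationEvent.JOB_FAILED, "123", "my-job")
second = NotificationContext(
    NotificationEvent.JOB_FAILED, "124", "other-job", error_message="boom"
)

# Each instance gets its own dict, so mutating one does not leak into another.
first.extra["retry"] = 1
```

Using `default_factory` rather than a literal default is what keeps `extra` and `timestamp` from being shared across instances.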
Functions
format_title
Generate a notification title based on the event.
Returns:
| Type | Description |
|---|---|
| str | A concise title string suitable for notification headers. |
Source code in src/marianne/notifications/base.py
format_message
Generate a notification message body based on context.
Returns:
| Type | Description |
|---|---|
| str | A descriptive message string with relevant details. |
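The exact wording these two methods produce is not shown on this page. As a rough sketch of the idea, the following stand-in builds a title from the event name and a message from whichever optional fields are set. The `Ctx` class, its `event_name` field, and both output formats are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Ctx:
    """Trimmed, hypothetical stand-in for NotificationContext."""

    event_name: str
    job_name: str
    sheet_num: Optional[int] = None
    total_sheets: Optional[int] = None
    error_message: Optional[str] = None
    duration_seconds: Optional[float] = None

    def format_title(self) -> str:
        # Assumed wording: "<Event Name>: <job name>"
        return f"{self.event_name.replace('_', ' ').title()}: {self.job_name}"

    def format_message(self) -> str:
        # Mention only the fields that were actually provided.
        parts = [f"Job '{self.job_name}'"]
        if self.sheet_num is not None and self.total_sheets is not None:
            parts.append(f"sheet {self.sheet_num}/{self.total_sheets}")
        if self.duration_seconds is not None:
            parts.append(f"took {self.duration_seconds:.1f}s")
        if self.error_message:
            parts.append(f"error: {self.error_message}")
        return ", ".join(parts)


ctx = Ctx("job_failed", "my-job", sheet_num=3, total_sheets=10, error_message="timeout")
print(ctx.format_title())
print(ctx.format_message())
```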
Notifier
Bases: Protocol
Protocol for notification backends.
Implementations handle sending notifications through specific channels (desktop, Slack, webhook, etc.). Each notifier:
- Registers for specific event types
- Receives a NotificationContext when events occur
- Handles delivery asynchronously
This follows Marianne's Protocol pattern (like OutcomeStore and EscalationHandler).
Attributes
subscribed_events
property
Events this notifier is registered to receive.
Returns:
| Type | Description |
|---|---|
| set[NotificationEvent] | Set of NotificationEvent types this notifier handles. |
Functions
send
async
Send a notification for the given context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | NotificationContext | Full notification context with event details. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if notification was sent successfully, False otherwise. Failures should be logged but not raise exceptions. |
close
async
Clean up any resources held by the notifier.
Called when the NotificationManager is shutting down. Implementations should release connections, close files, etc.
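Because Notifier is a Protocol, a backend only needs matching members; no inheritance is required. The sketch below shows one way a backend could satisfy it. The protocol is re-declared locally as a stand-in, `@runtime_checkable` is an assumption made so the example can use `isinstance`, and `ListNotifier` is a hypothetical in-memory backend.

```python
import asyncio
import logging
from enum import Enum
from typing import Protocol, runtime_checkable

logger = logging.getLogger("notifier-sketch")


class NotificationEvent(Enum):
    JOB_COMPLETE = "job_complete"  # values are assumptions
    JOB_FAILED = "job_failed"


@runtime_checkable  # assumption: added here only to allow the isinstance check
class Notifier(Protocol):
    @property
    def subscribed_events(self) -> set[NotificationEvent]: ...

    async def send(self, context: object) -> bool: ...

    async def close(self) -> None: ...


class ListNotifier:
    """Hypothetical backend that appends contexts to an in-memory list."""

    def __init__(self, events):
        self._events = set(events)
        self.delivered = []
        self.closed = False

    @property
    def subscribed_events(self):
        return self._events

    async def send(self, context) -> bool:
        # Follow the documented contract: log failures, return False, never raise.
        try:
            if self.closed:
                raise RuntimeError("notifier already closed")
            self.delivered.append(context)
            return True
        except Exception:
            logger.exception("delivery failed")
            return False

    async def close(self) -> None:
        self.closed = True


notifier = ListNotifier({NotificationEvent.JOB_FAILED})
print(isinstance(notifier, Notifier))
```

Note that a runtime `isinstance` check against a protocol only verifies that the members exist, not that their signatures match.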
NotificationManager
Coordinates multiple notifiers for Marianne job events.
Central hub for notification delivery:
- Maintains list of active notifiers
- Routes events to appropriate notifiers based on subscriptions
- Handles failures gracefully (log but don't interrupt execution)
Example usage:

    manager = NotificationManager([
        DesktopNotifier(events={NotificationEvent.JOB_COMPLETE}),
        SlackNotifier(webhook_url=..., events={NotificationEvent.JOB_FAILED}),
    ])

    await manager.notify(NotificationContext(
        event=NotificationEvent.JOB_COMPLETE,
        job_id="123",
        job_name="my-job",
    ))

Initialize the notification manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| notifiers | list[Notifier] \| None | List of Notifier implementations to use. If None, starts with an empty list. | None |
Functions
add_notifier
Add a notifier to the manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| notifier | Notifier | Notifier implementation to add. | required |
remove_notifier
Remove a notifier from the manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| notifier | Notifier | Notifier to remove. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If notifier is not registered. |
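The registration behavior described above, including the ValueError on removing an unknown notifier, falls out naturally if the manager keeps its notifiers in a plain list. That storage choice is an assumption; `NotifierRegistry` below is a hypothetical stand-in that illustrates it.

```python
class NotifierRegistry:
    """Hypothetical stand-in for the manager's bookkeeping."""

    def __init__(self, notifiers=None):
        # None means "start empty"; copying avoids aliasing the caller's list.
        self._notifiers = list(notifiers) if notifiers is not None else []

    def add_notifier(self, notifier):
        self._notifiers.append(notifier)

    def remove_notifier(self, notifier):
        # list.remove raises ValueError when the item is absent,
        # which matches the documented behavior.
        self._notifiers.remove(notifier)

    def __len__(self):
        return len(self._notifiers)


registry = NotifierRegistry()
backend = object()  # any object works for this bookkeeping sketch
registry.add_notifier(backend)
registry.remove_notifier(backend)
```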
notify
async
Send notification to all subscribed notifiers.
Iterates through registered notifiers and sends to those that are subscribed to the event type. Failures are logged but don't interrupt other notifications.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | NotificationContext | Notification context with event details. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Dict mapping notifier class name to success status. Only includes notifiers that were subscribed to this event. |
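The routing described above can be sketched as a loop that filters by subscription, isolates failures, and records results by class name. This is an illustration under assumptions, not the actual implementation: events are plain strings, notifiers are called one at a time (the real manager may dispatch concurrently), and the three notifier classes are hypothetical.

```python
import asyncio
import logging
from dataclasses import dataclass

logger = logging.getLogger("manager-sketch")


@dataclass
class Context:
    event: str  # plain string standing in for NotificationEvent


class OkNotifier:
    subscribed_events = {"job_complete"}

    async def send(self, context):
        return True


class BrokenNotifier:
    subscribed_events = {"job_complete"}

    async def send(self, context):
        raise ConnectionError("unreachable")  # misbehaving backend


class FailureOnlyNotifier:
    subscribed_events = {"job_failed"}

    async def send(self, context):
        return True


async def notify(notifiers, context) -> dict[str, bool]:
    results: dict[str, bool] = {}
    for notifier in notifiers:
        if context.event not in notifier.subscribed_events:
            continue  # unsubscribed notifiers do not appear in the result
        name = type(notifier).__name__
        try:
            results[name] = await notifier.send(context)
        except Exception:
            # A failing backend must not stop the remaining notifiers.
            logger.exception("%s raised during send", name)
            results[name] = False
    return results


outcome = asyncio.run(
    notify([OkNotifier(), BrokenNotifier(), FailureOnlyNotifier()], Context("job_complete"))
)
```

Because results are keyed by class name, two notifiers of the same class would share one key in this sketch; how the real manager handles that case is not stated on this page.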
notify_job_start
async
Convenience method for job start notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Unique job identifier. | required |
| job_name | str | Human-readable job name. | required |
| total_sheets | int | Total number of sheets to process. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Dict of notifier results. |
notify_job_complete
async
Convenience method for job completion notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Unique job identifier. | required |
| job_name | str | Human-readable job name. | required |
| success_count | int | Number of successful sheets. | required |
| failure_count | int | Number of failed sheets. | required |
| duration_seconds | float | Total job duration. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Dict of notifier results. |
notify_job_failed
async
Convenience method for job failure notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Unique job identifier. | required |
| job_name | str | Human-readable job name. | required |
| error_message | str | Error that caused the failure. | required |
| sheet_num | int \| None | Sheet number where failure occurred (optional). | None |
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Dict of notifier results. |
notify_sheet_complete
async
Convenience method for sheet completion notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Unique job identifier. | required |
| job_name | str | Human-readable job name. | required |
| sheet_num | int | Completed sheet number. | required |
| total_sheets | int | Total number of sheets. | required |
| success_count | int | Validations passed. | required |
| failure_count | int | Validations failed. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Dict of notifier results. |
notify_rate_limit
async
Convenience method for rate limit notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Unique job identifier. | required |
| job_name | str | Human-readable job name. | required |
| sheet_num | int | Sheet that hit rate limit. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, bool] | Dict of notifier results. |
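The convenience methods above all take a handful of job fields and return the same result dict as notify, which suggests they build a context and delegate. The sketch below illustrates that pattern for one method only. `ManagerSketch` is hypothetical, and a plain dict with an assumed `"job_failed"` event string stands in for NotificationContext.

```python
import asyncio


class ManagerSketch:
    """Hypothetical manager that records the contexts it is asked to send."""

    def __init__(self):
        self.seen = []

    async def notify(self, context):
        self.seen.append(context)
        return {}  # no notifiers are registered in this sketch

    async def notify_job_failed(self, job_id, job_name, error_message, sheet_num=None):
        # Assemble the context once, then reuse the general notify path.
        context = {
            "event": "job_failed",
            "job_id": job_id,
            "job_name": job_name,
            "error_message": error_message,
            "sheet_num": sheet_num,
        }
        return await self.notify(context)


manager = ManagerSketch()
results = asyncio.run(
    manager.notify_job_failed("123", "my-job", "backend timed out", sheet_num=4)
)
```

Keeping the wrappers this thin means subscription filtering and failure handling live in one place.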
close
async
Close all registered notifiers.
Should be called when the job completes or the manager is no longer needed. Ignores individual notifier errors.
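A minimal sketch of shutdown that ignores individual notifier errors might look like the following. Logging the swallowed error at warning level is an assumption, as are the two notifier classes and the `close_all` function name.

```python
import asyncio
import logging

logger = logging.getLogger("close-sketch")


class GoodNotifier:
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


class FailingNotifier:
    async def close(self):
        raise OSError("socket already gone")


async def close_all(notifiers):
    for notifier in notifiers:
        try:
            await notifier.close()
        except Exception:
            # Swallow the error so the remaining notifiers still get closed.
            logger.warning("error closing %s", type(notifier).__name__, exc_info=True)


good = GoodNotifier()
asyncio.run(close_all([FailingNotifier(), good]))
```

In calling code, running the close step inside a `try`/`finally` around the job would ensure it happens even when the job itself fails.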