Skip to content

webhook

webhook

Generic webhook notification implementation using httpx.

Provides configurable HTTP webhook notifications for Marianne job events. Supports custom headers, retries, and flexible JSON payloads.

Phase 5 of Marianne implementation: Missing README features.

Classes

WebhookNotifier

WebhookNotifier(url=None, url_env=None, headers=None, events=None, timeout=30.0, max_retries=2, retry_delay=1.0, include_metadata=True)

Generic HTTP webhook notification implementation.

Posts JSON notifications to configurable HTTP endpoints. Supports custom headers (for auth tokens), retries, and timeouts.

Example usage

notifier = WebhookNotifier(
    url="https://example.com/webhooks/marianne",
    headers={"Authorization": "Bearer token123"},
    events={NotificationEvent.JOB_COMPLETE, NotificationEvent.JOB_FAILED},
)
await notifier.send(context)

Configuration from YAML

notifications:
  - type: webhook
    on_events: [job_complete, job_failed]
    config:
      url: https://example.com/webhook
      url_env: MZT_WEBHOOK_URL  # Alternative to url
      headers:
        Authorization: "Bearer ${WEBHOOK_TOKEN}"
        X-Custom-Header: "value"
      timeout: 30
      max_retries: 2
      retry_delay: 1.0

Initialize the webhook notifier.

Parameters:

Name Type Description Default
url str | None

Direct webhook URL.

None
url_env str | None

Environment variable containing webhook URL.

None
headers dict[str, str] | None

HTTP headers to include in requests.

None
events set[NotificationEvent] | None

Set of events to subscribe to. Defaults to job-level events.

None
timeout float

HTTP request timeout in seconds.

30.0
max_retries int

Maximum retry attempts on failure (0 = no retries).

2
retry_delay float

Delay between retries in seconds.

1.0
include_metadata bool

Include Marianne metadata in payload (version, source).

True
Source code in src/marianne/notifications/webhook.py
def __init__(
    self,
    url: str | None = None,
    url_env: str | None = None,
    headers: dict[str, str] | None = None,
    events: set[NotificationEvent] | None = None,
    timeout: float = 30.0,
    max_retries: int = 2,
    retry_delay: float = 1.0,
    include_metadata: bool = True,
) -> None:
    """Set up a webhook notifier from explicit settings.

    Args:
        url: Webhook endpoint. Takes precedence over ``url_env`` when it is
            a non-empty string.
        url_env: Name of an environment variable to read the endpoint from
            when ``url`` is missing or empty. An unset variable yields an
            empty URL.
        headers: Extra HTTP headers for each request; they are passed
            through ``_expand_env_headers`` before being stored.
        events: Events this notifier reacts to. ``None`` or an empty set
            selects ``JOB_COMPLETE`` and ``JOB_FAILED``.
        timeout: HTTP request timeout in seconds.
        max_retries: Maximum retry attempts on failure (0 = no retries).
        retry_delay: Delay between retries in seconds.
        include_metadata: Include Marianne metadata in payload
            (version, source).
    """
    # The explicit URL wins; the environment is only a fallback.
    resolved_url = url
    if url_env and not resolved_url:
        resolved_url = os.environ.get(url_env, "")
    self._url = resolved_url

    # Header values may reference environment variables; resolve them once.
    raw_headers = headers or {}
    self._headers = self._expand_env_headers(raw_headers)

    # Delivery tuning knobs, stored verbatim.
    self._timeout = timeout
    self._max_retries = max_retries
    self._retry_delay = retry_delay
    self._include_metadata = include_metadata

    # Fall back to the job-level events when no subscription was given.
    if events:
        self._events = events
    else:
        self._events = {
            NotificationEvent.JOB_COMPLETE,
            NotificationEvent.JOB_FAILED,
        }

    # No HTTP client is created here; the slot starts out empty.
    self._client: httpx.AsyncClient | None = None
    # Tracks whether the "no URL configured" warning was already logged.
    self._warned_no_url = False
Attributes
subscribed_events property
subscribed_events

Events this notifier is registered to receive.

Returns:

Type Description
set[NotificationEvent]

Set of subscribed NotificationEvent types.

Functions
from_config classmethod
from_config(on_events, config=None)

Create WebhookNotifier from YAML configuration.

Parameters:

Name Type Description Default
on_events list[str]

List of event name strings from config.

required
config dict[str, Any] | None

Optional dict with webhook-specific settings:
- url: Direct webhook URL
- url_env: Env var for webhook URL
- headers: Dict of HTTP headers
- timeout: Request timeout in seconds
- max_retries: Retry attempts on failure
- retry_delay: Delay between retries
- include_metadata: Include Marianne metadata

None

Returns:

Type Description
WebhookNotifier

Configured WebhookNotifier instance.

Example

notifier = WebhookNotifier.from_config(
    on_events=["job_complete", "job_failed"],
    config={
        "url": "https://example.com/webhook",
        "headers": {"X-API-Key": "secret"},
    },
)

Source code in src/marianne/notifications/webhook.py
@classmethod
def from_config(
    cls,
    on_events: list[str],
    config: dict[str, Any] | None = None,
) -> "WebhookNotifier":
    """Build a WebhookNotifier from parsed YAML settings.

    Event names are matched against ``NotificationEvent`` member names
    after upper-casing. Names that do not match are logged as a warning
    and skipped. If no name matches, the constructor receives
    ``events=None`` and applies its own default subscription.

    Args:
        on_events: Event name strings taken from the configuration.
        config: Optional webhook-specific settings. Recognised keys:
            - url: Direct webhook URL
            - url_env: Env var for webhook URL
            - headers: Dict of HTTP headers
            - timeout: Request timeout in seconds (default 30.0)
            - max_retries: Retry attempts on failure (default 2)
            - retry_delay: Delay between retries (default 1.0)
            - include_metadata: Include Marianne metadata (default True)

    Returns:
        A configured WebhookNotifier instance.

    Example:
        notifier = WebhookNotifier.from_config(
            on_events=["job_complete", "job_failed"],
            config={
                "url": "https://example.com/webhook",
                "headers": {"X-API-Key": "secret"},
            },
        )
    """
    settings = config or {}

    # Resolve each configured name to its enum member, skipping unknowns.
    subscribed: set[NotificationEvent] = set()
    for raw_name in on_events:
        try:
            member = NotificationEvent[raw_name.upper()]
        except KeyError:
            _logger.warning("unknown_notification_event", event_name=raw_name)
        else:
            subscribed.add(member)

    # Settings that carry a default when absent from the config mapping.
    fallbacks: dict[str, Any] = {
        "timeout": 30.0,
        "max_retries": 2,
        "retry_delay": 1.0,
        "include_metadata": True,
    }
    tuned = {key: settings.get(key, default) for key, default in fallbacks.items()}

    return cls(
        url=settings.get("url"),
        url_env=settings.get("url_env"),
        headers=settings.get("headers"),
        # An empty set means "nothing recognised": let __init__ pick defaults.
        events=subscribed or None,
        **tuned,
    )
send async
send(context)

Send a webhook notification.

Posts JSON payload to the configured URL. Implements retry logic for transient failures.

Parameters:

Name Type Description Default
context NotificationContext

Notification context with event details.

required

Returns:

Type Description
bool

True if the notification was sent or the event is not one this notifier is subscribed to; False if no URL is configured or delivery failed.

Source code in src/marianne/notifications/webhook.py
async def send(self, context: NotificationContext) -> bool:
    """Deliver one notification to the configured webhook.

    The payload is built with ``_build_payload`` and posted through
    ``_send_with_retry``. Any exception raised along the way is logged
    and reported as a failed delivery rather than propagated.

    Args:
        context: Notification context with event details.

    Returns:
        True if the notification was delivered, or if ``context.event`` is
        not one of the subscribed events (nothing to do). False if no URL
        is configured, delivery failed, or an unexpected error occurred.
    """
    if not self._url:
        # Log the misconfiguration only the first time it is seen.
        if not self._warned_no_url:
            message = (
                "Webhook URL not configured. "
                "Set url or url_env in webhook notification config."
            )
            _logger.warning(message)
            self._warned_no_url = True
        return False

    # Events outside the subscription are treated as a successful no-op.
    if context.event not in self._events:
        return True

    try:
        http_client = await self._get_client()
        body = self._build_payload(context)
        delivered, failure = await self._send_with_retry(http_client, body)

        if not delivered:
            _logger.warning("webhook_notification_failed", error=failure)
        else:
            _logger.debug("webhook_notification_sent", title=context.format_title())

        return delivered

    except Exception as exc:
        # Best-effort boundary: a notification failure must not escape.
        _logger.warning(
            "webhook_notification_unexpected_error",
            error=str(exc),
            exc_info=True,
        )
        return False
close async
close()

Clean up HTTP client resources.

Called when the NotificationManager is shutting down.

Source code in src/marianne/notifications/webhook.py
async def close(self) -> None:
    """Release the HTTP client held by this notifier.

    Does nothing when no client exists or the client is already closed.
    Otherwise the client is closed and the stored reference is dropped.
    Per the module's documentation this is called when the
    NotificationManager is shutting down.
    """
    client = self._client
    if client is None or client.is_closed:
        return

    await client.aclose()
    self._client = None

MockWebhookNotifier

MockWebhookNotifier(events=None)

Mock webhook notifier for testing.

Records all notifications sent without making HTTP calls. Useful for testing notification integration.

Initialize mock notifier.

Parameters:

Name Type Description Default
events set[NotificationEvent] | None

Set of events to subscribe to.

None
Source code in src/marianne/notifications/webhook.py
def __init__(
    self,
    events: set[NotificationEvent] | None = None,
) -> None:
    """Create a mock notifier with empty recordings.

    Args:
        events: Events to subscribe to. ``None`` or an empty set selects
            ``JOB_COMPLETE`` and ``JOB_FAILED``.
    """
    if events:
        self._events = events
    else:
        self._events = {
            NotificationEvent.JOB_COMPLETE,
            NotificationEvent.JOB_FAILED,
        }

    # Recordings filled in by send(): contexts and their built payloads.
    self.sent_notifications: list[NotificationContext] = []
    self.sent_payloads: list[dict[str, Any]] = []

    # Failure simulation state: starts as "succeed" with HTTP 200.
    self._fail_next = False
    self._simulated_status_code = 200
Attributes
subscribed_events property
subscribed_events

Events this notifier handles.

Functions
set_fail_next
set_fail_next(should_fail=True)

Configure the next send() call to fail.

Parameters:

Name Type Description Default
should_fail bool

If True, next send() returns False.

True
Source code in src/marianne/notifications/webhook.py
def set_fail_next(self, should_fail: bool = True) -> None:
    """Arm or disarm a simulated failure for the next send() call.

    Args:
        should_fail: Value stored in the failure flag. When True, the
            next send() returns False and clears the flag again.
    """
    # Only the flag is stored here; send() consumes and resets it.
    self._fail_next = should_fail
simulate_status_code
simulate_status_code(code)

Simulate a specific HTTP status code response.

Parameters:

Name Type Description Default
code int

HTTP status code to simulate.

required
Source code in src/marianne/notifications/webhook.py
def simulate_status_code(self, code: int) -> None:
    """Choose the HTTP status code the mock pretends to receive.

    Args:
        code: Status code to simulate. send() treats values of 400 and
            above as a failed delivery.
    """
    # Stored as-is; send() reads it and resets it after a simulated error.
    self._simulated_status_code = code
send async
send(context)

Record notification without making HTTP call.

Parameters:

Name Type Description Default
context NotificationContext

Notification context.

required

Returns:

Type Description
bool

True unless set_fail_next was called or simulated error status.

Source code in src/marianne/notifications/webhook.py
async def send(self, context: NotificationContext) -> bool:
    """Record a notification instead of performing an HTTP request.

    Args:
        context: Notification context.

    Returns:
        False when a failure was armed via set_fail_next or when the
        simulated status code is 400 or above (each is reset afterwards);
        True otherwise, after the context and its payload are recorded.
    """
    # A one-shot failure: consume the flag and report failure.
    if self._fail_next:
        self._fail_next = False
        return False

    # A simulated error status also fails once, then reverts to 200.
    if self._simulated_status_code >= 400:
        self._simulated_status_code = 200
        return False

    self.sent_notifications.append(context)

    # Reuse the real notifier's payload builder so recorded payloads have
    # the same shape a real delivery would produce.
    payload = WebhookNotifier(url="https://mock")._build_payload(context)
    self.sent_payloads.append(payload)
    return True
close async
close()

Clear recorded notifications.

Source code in src/marianne/notifications/webhook.py
async def close(self) -> None:
    """Empty both recording lists in place."""
    # Clearing in place keeps any outside references to the lists valid.
    for recording in (self.sent_notifications, self.sent_payloads):
        recording.clear()
get_notification_count
get_notification_count()

Get number of recorded notifications.

Source code in src/marianne/notifications/webhook.py
def get_notification_count(self) -> int:
    """Return how many notifications have been recorded so far."""
    recorded = self.sent_notifications
    return len(recorded)
get_notifications_for_event
get_notifications_for_event(event)

Get all notifications for a specific event type.

Parameters:

Name Type Description Default
event NotificationEvent

Event type to filter by.

required

Returns:

Type Description
list[NotificationContext]

List of matching notification contexts.

Source code in src/marianne/notifications/webhook.py
def get_notifications_for_event(
    self, event: NotificationEvent
) -> list[NotificationContext]:
    """Select the recorded notifications whose event equals ``event``.

    Args:
        event: Event type to filter by.

    Returns:
        A new list of matching notification contexts, in recorded order.
    """
    matches: list[NotificationContext] = []
    for recorded in self.sent_notifications:
        if recorded.event == event:
            matches.append(recorded)
    return matches

Functions