Skip to content

factory

factory

Factory for creating notifiers from configuration.

Moved from marianne.cli.helpers to break the CLI dependency in the daemon package. This is a notification-layer concern, not a CLI concern.

Classes

Functions

create_notifiers_from_config

create_notifiers_from_config(notification_configs)

Create Notifier instances from notification configuration.

Parameters:

Name Type Description Default
notification_configs list[NotificationConfig]

List of NotificationConfig from job config.

required

Returns:

Type Description
list[Notifier]

List of configured Notifier instances.

Source code in src/marianne/notifications/factory.py
def create_notifiers_from_config(
    notification_configs: list[NotificationConfig],
) -> list[Notifier]:
    """Create Notifier instances from notification configuration.

    Args:
        notification_configs: List of NotificationConfig from job config.

    Returns:
        List of configured Notifier instances.
    """
    from marianne.notifications.desktop import DesktopNotifier
    from marianne.notifications.slack import SlackNotifier
    from marianne.notifications.webhook import WebhookNotifier

    notifiers: list[Notifier] = []

    for config in notification_configs:
        notifier: Notifier | None = None
        # Cast Literal list to str list for from_config methods
        events: list[str] = list(config.on_events)

        if config.type == "desktop":
            notifier = DesktopNotifier.from_config(
                on_events=events,
                config=config.config,
            )
        elif config.type == "slack":
            notifier = SlackNotifier.from_config(
                on_events=events,
                config=config.config,
            )
        elif config.type == "webhook":
            notifier = WebhookNotifier.from_config(
                on_events=events,
                config=config.config,
            )
        else:
            _logger.warning("unknown_notification_type", type=config.type)
            continue

        if notifier:
            notifiers.append(notifier)

    return notifiers