lakehouse_engine.terminators.notifier

Module with notification terminator.

  1"""Module with notification terminator."""
  2
  3from abc import ABC, abstractmethod
  4
  5from jinja2 import Template
  6
  7from lakehouse_engine.core.definitions import (
  8    NOTIFICATION_RUNTIME_PARAMETERS,
  9    NotificationRuntimeParameters,
 10    TerminatorSpec,
 11)
 12from lakehouse_engine.core.exec_env import ExecEnv
 13from lakehouse_engine.terminators.notifiers.notification_templates import (
 14    NotificationsTemplates,
 15)
 16from lakehouse_engine.utils.databricks_utils import DatabricksUtils
 17from lakehouse_engine.utils.logging_handler import LoggingHandler
 18
 19
 20class Notifier(ABC):
 21    """Abstract Notification class."""
 22
 23    _logger = LoggingHandler(__name__).get_logger()
 24
 25    def __init__(self, notification_spec: TerminatorSpec):
 26        """Construct Notification instances.
 27
 28        Args:
 29            notification_spec: notification specification.
 30        """
 31        self.type = notification_spec.args.get("type")
 32        self.notification = notification_spec.args
 33
 34    @abstractmethod
 35    def create_notification(self) -> None:
 36        """Abstract create notification method."""
 37        raise NotImplementedError
 38
 39    @abstractmethod
 40    def send_notification(self) -> None:
 41        """Abstract send notification method."""
 42        raise NotImplementedError
 43
 44    @staticmethod
 45    def _check_args_are_correct(given_args: dict, template_args: list) -> None:
 46        """Checking if all arguments in template were set.
 47
 48        Args:
 49            given_args: args set in the terminator spec.
 50            template_args: args needed to be set in the template.
 51        """
 52        extra_args = []
 53
 54        for key in given_args.keys():
 55            if key in template_args:
 56                template_args.remove(key)
 57            else:
 58                extra_args.append(key)
 59
 60        if set(template_args) - set(NOTIFICATION_RUNTIME_PARAMETERS):
 61            raise ValueError(
 62                "The following template args have not been set: "
 63                + ", ".join(template_args)
 64            )
 65
 66        if extra_args:
 67            Notifier._logger.info(
 68                "Extra parameters sent to template: " + ", ".join(extra_args)
 69            )
 70
 71    @staticmethod
 72    def _render_notification_field(template_field: str, args: dict) -> str:
 73        """Render the notification given args.
 74
 75        Args:
 76            template_field: Message with templates to be replaced.
 77            args: key/value pairs to be replaced in the message.
 78
 79        Returns:
 80            Rendered field
 81        """
 82        field_template = Template(template_field)
 83        if (
 84            NotificationRuntimeParameters.DATABRICKS_JOB_NAME.value in template_field
 85            or NotificationRuntimeParameters.DATABRICKS_WORKSPACE_ID.value
 86            in template_field
 87        ):
 88            workspace_id, job_name = DatabricksUtils.get_databricks_job_information(
 89                ExecEnv.SESSION
 90            )
 91            args["databricks_job_name"] = job_name
 92            args["databricks_workspace_id"] = workspace_id
 93
 94        return field_template.render(args)
 95
 96    @staticmethod
 97    def check_if_notification_is_failure_notification(
 98        spec: TerminatorSpec,
 99    ) -> bool:
100        """Check if given notification is a failure notification.
101
102        Args:
103            spec: spec to validate if it is a failure notification.
104
105        Returns:
106            A boolean telling if the notification is a failure notification
107        """
108        notification = spec.args
109        is_notification_failure_notification: bool = False
110
111        if "template" in notification.keys():
112            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
113                notification["template"], {}
114            )
115
116            if template:
117                is_notification_failure_notification = notification.get(
118                    "on_failure", True
119                )
120            else:
121                raise ValueError(f"""Template {notification["template"]} not found.""")
122        else:
123            is_notification_failure_notification = notification.get("on_failure", True)
124
125        return is_notification_failure_notification
class Notifier(abc.ABC):
 21class Notifier(ABC):
 22    """Abstract Notification class."""
 23
 24    _logger = LoggingHandler(__name__).get_logger()
 25
 26    def __init__(self, notification_spec: TerminatorSpec):
 27        """Construct Notification instances.
 28
 29        Args:
 30            notification_spec: notification specification.
 31        """
 32        self.type = notification_spec.args.get("type")
 33        self.notification = notification_spec.args
 34
 35    @abstractmethod
 36    def create_notification(self) -> None:
 37        """Abstract create notification method."""
 38        raise NotImplementedError
 39
 40    @abstractmethod
 41    def send_notification(self) -> None:
 42        """Abstract send notification method."""
 43        raise NotImplementedError
 44
 45    @staticmethod
 46    def _check_args_are_correct(given_args: dict, template_args: list) -> None:
 47        """Checking if all arguments in template were set.
 48
 49        Args:
 50            given_args: args set in the terminator spec.
 51            template_args: args needed to be set in the template.
 52        """
 53        extra_args = []
 54
 55        for key in given_args.keys():
 56            if key in template_args:
 57                template_args.remove(key)
 58            else:
 59                extra_args.append(key)
 60
 61        if set(template_args) - set(NOTIFICATION_RUNTIME_PARAMETERS):
 62            raise ValueError(
 63                "The following template args have not been set: "
 64                + ", ".join(template_args)
 65            )
 66
 67        if extra_args:
 68            Notifier._logger.info(
 69                "Extra parameters sent to template: " + ", ".join(extra_args)
 70            )
 71
 72    @staticmethod
 73    def _render_notification_field(template_field: str, args: dict) -> str:
 74        """Render the notification given args.
 75
 76        Args:
 77            template_field: Message with templates to be replaced.
 78            args: key/value pairs to be replaced in the message.
 79
 80        Returns:
 81            Rendered field
 82        """
 83        field_template = Template(template_field)
 84        if (
 85            NotificationRuntimeParameters.DATABRICKS_JOB_NAME.value in template_field
 86            or NotificationRuntimeParameters.DATABRICKS_WORKSPACE_ID.value
 87            in template_field
 88        ):
 89            workspace_id, job_name = DatabricksUtils.get_databricks_job_information(
 90                ExecEnv.SESSION
 91            )
 92            args["databricks_job_name"] = job_name
 93            args["databricks_workspace_id"] = workspace_id
 94
 95        return field_template.render(args)
 96
 97    @staticmethod
 98    def check_if_notification_is_failure_notification(
 99        spec: TerminatorSpec,
100    ) -> bool:
101        """Check if given notification is a failure notification.
102
103        Args:
104            spec: spec to validate if it is a failure notification.
105
106        Returns:
107            A boolean telling if the notification is a failure notification
108        """
109        notification = spec.args
110        is_notification_failure_notification: bool = False
111
112        if "template" in notification.keys():
113            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
114                notification["template"], {}
115            )
116
117            if template:
118                is_notification_failure_notification = notification.get(
119                    "on_failure", True
120                )
121            else:
122                raise ValueError(f"""Template {notification["template"]} not found.""")
123        else:
124            is_notification_failure_notification = notification.get("on_failure", True)
125
126        return is_notification_failure_notification

Abstract Notification class.

Notifier(notification_spec: lakehouse_engine.core.definitions.TerminatorSpec)
26    def __init__(self, notification_spec: TerminatorSpec):
27        """Construct Notification instances.
28
29        Args:
30            notification_spec: notification specification.
31        """
32        self.type = notification_spec.args.get("type")
33        self.notification = notification_spec.args

Construct Notification instances.

Arguments:
  • notification_spec: notification specification.
type
notification
@abstractmethod
def create_notification(self) -> None:
35    @abstractmethod
36    def create_notification(self) -> None:
37        """Abstract create notification method."""
38        raise NotImplementedError

Abstract create notification method.

@abstractmethod
def send_notification(self) -> None:
40    @abstractmethod
41    def send_notification(self) -> None:
42        """Abstract send notification method."""
43        raise NotImplementedError

Abstract send notification method.

@staticmethod
def check_if_notification_is_failure_notification(spec: lakehouse_engine.core.definitions.TerminatorSpec) -> bool:
 97    @staticmethod
 98    def check_if_notification_is_failure_notification(
 99        spec: TerminatorSpec,
100    ) -> bool:
101        """Check if given notification is a failure notification.
102
103        Args:
104            spec: spec to validate if it is a failure notification.
105
106        Returns:
107            A boolean telling if the notification is a failure notification
108        """
109        notification = spec.args
110        is_notification_failure_notification: bool = False
111
112        if "template" in notification.keys():
113            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
114                notification["template"], {}
115            )
116
117            if template:
118                is_notification_failure_notification = notification.get(
119                    "on_failure", True
120                )
121            else:
122                raise ValueError(f"""Template {notification["template"]} not found.""")
123        else:
124            is_notification_failure_notification = notification.get("on_failure", True)
125
126        return is_notification_failure_notification

Check if given notification is a failure notification.

Arguments:
  • spec: spec to validate if it is a failure notification.
Returns:

A boolean telling if the notification is a failure notification