lakehouse_engine.terminators.notifier
Module with notification terminator.
"""Module with notification terminator."""

from abc import ABC, abstractmethod

from jinja2 import Template

from lakehouse_engine.core.definitions import (
    NOTIFICATION_RUNTIME_PARAMETERS,
    NotificationRuntimeParameters,
    TerminatorSpec,
)
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.terminators.notifiers.notification_templates import (
    NotificationsTemplates,
)
from lakehouse_engine.utils.databricks_utils import DatabricksUtils
from lakehouse_engine.utils.logging_handler import LoggingHandler


class Notifier(ABC):
    """Abstract Notification class.

    Concrete notifiers must implement `create_notification` and
    `send_notification`. The static helpers validate template arguments,
    render template fields and classify a spec as a failure notification.
    """

    _logger = LoggingHandler(__name__).get_logger()

    def __init__(self, notification_spec: TerminatorSpec):
        """Construct Notification instances.

        Args:
            notification_spec: notification specification. Its `args` mapping
                is stored as `self.notification` and its "type" entry (None
                when absent) as `self.type`.
        """
        self.type = notification_spec.args.get("type")
        self.notification = notification_spec.args

    @abstractmethod
    def create_notification(self) -> None:
        """Abstract create notification method."""
        raise NotImplementedError

    @abstractmethod
    def send_notification(self) -> None:
        """Abstract send notification method."""
        raise NotImplementedError

    @staticmethod
    def _check_args_are_correct(given_args: dict, template_args: list) -> None:
        """Checking if all arguments in template were set.

        Template arguments listed in NOTIFICATION_RUNTIME_PARAMETERS do not
        have to be provided in the spec. Arguments given but not used by the
        template are only logged.

        Args:
            given_args: args set in the terminator spec.
            template_args: args needed to be set in the template. This list is
                not modified.

        Raises:
            ValueError: if a template arg that is not a runtime parameter is
                missing from the given args.
        """
        runtime_args = set(NOTIFICATION_RUNTIME_PARAMETERS)

        # Build new lists instead of removing entries from `template_args`:
        # the caller may pass a shared template definition that must survive
        # this call untouched.
        missing_args = [
            arg
            for arg in template_args
            if arg not in given_args and arg not in runtime_args
        ]

        if missing_args:
            # Only report args the user really has to set; runtime parameters
            # are not expected in the spec.
            raise ValueError(
                "The following template args have not been set: "
                + ", ".join(missing_args)
            )

        extra_args = [key for key in given_args if key not in template_args]

        if extra_args:
            Notifier._logger.info(
                "Extra parameters sent to template: " + ", ".join(extra_args)
            )

    @staticmethod
    def _render_notification_field(template_field: str, args: dict) -> str:
        """Render the notification given args.

        When the template text mentions the Databricks job name or workspace
        id runtime parameter, the job information is looked up and stored in
        `args` under "databricks_job_name" and "databricks_workspace_id"
        before rendering. `args` is therefore updated in place in that case.

        Args:
            template_field: Message with templates to be replaced.
            args: key/value pairs to be replaced in the message.

        Returns:
            Rendered field
        """
        field_template = Template(template_field)
        if (
            NotificationRuntimeParameters.DATABRICKS_JOB_NAME.value in template_field
            or NotificationRuntimeParameters.DATABRICKS_WORKSPACE_ID.value
            in template_field
        ):
            workspace_id, job_name = DatabricksUtils.get_databricks_job_information(
                ExecEnv.SESSION
            )
            args["databricks_job_name"] = job_name
            args["databricks_workspace_id"] = workspace_id

        return field_template.render(args)

    @staticmethod
    def check_if_notification_is_failure_notification(
        spec: TerminatorSpec,
    ) -> bool:
        """Check if given notification is a failure notification.

        The result is the spec's "on_failure" arg, defaulting to True. When
        the spec names a template, that template must exist in
        NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.

        Args:
            spec: spec to validate if it is a failure notification.

        Returns:
            A boolean telling if the notification is a failure notification

        Raises:
            ValueError: if the spec references a template that is not found
                (or whose definition is empty).
        """
        notification = spec.args

        # Validate the template reference first; the returned flag itself does
        # not depend on the template contents.
        if "template" in notification:
            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
                notification["template"], {}
            )
            if not template:
                raise ValueError(f"""Template {notification["template"]} not found.""")

        return notification.get("on_failure", True)
class
Notifier(abc.ABC):
class Notifier(ABC):
    """Abstract Notification class.

    Concrete notifiers must implement `create_notification` and
    `send_notification`. The static helpers validate template arguments,
    render template fields and classify a spec as a failure notification.
    """

    _logger = LoggingHandler(__name__).get_logger()

    def __init__(self, notification_spec: TerminatorSpec):
        """Construct Notification instances.

        Args:
            notification_spec: notification specification. Its `args` mapping
                is stored as `self.notification` and its "type" entry (None
                when absent) as `self.type`.
        """
        self.type = notification_spec.args.get("type")
        self.notification = notification_spec.args

    @abstractmethod
    def create_notification(self) -> None:
        """Abstract create notification method."""
        raise NotImplementedError

    @abstractmethod
    def send_notification(self) -> None:
        """Abstract send notification method."""
        raise NotImplementedError

    @staticmethod
    def _check_args_are_correct(given_args: dict, template_args: list) -> None:
        """Checking if all arguments in template were set.

        Template arguments listed in NOTIFICATION_RUNTIME_PARAMETERS do not
        have to be provided in the spec. Arguments given but not used by the
        template are only logged.

        Args:
            given_args: args set in the terminator spec.
            template_args: args needed to be set in the template. This list is
                not modified.

        Raises:
            ValueError: if a template arg that is not a runtime parameter is
                missing from the given args.
        """
        runtime_args = set(NOTIFICATION_RUNTIME_PARAMETERS)

        # Build new lists instead of removing entries from `template_args`:
        # the caller may pass a shared template definition that must survive
        # this call untouched.
        missing_args = [
            arg
            for arg in template_args
            if arg not in given_args and arg not in runtime_args
        ]

        if missing_args:
            # Only report args the user really has to set; runtime parameters
            # are not expected in the spec.
            raise ValueError(
                "The following template args have not been set: "
                + ", ".join(missing_args)
            )

        extra_args = [key for key in given_args if key not in template_args]

        if extra_args:
            Notifier._logger.info(
                "Extra parameters sent to template: " + ", ".join(extra_args)
            )

    @staticmethod
    def _render_notification_field(template_field: str, args: dict) -> str:
        """Render the notification given args.

        When the template text mentions the Databricks job name or workspace
        id runtime parameter, the job information is looked up and stored in
        `args` under "databricks_job_name" and "databricks_workspace_id"
        before rendering. `args` is therefore updated in place in that case.

        Args:
            template_field: Message with templates to be replaced.
            args: key/value pairs to be replaced in the message.

        Returns:
            Rendered field
        """
        field_template = Template(template_field)
        if (
            NotificationRuntimeParameters.DATABRICKS_JOB_NAME.value in template_field
            or NotificationRuntimeParameters.DATABRICKS_WORKSPACE_ID.value
            in template_field
        ):
            workspace_id, job_name = DatabricksUtils.get_databricks_job_information(
                ExecEnv.SESSION
            )
            args["databricks_job_name"] = job_name
            args["databricks_workspace_id"] = workspace_id

        return field_template.render(args)

    @staticmethod
    def check_if_notification_is_failure_notification(
        spec: TerminatorSpec,
    ) -> bool:
        """Check if given notification is a failure notification.

        The result is the spec's "on_failure" arg, defaulting to True. When
        the spec names a template, that template must exist in
        NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.

        Args:
            spec: spec to validate if it is a failure notification.

        Returns:
            A boolean telling if the notification is a failure notification

        Raises:
            ValueError: if the spec references a template that is not found
                (or whose definition is empty).
        """
        notification = spec.args

        # Validate the template reference first; the returned flag itself does
        # not depend on the template contents.
        if "template" in notification:
            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
                notification["template"], {}
            )
            if not template:
                raise ValueError(f"""Template {notification["template"]} not found.""")

        return notification.get("on_failure", True)
Abstract Notification class.
Notifier(notification_spec: lakehouse_engine.core.definitions.TerminatorSpec)
def __init__(self, notification_spec: TerminatorSpec):
    """Construct Notification instances.

    Keeps the spec's `args` mapping on the instance together with the
    notification type read from it.

    Args:
        notification_spec: notification specification.
    """
    # "type" may be absent from the args, in which case it is stored as None.
    self.type = notification_spec.args.get("type")
    self.notification = notification_spec.args
Construct Notification instances.
Arguments:
- notification_spec: notification specification.
@abstractmethod
def
create_notification(self) -> None:
@abstractmethod
def create_notification(self) -> None:
    """Abstract create notification method.

    Raises:
        NotImplementedError: always, when this base implementation is called.
    """
    raise NotImplementedError
Abstract create notification method.
@abstractmethod
def
send_notification(self) -> None:
@abstractmethod
def send_notification(self) -> None:
    """Abstract send notification method.

    Raises:
        NotImplementedError: always, when this base implementation is called.
    """
    raise NotImplementedError
Abstract send notification method.
@staticmethod
def
check_if_notification_is_failure_notification(spec: lakehouse_engine.core.definitions.TerminatorSpec) -> bool:
@staticmethod
def check_if_notification_is_failure_notification(
    spec: TerminatorSpec,
) -> bool:
    """Check if given notification is a failure notification.

    The answer is the spec's "on_failure" arg, defaulting to True. A spec
    that names a template is first checked against
    NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.

    Args:
        spec: spec to validate if it is a failure notification.

    Returns:
        A boolean telling if the notification is a failure notification

    Raises:
        ValueError: if the referenced template is not found (or its
            definition is empty).
    """
    spec_args = spec.args

    # Guard clause: a named template must resolve to a non-empty definition.
    if "template" in spec_args.keys():
        known_template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
            spec_args["template"], {}
        )
        if not known_template:
            raise ValueError(f"""Template {spec_args["template"]} not found.""")

    # With or without a template, the flag comes from the same arg.
    is_failure: bool = spec_args.get("on_failure", True)
    return is_failure
Check if given notification is a failure notification.
Arguments:
- spec: spec to validate if it is a failure notification.
Returns:
A boolean telling if the notification is a failure notification