lakehouse_engine.terminators.notifier_factory
Module for notifier factory.
"""Module for notifier factory."""

from abc import ABC

from lakehouse_engine.core.definitions import NotifierType, TerminatorSpec
from lakehouse_engine.terminators.notifier import Notifier
from lakehouse_engine.terminators.notifiers.email_notifier import EmailNotifier


class NotifierFactory(ABC):
    """Class for notification factory.

    Resolves the notifier class registered for a notifier type and drives the
    failure notifications configured in a list of terminator specs.
    """

    # Registry of supported notifiers: notifier type value -> notifier class.
    NOTIFIER_TYPES = {NotifierType.EMAIL.value: EmailNotifier}

    @classmethod
    def get_notifier(cls, spec: TerminatorSpec) -> Notifier:
        """Get a notifier according to the terminator specs using a factory.

        Args:
            spec: terminator specification; its ``args["type"]`` entry selects
                the notifier class from ``NOTIFIER_TYPES``.

        Returns:
            Notifier: notifier that will handle notifications.

        Raises:
            NotImplementedError: if no notifier is registered for the type.
        """
        requested_type = spec.args.get("type")
        notifier_class = cls.NOTIFIER_TYPES.get(requested_type)

        if not notifier_class:
            raise NotImplementedError(
                f"The requested notification format {requested_type} is not supported."
            )

        return notifier_class(notification_spec=spec)

    @staticmethod
    def generate_failure_notification(spec: list, exception: Exception) -> None:
        """Check if it is necessary to send a failure notification and generate it.

        Only terminators whose ``function`` is ``"notify"`` are considered. A
        notification is sent when its ``generate_failure_notification`` argument
        is truthy or when
        ``Notifier.check_if_notification_is_failure_notification`` accepts it.

        The terminator specs received are left untouched: the failure
        notification arguments are built on copies.

        Args:
            spec: List of terminator specs.
            exception: Exception that caused the failure.

        Raises:
            KeyError: if ``generate_failure_notification`` is set but the
                notification arguments have no ``"type"`` entry.
            NotImplementedError: if the notifier type is not supported.
        """
        notification_specs = [
            terminator for terminator in spec if terminator.function == "notify"
        ]

        for notification in notification_specs:
            notification_args = notification.args
            use_default_template = notification_args.get(
                "generate_failure_notification", False
            )

            # Short-circuit on purpose: the Notifier check is only consulted
            # when the explicit flag is not set.
            should_notify = use_default_template or (
                Notifier.check_if_notification_is_failure_notification(notification)
            )
            if not should_notify:
                continue

            # Work on copies so the caller's spec does not end up carrying the
            # exception text or an overridden template after this call.
            failure_args = dict(notification_args)
            failure_args["args"] = {
                **(notification_args.get("args") or {}),
                "exception": str(exception),
            }

            # A user-provided template is already carried over by the copy;
            # only the generated failure template needs to be set explicitly.
            if use_default_template:
                failure_args["template"] = (
                    f"failure_notification_{notification_args['type']}"
                )

            failure_spec = TerminatorSpec(function="notification", args=failure_args)

            notifier = NotifierFactory.get_notifier(failure_spec)
            notifier.create_notification()
            notifier.send_notification()
class
NotifierFactory(abc.ABC):
class NotifierFactory(ABC):
    """Class for notification factory.

    Resolves the notifier class registered for a notifier type and drives the
    failure notifications configured in a list of terminator specs.
    """

    # Registry of supported notifiers: notifier type value -> notifier class.
    NOTIFIER_TYPES = {NotifierType.EMAIL.value: EmailNotifier}

    @classmethod
    def get_notifier(cls, spec: TerminatorSpec) -> Notifier:
        """Get a notifier according to the terminator specs using a factory.

        Args:
            spec: terminator specification; its ``args["type"]`` entry selects
                the notifier class from ``NOTIFIER_TYPES``.

        Returns:
            Notifier: notifier that will handle notifications.

        Raises:
            NotImplementedError: if no notifier is registered for the type.
        """
        requested_type = spec.args.get("type")
        notifier_class = cls.NOTIFIER_TYPES.get(requested_type)

        if not notifier_class:
            raise NotImplementedError(
                f"The requested notification format {requested_type} is not supported."
            )

        return notifier_class(notification_spec=spec)

    @staticmethod
    def generate_failure_notification(spec: list, exception: Exception) -> None:
        """Check if it is necessary to send a failure notification and generate it.

        Only terminators whose ``function`` is ``"notify"`` are considered. A
        notification is sent when its ``generate_failure_notification`` argument
        is truthy or when
        ``Notifier.check_if_notification_is_failure_notification`` accepts it.

        The terminator specs received are left untouched: the failure
        notification arguments are built on copies.

        Args:
            spec: List of terminator specs.
            exception: Exception that caused the failure.

        Raises:
            KeyError: if ``generate_failure_notification`` is set but the
                notification arguments have no ``"type"`` entry.
            NotImplementedError: if the notifier type is not supported.
        """
        notification_specs = [
            terminator for terminator in spec if terminator.function == "notify"
        ]

        for notification in notification_specs:
            notification_args = notification.args
            use_default_template = notification_args.get(
                "generate_failure_notification", False
            )

            # Short-circuit on purpose: the Notifier check is only consulted
            # when the explicit flag is not set.
            should_notify = use_default_template or (
                Notifier.check_if_notification_is_failure_notification(notification)
            )
            if not should_notify:
                continue

            # Work on copies so the caller's spec does not end up carrying the
            # exception text or an overridden template after this call.
            failure_args = dict(notification_args)
            failure_args["args"] = {
                **(notification_args.get("args") or {}),
                "exception": str(exception),
            }

            # A user-provided template is already carried over by the copy;
            # only the generated failure template needs to be set explicitly.
            if use_default_template:
                failure_args["template"] = (
                    f"failure_notification_{notification_args['type']}"
                )

            failure_spec = TerminatorSpec(function="notification", args=failure_args)

            notifier = NotifierFactory.get_notifier(failure_spec)
            notifier.create_notification()
            notifier.send_notification()
Class for notification factory.
NOTIFIER_TYPES =
{'email': <class 'lakehouse_engine.terminators.notifiers.email_notifier.EmailNotifier'>}
@classmethod
def
get_notifier( cls, spec: lakehouse_engine.core.definitions.TerminatorSpec) -> lakehouse_engine.terminators.notifier.Notifier:
@classmethod
def get_notifier(cls, spec: TerminatorSpec) -> Notifier:
    """Get a notifier according to the terminator specs using a factory.

    The ``"type"`` entry of ``spec.args`` is looked up in ``NOTIFIER_TYPES``
    and the matching notifier class is instantiated with the spec.

    Args:
        spec: terminator specification.

    Returns:
        Notifier: notifier that will handle notifications.

    Raises:
        NotImplementedError: if no notifier is registered for the type.
    """
    requested_type = spec.args.get("type")
    notifier_class = cls.NOTIFIER_TYPES.get(requested_type)

    # Guard clause: unknown (or missing) notifier types are rejected up front.
    if not notifier_class:
        raise NotImplementedError(
            f"The requested notification format {requested_type} is not supported."
        )

    return notifier_class(notification_spec=spec)
Get a notifier according to the terminator specs using a factory.
Arguments:
- spec: terminator specification.
Returns:
Notifier: notifier that will handle notifications.
@staticmethod
def
generate_failure_notification(spec: list, exception: Exception) -> None:
@staticmethod
def generate_failure_notification(spec: list, exception: Exception) -> None:
    """Check if it is necessary to send a failure notification and generate it.

    Only terminators whose ``function`` is ``"notify"`` are considered. A
    notification is sent when its ``generate_failure_notification`` argument
    is truthy or when
    ``Notifier.check_if_notification_is_failure_notification`` accepts it.

    The terminator specs received are left untouched: the failure
    notification arguments are built on copies.

    Args:
        spec: List of terminator specs.
        exception: Exception that caused the failure.

    Raises:
        KeyError: if ``generate_failure_notification`` is set but the
            notification arguments have no ``"type"`` entry.
        NotImplementedError: if the notifier type is not supported.
    """
    notification_specs = [
        terminator for terminator in spec if terminator.function == "notify"
    ]

    for notification in notification_specs:
        notification_args = notification.args
        use_default_template = notification_args.get(
            "generate_failure_notification", False
        )

        # Short-circuit on purpose: the Notifier check is only consulted when
        # the explicit flag is not set.
        should_notify = use_default_template or (
            Notifier.check_if_notification_is_failure_notification(notification)
        )
        if not should_notify:
            continue

        # Work on copies so the caller's spec does not end up carrying the
        # exception text or an overridden template after this call.
        failure_args = dict(notification_args)
        failure_args["args"] = {
            **(notification_args.get("args") or {}),
            "exception": str(exception),
        }

        # A user-provided template is already carried over by the copy; only
        # the generated failure template needs to be set explicitly.
        if use_default_template:
            failure_args["template"] = (
                f"failure_notification_{notification_args['type']}"
            )

        failure_spec = TerminatorSpec(function="notification", args=failure_args)

        notifier = NotifierFactory.get_notifier(failure_spec)
        notifier.create_notification()
        notifier.send_notification()
Check if it is necessary to send a failure notification and generate it.
Arguments:
- spec: List of terminator specs.
- exception: Exception that caused the failure.