lakehouse_engine.terminators.notifier_factory

Module for notifier factory.

 1"""Module for notifier factory."""
 2
 3from abc import ABC
 4
 5from lakehouse_engine.core.definitions import NotifierType, TerminatorSpec
 6from lakehouse_engine.terminators.notifier import Notifier
 7from lakehouse_engine.terminators.notifiers.email_notifier import EmailNotifier
 8
 9
class NotifierFactory(ABC):
    """Factory that builds notifiers and sends failure notifications."""

    # Maps the "type" value found in a notification spec to its notifier class.
    NOTIFIER_TYPES = {NotifierType.EMAIL.value: EmailNotifier}

    @classmethod
    def get_notifier(cls, spec: TerminatorSpec) -> Notifier:
        """Get a notifier according to the terminator specs using a factory.

        Args:
            spec: terminator specification.

        Returns:
            Notifier: notifier that will handle notifications.

        Raises:
            NotImplementedError: if the "type" in the spec args has no entry
                in NOTIFIER_TYPES.
        """
        notifier_name = spec.args.get("type")
        notifier = cls.NOTIFIER_TYPES.get(notifier_name)

        if not notifier:
            raise NotImplementedError(
                f"The requested notification format {notifier_name} is not supported."
            )

        return notifier(notification_spec=spec)

    @staticmethod
    def generate_failure_notification(spec: list, exception: Exception) -> None:
        """Check if it is necessary to send a failure notification and generate it.

        Only terminators whose function is "notify" are considered. A failure
        notification is created and sent for each of them that either sets
        "generate_failure_notification" in its args or is flagged by
        Notifier.check_if_notification_is_failure_notification.

        The failure spec is built from copies of the terminator args, so the
        specs received in `spec` are left untouched.

        Args:
            spec: List of termination specs
            exception: Exception that caused the failure.

        Raises:
            KeyError: if "generate_failure_notification" is set but the args
                have no "type".
            NotImplementedError: if the notification type is not supported.
        """
        notification_specs = [
            terminator for terminator in spec if terminator.function == "notify"
        ]

        for notification in notification_specs:
            notification_args = notification.args
            generate_failure_notification = notification_args.get(
                "generate_failure_notification", False
            )

            if not (
                generate_failure_notification
                or Notifier.check_if_notification_is_failure_notification(notification)
            ):
                continue

            # Work on copies: the original code aliased notification.args and so
            # leaked the exception text and template override into the caller's
            # spec.
            failure_notification_spec = dict(notification_args)
            failure_notification_spec_args = dict(notification_args.get("args") or {})
            failure_notification_spec_args["exception"] = str(exception)
            failure_notification_spec["args"] = failure_notification_spec_args

            if generate_failure_notification:
                # Explicit request: use the failure template for this type.
                failure_notification_spec["template"] = (
                    f"""failure_notification_{notification_args["type"]}"""
                )
            # Otherwise any user-supplied "template" is already in the copy.

            failure_spec = TerminatorSpec(
                function="notification", args=failure_notification_spec
            )

            notifier = NotifierFactory.get_notifier(failure_spec)
            notifier.create_notification()
            notifier.send_notification()
class NotifierFactory(abc.ABC):
class NotifierFactory(ABC):
    """Factory that builds notifiers and sends failure notifications."""

    # Maps the "type" value found in a notification spec to its notifier class.
    NOTIFIER_TYPES = {NotifierType.EMAIL.value: EmailNotifier}

    @classmethod
    def get_notifier(cls, spec: TerminatorSpec) -> Notifier:
        """Get a notifier according to the terminator specs using a factory.

        Args:
            spec: terminator specification.

        Returns:
            Notifier: notifier that will handle notifications.

        Raises:
            NotImplementedError: if the "type" in the spec args has no entry
                in NOTIFIER_TYPES.
        """
        notifier_name = spec.args.get("type")
        notifier = cls.NOTIFIER_TYPES.get(notifier_name)

        if not notifier:
            raise NotImplementedError(
                f"The requested notification format {notifier_name} is not supported."
            )

        return notifier(notification_spec=spec)

    @staticmethod
    def generate_failure_notification(spec: list, exception: Exception) -> None:
        """Check if it is necessary to send a failure notification and generate it.

        Only terminators whose function is "notify" are considered. A failure
        notification is created and sent for each of them that either sets
        "generate_failure_notification" in its args or is flagged by
        Notifier.check_if_notification_is_failure_notification.

        The failure spec is built from copies of the terminator args, so the
        specs received in `spec` are left untouched.

        Args:
            spec: List of termination specs
            exception: Exception that caused the failure.

        Raises:
            KeyError: if "generate_failure_notification" is set but the args
                have no "type".
            NotImplementedError: if the notification type is not supported.
        """
        notification_specs = [
            terminator for terminator in spec if terminator.function == "notify"
        ]

        for notification in notification_specs:
            notification_args = notification.args
            generate_failure_notification = notification_args.get(
                "generate_failure_notification", False
            )

            if not (
                generate_failure_notification
                or Notifier.check_if_notification_is_failure_notification(notification)
            ):
                continue

            # Work on copies: the original code aliased notification.args and so
            # leaked the exception text and template override into the caller's
            # spec.
            failure_notification_spec = dict(notification_args)
            failure_notification_spec_args = dict(notification_args.get("args") or {})
            failure_notification_spec_args["exception"] = str(exception)
            failure_notification_spec["args"] = failure_notification_spec_args

            if generate_failure_notification:
                # Explicit request: use the failure template for this type.
                failure_notification_spec["template"] = (
                    f"""failure_notification_{notification_args["type"]}"""
                )
            # Otherwise any user-supplied "template" is already in the copy.

            failure_spec = TerminatorSpec(
                function="notification", args=failure_notification_spec
            )

            notifier = NotifierFactory.get_notifier(failure_spec)
            notifier.create_notification()
            notifier.send_notification()

Factory class that builds notifiers and dispatches failure notifications.

16    @classmethod
17    def get_notifier(cls, spec: TerminatorSpec) -> Notifier:
18        """Get a notifier according to the terminator specs using a factory.
19
20        Args:
21            spec: terminator specification.
22
23        Returns:
24            Notifier: notifier that will handle notifications.
25        """
26        notifier_name = spec.args.get("type")
27        notifier = cls.NOTIFIER_TYPES.get(notifier_name)
28
29        if notifier:
30            return notifier(notification_spec=spec)
31        else:
32            raise NotImplementedError(
33                f"The requested notification format {notifier_name} is not supported."
34            )

Get a notifier according to the terminator specs using a factory.

Arguments:
  • spec: terminator specification.
Returns:

Notifier: notifier that will handle notifications.

@staticmethod
def generate_failure_notification(spec: list, exception: Exception) -> None:
36    @staticmethod
37    def generate_failure_notification(spec: list, exception: Exception) -> None:
38        """Check if it is necessary to send a failure notification and generate it.
39
40        Args:
41            spec: List of termination specs
42            exception: Exception that caused the failure.
43        """
44        notification_specs = []
45
46        for terminator in spec:
47            if terminator.function == "notify":
48                notification_specs.append(terminator)
49
50        for notification in notification_specs:
51            notification_args = notification.args
52            generate_failure_notification = notification_args.get(
53                "generate_failure_notification", False
54            )
55
56            if generate_failure_notification or (
57                Notifier.check_if_notification_is_failure_notification(notification)
58            ):
59                failure_notification_spec = notification_args
60
61                failure_notification_spec_args = notification_args.get("args", {})
62
63                failure_notification_spec_args["exception"] = str(exception)
64
65                failure_notification_spec["args"] = failure_notification_spec_args
66
67                if generate_failure_notification:
68                    failure_notification_spec["template"] = (
69                        f"""failure_notification_{notification_args["type"]}"""
70                    )
71                elif "template" in notification_args.keys():
72                    failure_notification_spec["template"] = notification_args[
73                        "template"
74                    ]
75
76                failure_spec = TerminatorSpec(
77                    function="notification", args=failure_notification_spec
78                )
79
80                notifier = NotifierFactory.get_notifier(failure_spec)
81                notifier.create_notification()
82                notifier.send_notification()

Check if it is necessary to send a failure notification and generate it.

Arguments:
  • spec: list of terminator specs.
  • exception: Exception that caused the failure.