lakehouse_engine.terminators.notifiers.email_notifier

Module with email notifier.

  1"""Module with email notifier."""
  2
  3import asyncio
  4import smtplib
  5from copy import copy
  6from email.mime.multipart import MIMEMultipart
  7from email.mime.text import MIMEText
  8
  9from lakehouse_engine.core.definitions import TerminatorSpec
 10from lakehouse_engine.core.exec_env import ExecEnv
 11from lakehouse_engine.terminators.notifier import Notifier
 12from lakehouse_engine.terminators.notifiers.notification_templates import (
 13    NotificationsTemplates,
 14)
 15from lakehouse_engine.utils.logging_handler import LoggingHandler
 16
 17
 18class EmailNotifier(Notifier):
 19    """Base Notification class."""
 20
 21    _logger = LoggingHandler(__name__).get_logger()
 22
 23    def __init__(self, notification_spec: TerminatorSpec):
 24        """Construct Email Notification instance.
 25
 26        Args:
 27            notification_spec: notification specification.
 28        """
 29        super().__init__(notification_spec)
 30
 31    def create_notification(self) -> None:
 32        """Creates the notification to be sent."""
 33        given_args = self.notification.get("args", {})
 34
 35        if "template" in self.notification.keys():
 36            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
 37                self.notification["template"], {}
 38            )
 39
 40            if template:
 41                template_args = template["args"]
 42
 43                Notifier._check_args_are_correct(given_args, copy(template_args))
 44
 45                self.notification["message"] = self._render_notification_field(
 46                    template["message"], given_args
 47                )
 48                self.notification["subject"] = self._render_notification_field(
 49                    template["subject"], given_args
 50                )
 51                self.notification["mimetype"] = template["mimetype"]
 52
 53            else:
 54                raise ValueError(
 55                    f"""Template {self.notification["template"]} does not exist"""
 56                )
 57
 58        elif "message" in self.notification.keys():
 59            if given_args:
 60                self.notification["message"] = self._render_notification_field(
 61                    self.notification["message"], given_args
 62                )
 63                self.notification["subject"] = self._render_notification_field(
 64                    self.notification["subject"], given_args
 65                )
 66        else:
 67            raise ValueError("Malformed Notification Definition")
 68
 69    def send_notification(self) -> None:
 70        """Sends the notification by using a series of methods."""
 71        server = self.notification["server"]
 72        notification_office_email_servers = ["smtp.office365.com"]
 73
 74        if (
 75            ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers is not None
 76            and server in ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers
 77        ):
 78            raise ValueError(
 79                f"Trying to use disallowed smtp server: '{server}'.\n"
 80                f"Disallowed smtp servers: "
 81                f"{str(ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers)}"
 82            )
 83        elif server in notification_office_email_servers:
 84            self._authenticate_and_send_office365()
 85        else:
 86            self._authenticate_and_send_simple_smtp()
 87
 88    def _authenticate_and_send_office365(self) -> None:
 89        """Authenticates and sends an email notification using Graph API."""
 90        from azure.identity.aio import ClientSecretCredential
 91        from msgraph import GraphServiceClient
 92        from msgraph.generated.models.email_address import EmailAddress
 93        from msgraph.generated.models.item_body import ItemBody
 94        from msgraph.generated.models.message import Message
 95        from msgraph.generated.models.recipient import Recipient
 96        from msgraph.generated.users.item.send_mail.send_mail_post_request_body import (
 97            SendMailPostRequestBody,
 98        )
 99
100        self._logger.info("Attempting authentication using Graph API.")
101
102        credential = ClientSecretCredential(
103            tenant_id=self.notification["tenant_id"],
104            client_id=self.notification["user"],
105            client_secret=self.notification["password"],
106        )
107        client = GraphServiceClient(credentials=credential)
108
109        request_body = SendMailPostRequestBody()
110        message = Message()
111        message.subject = self.notification["subject"]
112
113        message_body = ItemBody()
114
115        message_body.content = self.notification["message"]
116
117        message.body = message_body
118
119        recipients = []
120        for email in self.notification["to"]:
121            recipient = Recipient()
122            recipient_address = EmailAddress()
123            recipient_address.address = email
124            recipient.email_address = recipient_address
125
126            recipients.append(recipient)
127
128        message.to_recipients = recipients
129
130        request_body.message = message
131        request_body.save_to_sent_items = False
132
133        self._logger.info(f"Sending notification email with body: {request_body}")
134
135        import nest_asyncio
136
137        nest_asyncio.apply()
138        asyncio.get_event_loop().run_until_complete(
139            client.users.by_user_id(self.notification["from"]).send_mail.post(
140                body=request_body
141            )
142        )
143
144        self._logger.info("Notification email sent successfully.")
145
146    def _authenticate_and_send_simple_smtp(self) -> None:
147        """Authenticates and sends an email notification using simple authentication."""
148        with smtplib.SMTP(
149            self.notification["server"], self.notification["port"]
150        ) as smtp:
151            try:
152                smtp.starttls()
153                smtp.login(
154                    self.notification.get("user", ""),
155                    self.notification.get("password", ""),
156                )
157            except smtplib.SMTPException as e:
158                self._logger.exception(
159                    f"Exception while authenticating to smtp: {str(e)}"
160                )
161                self._logger.exception(
162                    "Attempting to send the notification without authentication"
163                )
164
165            mesg = MIMEMultipart()
166            mesg["From"] = self.notification["from"]
167            mesg["To"] = ", ".join(self.notification["to"])
168            mesg["Subject"] = self.notification["subject"]
169
170            body = MIMEText(self.notification["message"], self.notification["mimetype"])
171            mesg.attach(body)
172            try:
173                smtp.sendmail(
174                    self.notification["from"], self.notification["to"], mesg.as_string()
175                )
176                self._logger.info("Email sent successfully.")
177            except smtplib.SMTPException as e:
178                self._logger.exception(f"Exception while sending email: {str(e)}")
class EmailNotifier(lakehouse_engine.terminators.notifier.Notifier):
 19class EmailNotifier(Notifier):
 20    """Base Notification class."""
 21
 22    _logger = LoggingHandler(__name__).get_logger()
 23
 24    def __init__(self, notification_spec: TerminatorSpec):
 25        """Construct Email Notification instance.
 26
 27        Args:
 28            notification_spec: notification specification.
 29        """
 30        super().__init__(notification_spec)
 31
 32    def create_notification(self) -> None:
 33        """Creates the notification to be sent."""
 34        given_args = self.notification.get("args", {})
 35
 36        if "template" in self.notification.keys():
 37            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
 38                self.notification["template"], {}
 39            )
 40
 41            if template:
 42                template_args = template["args"]
 43
 44                Notifier._check_args_are_correct(given_args, copy(template_args))
 45
 46                self.notification["message"] = self._render_notification_field(
 47                    template["message"], given_args
 48                )
 49                self.notification["subject"] = self._render_notification_field(
 50                    template["subject"], given_args
 51                )
 52                self.notification["mimetype"] = template["mimetype"]
 53
 54            else:
 55                raise ValueError(
 56                    f"""Template {self.notification["template"]} does not exist"""
 57                )
 58
 59        elif "message" in self.notification.keys():
 60            if given_args:
 61                self.notification["message"] = self._render_notification_field(
 62                    self.notification["message"], given_args
 63                )
 64                self.notification["subject"] = self._render_notification_field(
 65                    self.notification["subject"], given_args
 66                )
 67        else:
 68            raise ValueError("Malformed Notification Definition")
 69
 70    def send_notification(self) -> None:
 71        """Sends the notification by using a series of methods."""
 72        server = self.notification["server"]
 73        notification_office_email_servers = ["smtp.office365.com"]
 74
 75        if (
 76            ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers is not None
 77            and server in ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers
 78        ):
 79            raise ValueError(
 80                f"Trying to use disallowed smtp server: '{server}'.\n"
 81                f"Disallowed smtp servers: "
 82                f"{str(ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers)}"
 83            )
 84        elif server in notification_office_email_servers:
 85            self._authenticate_and_send_office365()
 86        else:
 87            self._authenticate_and_send_simple_smtp()
 88
 89    def _authenticate_and_send_office365(self) -> None:
 90        """Authenticates and sends an email notification using Graph API."""
 91        from azure.identity.aio import ClientSecretCredential
 92        from msgraph import GraphServiceClient
 93        from msgraph.generated.models.email_address import EmailAddress
 94        from msgraph.generated.models.item_body import ItemBody
 95        from msgraph.generated.models.message import Message
 96        from msgraph.generated.models.recipient import Recipient
 97        from msgraph.generated.users.item.send_mail.send_mail_post_request_body import (
 98            SendMailPostRequestBody,
 99        )
100
101        self._logger.info("Attempting authentication using Graph API.")
102
103        credential = ClientSecretCredential(
104            tenant_id=self.notification["tenant_id"],
105            client_id=self.notification["user"],
106            client_secret=self.notification["password"],
107        )
108        client = GraphServiceClient(credentials=credential)
109
110        request_body = SendMailPostRequestBody()
111        message = Message()
112        message.subject = self.notification["subject"]
113
114        message_body = ItemBody()
115
116        message_body.content = self.notification["message"]
117
118        message.body = message_body
119
120        recipients = []
121        for email in self.notification["to"]:
122            recipient = Recipient()
123            recipient_address = EmailAddress()
124            recipient_address.address = email
125            recipient.email_address = recipient_address
126
127            recipients.append(recipient)
128
129        message.to_recipients = recipients
130
131        request_body.message = message
132        request_body.save_to_sent_items = False
133
134        self._logger.info(f"Sending notification email with body: {request_body}")
135
136        import nest_asyncio
137
138        nest_asyncio.apply()
139        asyncio.get_event_loop().run_until_complete(
140            client.users.by_user_id(self.notification["from"]).send_mail.post(
141                body=request_body
142            )
143        )
144
145        self._logger.info("Notification email sent successfully.")
146
147    def _authenticate_and_send_simple_smtp(self) -> None:
148        """Authenticates and sends an email notification using simple authentication."""
149        with smtplib.SMTP(
150            self.notification["server"], self.notification["port"]
151        ) as smtp:
152            try:
153                smtp.starttls()
154                smtp.login(
155                    self.notification.get("user", ""),
156                    self.notification.get("password", ""),
157                )
158            except smtplib.SMTPException as e:
159                self._logger.exception(
160                    f"Exception while authenticating to smtp: {str(e)}"
161                )
162                self._logger.exception(
163                    "Attempting to send the notification without authentication"
164                )
165
166            mesg = MIMEMultipart()
167            mesg["From"] = self.notification["from"]
168            mesg["To"] = ", ".join(self.notification["to"])
169            mesg["Subject"] = self.notification["subject"]
170
171            body = MIMEText(self.notification["message"], self.notification["mimetype"])
172            mesg.attach(body)
173            try:
174                smtp.sendmail(
175                    self.notification["from"], self.notification["to"], mesg.as_string()
176                )
177                self._logger.info("Email sent successfully.")
178            except smtplib.SMTPException as e:
179                self._logger.exception(f"Exception while sending email: {str(e)}")

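Email notifier class: sends notifications by email, through the Microsoft Graph API for Office 365 servers or through plain SMTP otherwise.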

EmailNotifier(notification_spec: lakehouse_engine.core.definitions.TerminatorSpec)
24    def __init__(self, notification_spec: TerminatorSpec):
25        """Construct Email Notification instance.
26
27        Args:
28            notification_spec: notification specification.
29        """
30        super().__init__(notification_spec)

Construct Email Notification instance.

Arguments:
  • notification_spec: notification specification.
def create_notification(self) -> None:
32    def create_notification(self) -> None:
33        """Creates the notification to be sent."""
34        given_args = self.notification.get("args", {})
35
36        if "template" in self.notification.keys():
37            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
38                self.notification["template"], {}
39            )
40
41            if template:
42                template_args = template["args"]
43
44                Notifier._check_args_are_correct(given_args, copy(template_args))
45
46                self.notification["message"] = self._render_notification_field(
47                    template["message"], given_args
48                )
49                self.notification["subject"] = self._render_notification_field(
50                    template["subject"], given_args
51                )
52                self.notification["mimetype"] = template["mimetype"]
53
54            else:
55                raise ValueError(
56                    f"""Template {self.notification["template"]} does not exist"""
57                )
58
59        elif "message" in self.notification.keys():
60            if given_args:
61                self.notification["message"] = self._render_notification_field(
62                    self.notification["message"], given_args
63                )
64                self.notification["subject"] = self._render_notification_field(
65                    self.notification["subject"], given_args
66                )
67        else:
68            raise ValueError("Malformed Notification Definition")

Creates the notification to be sent.

def send_notification(self) -> None:
70    def send_notification(self) -> None:
71        """Sends the notification by using a series of methods."""
72        server = self.notification["server"]
73        notification_office_email_servers = ["smtp.office365.com"]
74
75        if (
76            ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers is not None
77            and server in ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers
78        ):
79            raise ValueError(
80                f"Trying to use disallowed smtp server: '{server}'.\n"
81                f"Disallowed smtp servers: "
82                f"{str(ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers)}"
83            )
84        elif server in notification_office_email_servers:
85            self._authenticate_and_send_office365()
86        else:
87            self._authenticate_and_send_simple_smtp()

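Sends the notification: raises an error if the configured server is disallowed, uses the Microsoft Graph API for Office 365 servers, and uses plain SMTP for any other server.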