lakehouse_engine.terminators.notifiers.email_notifier
Module with email notifier.
"""Module with email notifier."""

import asyncio
import smtplib
from copy import copy
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from lakehouse_engine.core.definitions import TerminatorSpec
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.terminators.notifier import Notifier
from lakehouse_engine.terminators.notifiers.notification_templates import (
    NotificationsTemplates,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler


class EmailNotifier(Notifier):
    """Notifier that builds and sends notifications by email."""

    _logger = LoggingHandler(__name__).get_logger()

    def __init__(self, notification_spec: TerminatorSpec):
        """Construct Email Notification instance.

        Args:
            notification_spec: notification specification.
        """
        super().__init__(notification_spec)

    def create_notification(self) -> None:
        """Creates the notification to be sent.

        When the notification names a template, the message, subject and
        mimetype come from that template. Otherwise the notification's own
        message and subject are used and rendered with the given args, if any.

        Raises:
            ValueError: if the named template does not exist, or if the
                notification has neither a "template" nor a "message".
        """
        given_args = self.notification.get("args", {})

        if "template" in self.notification:
            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
                self.notification["template"], {}
            )

            if not template:
                raise ValueError(
                    f"""Template {self.notification["template"]} does not exist"""
                )

            template_args = template["args"]

            # A copy is handed over so the shared template definition is
            # never modified by the check.
            Notifier._check_args_are_correct(given_args, copy(template_args))

            self.notification["message"] = self._render_notification_field(
                template["message"], given_args
            )
            self.notification["subject"] = self._render_notification_field(
                template["subject"], given_args
            )
            self.notification["mimetype"] = template["mimetype"]

        elif "message" in self.notification:
            # Without args the message and subject are sent exactly as given.
            if given_args:
                self.notification["message"] = self._render_notification_field(
                    self.notification["message"], given_args
                )
                self.notification["subject"] = self._render_notification_field(
                    self.notification["subject"], given_args
                )
        else:
            raise ValueError("Malformed Notification Definition")

    def send_notification(self) -> None:
        """Sends the notification through the channel matching its server.

        Office 365 servers go through the Graph API; any other server goes
        through plain SMTP.

        Raises:
            ValueError: if the server is in the engine's list of disallowed
                email servers.
        """
        server = self.notification["server"]
        notification_office_email_servers = ["smtp.office365.com"]

        if (
            ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers is not None
            and server in ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers
        ):
            raise ValueError(
                f"Trying to use disallowed smtp server: '{server}'.\n"
                f"Disallowed smtp servers: "
                f"{str(ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers)}"
            )
        elif server in notification_office_email_servers:
            self._authenticate_and_send_office365()
        else:
            self._authenticate_and_send_simple_smtp()

    def _authenticate_and_send_office365(self) -> None:
        """Authenticates and sends an email notification using Graph API.

        The notification's "user" and "password" are used as the client id
        and client secret, together with its "tenant_id".
        """
        # Imported lazily so these packages are only needed on this path.
        from azure.identity.aio import ClientSecretCredential
        from msgraph import GraphServiceClient
        from msgraph.generated.models.email_address import EmailAddress
        from msgraph.generated.models.item_body import ItemBody
        from msgraph.generated.models.message import Message
        from msgraph.generated.models.recipient import Recipient
        from msgraph.generated.users.item.send_mail.send_mail_post_request_body import (
            SendMailPostRequestBody,
        )

        self._logger.info("Attempting authentication using Graph API.")

        credential = ClientSecretCredential(
            tenant_id=self.notification["tenant_id"],
            client_id=self.notification["user"],
            client_secret=self.notification["password"],
        )
        client = GraphServiceClient(credentials=credential)

        request_body = SendMailPostRequestBody()
        message = Message()
        message.subject = self.notification["subject"]

        message_body = ItemBody()
        message_body.content = self.notification["message"]
        message.body = message_body

        recipients = []
        for email in self.notification["to"]:
            recipient = Recipient()
            recipient_address = EmailAddress()
            recipient_address.address = email
            recipient.email_address = recipient_address

            recipients.append(recipient)

        message.to_recipients = recipients

        request_body.message = message
        request_body.save_to_sent_items = False

        self._logger.info(f"Sending notification email with body: {request_body}")

        async def _send_and_close() -> None:
            """Post the email, then always release the async credential."""
            try:
                await client.users.by_user_id(
                    self.notification["from"]
                ).send_mail.post(body=request_body)
            finally:
                # The aio credential holds a transport session that stays
                # open until it is closed explicitly.
                await credential.close()

        import nest_asyncio

        # NOTE(review): presumably needed because an event loop may already be
        # running in the host environment - confirm.
        nest_asyncio.apply()
        asyncio.get_event_loop().run_until_complete(_send_and_close())

        self._logger.info("Notification email sent successfully.")

    def _authenticate_and_send_simple_smtp(self) -> None:
        """Authenticates and sends an email notification using simple authentication.

        Authentication is best effort: if STARTTLS or login fails, the error
        is logged and the email is still attempted without authentication.
        A failure while sending is logged and not re-raised.
        """
        with smtplib.SMTP(
            self.notification["server"], self.notification["port"]
        ) as smtp:
            try:
                smtp.starttls()
                smtp.login(
                    self.notification.get("user", ""),
                    self.notification.get("password", ""),
                )
            except smtplib.SMTPException as e:
                self._logger.exception(
                    f"Exception while authenticating to smtp: {str(e)}"
                )
                # Status message only: the traceback was already logged above.
                self._logger.warning(
                    "Attempting to send the notification without authentication"
                )

            mesg = MIMEMultipart()
            mesg["From"] = self.notification["from"]
            mesg["To"] = ", ".join(self.notification["to"])
            mesg["Subject"] = self.notification["subject"]

            # Only templates set "mimetype"; a custom message without it
            # falls back to MIMEText's own default subtype.
            body = MIMEText(
                self.notification["message"],
                self.notification.get("mimetype", "plain"),
            )
            mesg.attach(body)
            try:
                smtp.sendmail(
                    self.notification["from"], self.notification["to"], mesg.as_string()
                )
                self._logger.info("Email sent successfully.")
            except smtplib.SMTPException as e:
                self._logger.exception(f"Exception while sending email: {str(e)}")
class EmailNotifier(Notifier):
    """Notifier that builds and sends notifications by email."""

    _logger = LoggingHandler(__name__).get_logger()

    def __init__(self, notification_spec: TerminatorSpec):
        """Construct Email Notification instance.

        Args:
            notification_spec: notification specification.
        """
        super().__init__(notification_spec)

    def create_notification(self) -> None:
        """Creates the notification to be sent.

        A notification that names a template takes its message, subject and
        mimetype from that template. A notification with its own message has
        the message and subject rendered with the given args, when there are
        any.

        Raises:
            ValueError: if the named template does not exist, or if the
                notification has neither a "template" nor a "message".
        """
        given_args = self.notification.get("args", {})

        if "template" in self.notification:
            template: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
                self.notification["template"], {}
            )

            if not template:
                raise ValueError(
                    f"""Template {self.notification["template"]} does not exist"""
                )

            # Pass a copy so the shared template definition stays untouched.
            Notifier._check_args_are_correct(given_args, copy(template["args"]))

            self.notification["message"] = self._render_notification_field(
                template["message"], given_args
            )
            self.notification["subject"] = self._render_notification_field(
                template["subject"], given_args
            )
            self.notification["mimetype"] = template["mimetype"]

        elif "message" in self.notification:
            # With no args, message and subject are left exactly as given.
            if given_args:
                self.notification["message"] = self._render_notification_field(
                    self.notification["message"], given_args
                )
                self.notification["subject"] = self._render_notification_field(
                    self.notification["subject"], given_args
                )
        else:
            raise ValueError("Malformed Notification Definition")

    def send_notification(self) -> None:
        """Sends the notification through the channel matching its server.

        Office 365 servers are handled with the Graph API and every other
        server with plain SMTP.

        Raises:
            ValueError: if the server is in the engine's list of disallowed
                email servers.
        """
        server = self.notification["server"]
        notification_office_email_servers = ["smtp.office365.com"]

        if (
            ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers is not None
            and server in ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers
        ):
            raise ValueError(
                f"Trying to use disallowed smtp server: '{server}'.\n"
                f"Disallowed smtp servers: "
                f"{str(ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers)}"
            )
        elif server in notification_office_email_servers:
            self._authenticate_and_send_office365()
        else:
            self._authenticate_and_send_simple_smtp()

    def _authenticate_and_send_office365(self) -> None:
        """Authenticates and sends an email notification using Graph API.

        The client id and client secret are read from the notification's
        "user" and "password", the tenant from its "tenant_id".
        """
        # Lazy imports: these packages are only required on this path.
        from azure.identity.aio import ClientSecretCredential
        from msgraph import GraphServiceClient
        from msgraph.generated.models.email_address import EmailAddress
        from msgraph.generated.models.item_body import ItemBody
        from msgraph.generated.models.message import Message
        from msgraph.generated.models.recipient import Recipient
        from msgraph.generated.users.item.send_mail.send_mail_post_request_body import (
            SendMailPostRequestBody,
        )

        self._logger.info("Attempting authentication using Graph API.")

        credential = ClientSecretCredential(
            tenant_id=self.notification["tenant_id"],
            client_id=self.notification["user"],
            client_secret=self.notification["password"],
        )
        client = GraphServiceClient(credentials=credential)

        message_body = ItemBody()
        message_body.content = self.notification["message"]

        message = Message()
        message.subject = self.notification["subject"]
        message.body = message_body

        recipients = []
        for email in self.notification["to"]:
            recipient_address = EmailAddress()
            recipient_address.address = email

            recipient = Recipient()
            recipient.email_address = recipient_address
            recipients.append(recipient)

        message.to_recipients = recipients

        request_body = SendMailPostRequestBody()
        request_body.message = message
        request_body.save_to_sent_items = False

        self._logger.info(f"Sending notification email with body: {request_body}")

        async def _send_and_close() -> None:
            """Post the email, then always release the async credential."""
            try:
                await client.users.by_user_id(
                    self.notification["from"]
                ).send_mail.post(body=request_body)
            finally:
                # The aio credential keeps a transport session open until
                # it is closed explicitly.
                await credential.close()

        import nest_asyncio

        # NOTE(review): presumably applied because the caller may already be
        # inside a running event loop - confirm.
        nest_asyncio.apply()
        asyncio.get_event_loop().run_until_complete(_send_and_close())

        self._logger.info("Notification email sent successfully.")

    def _authenticate_and_send_simple_smtp(self) -> None:
        """Authenticates and sends an email notification using simple authentication.

        A failure in STARTTLS or login is logged and the email is still
        attempted without authentication. A failure while sending is logged
        and not re-raised.
        """
        with smtplib.SMTP(
            self.notification["server"], self.notification["port"]
        ) as smtp:
            try:
                smtp.starttls()
                smtp.login(
                    self.notification.get("user", ""),
                    self.notification.get("password", ""),
                )
            except smtplib.SMTPException as e:
                self._logger.exception(
                    f"Exception while authenticating to smtp: {str(e)}"
                )
                # Plain status line: the traceback is already in the log.
                self._logger.warning(
                    "Attempting to send the notification without authentication"
                )

            mesg = MIMEMultipart()
            mesg["From"] = self.notification["from"]
            mesg["To"] = ", ".join(self.notification["to"])
            mesg["Subject"] = self.notification["subject"]

            # "mimetype" is only filled in by templates; fall back to
            # MIMEText's default subtype when a custom message omits it.
            body = MIMEText(
                self.notification["message"],
                self.notification.get("mimetype", "plain"),
            )
            mesg.attach(body)
            try:
                smtp.sendmail(
                    self.notification["from"], self.notification["to"], mesg.as_string()
                )
                self._logger.info("Email sent successfully.")
            except smtplib.SMTPException as e:
                self._logger.exception(f"Exception while sending email: {str(e)}")
Email notification class: builds a notification and sends it by email (Graph API for Office 365, plain SMTP otherwise).
EmailNotifier(notification_spec: lakehouse_engine.core.definitions.TerminatorSpec)
def __init__(self, notification_spec: TerminatorSpec):
    """Create the email notifier from its specification.

    All initialisation is delegated to the parent class constructor.

    Args:
        notification_spec: notification specification.
    """
    super().__init__(notification_spec)
Construct Email Notification instance.
Arguments:
- notification_spec: notification specification.
def
create_notification(self) -> None:
def create_notification(self) -> None:
    """Build the message and subject of the notification to be sent.

    A notification that names a template takes its message, subject and
    mimetype from that template. A notification with its own message has the
    message and subject rendered with the given args, when there are any.

    Raises:
        ValueError: if the named template does not exist, or if the
            notification has neither a "template" nor a "message".
    """
    render_args = self.notification.get("args", {})

    if "template" in self.notification:
        chosen: dict = NotificationsTemplates.EMAIL_NOTIFICATIONS_TEMPLATES.get(
            self.notification["template"], {}
        )
        if not chosen:
            raise ValueError(
                f"""Template {self.notification["template"]} does not exist"""
            )

        expected_args = chosen["args"]
        # A copy is handed over so the shared template definition is not
        # modified by the check.
        Notifier._check_args_are_correct(render_args, copy(expected_args))

        for field in ("message", "subject"):
            self.notification[field] = self._render_notification_field(
                chosen[field], render_args
            )
        self.notification["mimetype"] = chosen["mimetype"]
        return

    if "message" not in self.notification:
        raise ValueError("Malformed Notification Definition")

    # Without args the message and subject are left exactly as given.
    if render_args:
        for field in ("message", "subject"):
            self.notification[field] = self._render_notification_field(
                self.notification[field], render_args
            )
Creates the notification to be sent.
def
send_notification(self) -> None:
def send_notification(self) -> None:
    """Send the notification through the channel matching its server.

    Office 365 servers are handled with the Graph API and every other server
    with plain SMTP.

    Raises:
        ValueError: if the server is in the engine's list of disallowed
            email servers.
    """
    smtp_server = self.notification["server"]
    blocked_servers = ExecEnv.ENGINE_CONFIG.notif_disallowed_email_servers

    if blocked_servers is not None and smtp_server in blocked_servers:
        raise ValueError(
            f"Trying to use disallowed smtp server: '{smtp_server}'.\n"
            f"Disallowed smtp servers: "
            f"{str(blocked_servers)}"
        )

    office_servers = ["smtp.office365.com"]
    if smtp_server in office_servers:
        self._authenticate_and_send_office365()
        return

    self._authenticate_and_send_simple_smtp()
Sends the notification: rejects disallowed email servers, uses the Graph API for Office 365 servers and plain SMTP for any other server.