lakehouse_engine.terminators.terminator_factory
Module with the factory pattern to return terminators.
1"""Module with the factory pattern to return terminators.""" 2 3from typing import Optional 4 5from pyspark.sql import DataFrame 6 7from lakehouse_engine.core.definitions import TerminatorSpec 8from lakehouse_engine.terminators.notifier import Notifier 9from lakehouse_engine.terminators.notifier_factory import NotifierFactory 10from lakehouse_engine.utils.logging_handler import LoggingHandler 11 12 13class TerminatorFactory(object): 14 """TerminatorFactory class following the factory pattern.""" 15 16 _logger = LoggingHandler(__name__).get_logger() 17 18 @staticmethod 19 def execute_terminator( 20 spec: TerminatorSpec, df: Optional[DataFrame] = None 21 ) -> None: 22 """Execute a terminator following the factory pattern. 23 24 Args: 25 spec: terminator specification. 26 df: dataframe to be used in the terminator. Needed when a 27 terminator requires one dataframe as input. 28 29 Returns: 30 Transformer function to be executed in .transform() spark function. 31 """ 32 if spec.function == "optimize_dataset": 33 from lakehouse_engine.terminators.dataset_optimizer import DatasetOptimizer 34 35 DatasetOptimizer.optimize_dataset(**spec.args) 36 elif spec.function == "terminate_spark": 37 from lakehouse_engine.terminators.spark_terminator import SparkTerminator 38 39 SparkTerminator.terminate_spark() 40 elif spec.function == "expose_cdf": 41 from lakehouse_engine.terminators.cdf_processor import CDFProcessor 42 43 CDFProcessor.expose_cdf(spec) 44 elif spec.function == "notify": 45 if not Notifier.check_if_notification_is_failure_notification(spec): 46 notifier = NotifierFactory.get_notifier(spec) 47 notifier.create_notification() 48 notifier.send_notification() 49 else: 50 raise NotImplementedError( 51 f"The requested terminator {spec.function} is not implemented." 52 )
class TerminatorFactory:
TerminatorFactory class following the factory pattern.
@staticmethod
def execute_terminator(spec: lakehouse_engine.core.definitions.TerminatorSpec, df: Optional[pyspark.sql.dataframe.DataFrame] = None) -> None:
Execute a terminator following the factory pattern.
Arguments:
- spec: terminator specification.
- df: dataframe to be used in the terminator. Needed when a terminator requires a dataframe as input.
Returns:
None. The terminator is executed for its side effects.
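For illustration, a hedged sketch of how a spec flows through the dispatcher: in the "optimize_dataset" branch the args mapping is unpacked directly into DatasetOptimizer.optimize_dataset, so its keys must match that function's parameters. The TerminatorSpec keywords and the db_table key below are assumptions, not confirmed signatures; any unknown function name raises NotImplementedError.

# Hypothetical sketch: "optimize_dataset" forwards spec.args via **spec.args,
# so every key must be a valid DatasetOptimizer.optimize_dataset parameter.
# The constructor keywords and the "db_table" key are assumptions.
from lakehouse_engine.core.definitions import TerminatorSpec
from lakehouse_engine.terminators.terminator_factory import TerminatorFactory

TerminatorFactory.execute_terminator(
    TerminatorSpec(
        function="optimize_dataset",
        args={"db_table": "my_database.my_table"},  # assumed argument name
    )
)

# Any other function name falls through to the NotImplementedError branch.
try:
    TerminatorFactory.execute_terminator(
        TerminatorSpec(function="does_not_exist", args={})
    )
except NotImplementedError as err:
    print(err)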