lakehouse_engine.terminators.terminator_factory

Module with the factory pattern to return terminators.

 1"""Module with the factory pattern to return terminators."""
 2
 3from typing import Optional
 4
 5from pyspark.sql import DataFrame
 6
 7from lakehouse_engine.core.definitions import TerminatorSpec
 8from lakehouse_engine.terminators.notifier import Notifier
 9from lakehouse_engine.terminators.notifier_factory import NotifierFactory
10from lakehouse_engine.utils.logging_handler import LoggingHandler
11
12
class TerminatorFactory(object):
    """TerminatorFactory class following the factory pattern."""

    _logger = LoggingHandler(__name__).get_logger()

    @staticmethod
    def execute_terminator(
        spec: TerminatorSpec, df: Optional[DataFrame] = None
    ) -> None:
        """Execute a terminator following the factory pattern.

        Args:
            spec: terminator specification.
            df: dataframe to be used in the terminator. Needed when a
                terminator requires one dataframe as input.

        Raises:
            NotImplementedError: if ``spec.function`` is not one of the
                supported terminator functions.
        """
        if spec.function == "optimize_dataset":
            # Terminator modules are imported lazily so their (potentially
            # heavy) dependencies are only loaded when actually requested.
            from lakehouse_engine.terminators.dataset_optimizer import DatasetOptimizer

            DatasetOptimizer.optimize_dataset(**spec.args)
        elif spec.function == "terminate_spark":
            from lakehouse_engine.terminators.spark_terminator import SparkTerminator

            SparkTerminator.terminate_spark()
        elif spec.function == "expose_cdf":
            from lakehouse_engine.terminators.cdf_processor import CDFProcessor

            CDFProcessor.expose_cdf(spec)
        elif spec.function == "notify":
            # Failure notifications are handled by the failure path, so they
            # are deliberately skipped here.
            if not Notifier.check_if_notification_is_failure_notification(spec):
                notifier = NotifierFactory.get_notifier(spec)
                notifier.create_notification()
                notifier.send_notification()
        else:
            raise NotImplementedError(
                f"The requested terminator {spec.function} is not implemented."
            )
class TerminatorFactory(object):
    """Factory that resolves and runs the terminator requested by a spec."""

    _logger = LoggingHandler(__name__).get_logger()

    @staticmethod
    def execute_terminator(
        spec: TerminatorSpec, df: Optional[DataFrame] = None
    ) -> None:
        """Execute a terminator following the factory pattern.

        Args:
            spec: terminator specification.
            df: dataframe to be used in the terminator. Needed when a
                terminator requires one dataframe as input.
        """
        requested = spec.function

        if requested == "optimize_dataset":
            # Lazy import: only load the optimizer when it is requested.
            from lakehouse_engine.terminators.dataset_optimizer import DatasetOptimizer

            DatasetOptimizer.optimize_dataset(**spec.args)
            return

        if requested == "terminate_spark":
            from lakehouse_engine.terminators.spark_terminator import SparkTerminator

            SparkTerminator.terminate_spark()
            return

        if requested == "expose_cdf":
            from lakehouse_engine.terminators.cdf_processor import CDFProcessor

            CDFProcessor.expose_cdf(spec)
            return

        if requested == "notify":
            is_failure = Notifier.check_if_notification_is_failure_notification(spec)
            if not is_failure:
                notifier = NotifierFactory.get_notifier(spec)
                notifier.create_notification()
                notifier.send_notification()
            return

        raise NotImplementedError(
            f"The requested terminator {spec.function} is not implemented."
        )

TerminatorFactory class following the factory pattern.

@staticmethod
def execute_terminator( spec: lakehouse_engine.core.definitions.TerminatorSpec, df: Optional[pyspark.sql.dataframe.DataFrame] = None) -> None:
@staticmethod
def execute_terminator(
    spec: TerminatorSpec, df: Optional[DataFrame] = None
) -> None:
    """Execute a terminator following the factory pattern.

    Args:
        spec: terminator specification.
        df: dataframe to be used in the terminator. Needed when a
            terminator requires one dataframe as input.
    """

    def _optimize_dataset() -> None:
        # Lazy import keeps optional dependencies unloaded until needed.
        from lakehouse_engine.terminators.dataset_optimizer import DatasetOptimizer

        DatasetOptimizer.optimize_dataset(**spec.args)

    def _terminate_spark() -> None:
        from lakehouse_engine.terminators.spark_terminator import SparkTerminator

        SparkTerminator.terminate_spark()

    def _expose_cdf() -> None:
        from lakehouse_engine.terminators.cdf_processor import CDFProcessor

        CDFProcessor.expose_cdf(spec)

    def _notify() -> None:
        # Failure notifications are sent by the failure path; skip them here.
        if not Notifier.check_if_notification_is_failure_notification(spec):
            notifier = NotifierFactory.get_notifier(spec)
            notifier.create_notification()
            notifier.send_notification()

    dispatch = {
        "optimize_dataset": _optimize_dataset,
        "terminate_spark": _terminate_spark,
        "expose_cdf": _expose_cdf,
        "notify": _notify,
    }

    handler = dispatch.get(spec.function)
    if handler is None:
        raise NotImplementedError(
            f"The requested terminator {spec.function} is not implemented."
        )
    handler()

Execute a terminator following the factory pattern.

Arguments:
  • spec: terminator specification.
  • df: dataframe to be used in the terminator. Needed when a terminator requires one dataframe as input.
Returns:

None. The terminator is executed purely for its side effects (the method is annotated `-> None`).