lakehouse_engine.io.writer

Defines abstract writer behaviour.

  1"""Defines abstract writer behaviour."""
  2
  3from abc import ABC, abstractmethod
  4from typing import Any, Callable, Dict, List, Optional, OrderedDict
  5
  6from pyspark.sql import DataFrame
  7from pyspark.sql.functions import lit
  8
  9from lakehouse_engine.core.definitions import DQSpec, OutputSpec
 10from lakehouse_engine.core.exec_env import ExecEnv
 11from lakehouse_engine.transformers.transformer_factory import TransformerFactory
 12from lakehouse_engine.utils.logging_handler import LoggingHandler
 13
 14
 15class Writer(ABC):
 16    """Abstract Writer class."""
 17
 18    def __init__(
 19        self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict = None
 20    ):
 21        """Construct Writer instances.
 22
 23        Args:
 24            output_spec: output specification to write data.
 25            df: dataframe to write.
 26            data: list of all dfs generated on previous steps before writer.
 27        """
 28        self._logger = LoggingHandler(self.__class__.__name__).get_logger()
 29        self._output_spec = output_spec
 30        self._df = df
 31        self._data = data
 32
 33    @abstractmethod
 34    def write(self) -> Optional[OrderedDict]:
 35        """Abstract write method."""
 36        raise NotImplementedError
 37
 38    @staticmethod
 39    def write_transformed_micro_batch(**kwargs: Any) -> Callable:
 40        """Define how to write a streaming micro batch after transforming it.
 41
 42        This function must define an inner function that manipulates a streaming batch,
 43        and then return that function. Look for concrete implementations of this
 44        function for more clarity.
 45
 46        Args:
 47            kwargs: any keyword arguments.
 48
 49        Returns:
 50            A function to be executed in the foreachBatch spark write method.
 51        """
 52
 53        def inner(batch_df: DataFrame, batch_id: int) -> None:
 54            logger = LoggingHandler(__name__).get_logger()
 55            logger.warning("Skipping transform micro batch... nothing to do.")
 56
 57        return inner
 58
 59    @classmethod
 60    def get_transformed_micro_batch(
 61        cls,
 62        output_spec: OutputSpec,
 63        batch_df: DataFrame,
 64        batch_id: int,
 65        data: OrderedDict,
 66    ) -> DataFrame:
 67        """Get the result of the transformations applied to a micro batch dataframe.
 68
 69        Args:
 70            output_spec: output specification associated with the writer.
 71            batch_df: batch dataframe (given from streaming foreachBatch).
 72            batch_id: if of the batch (given from streaming foreachBatch).
 73            data: list of all dfs generated on previous steps before writer
 74                to be available on micro batch transforms.
 75
 76        Returns:
 77            The transformed dataframe.
 78        """
 79        # forcing session to be available inside forEachBatch on
 80        # Spark Connect
 81        ExecEnv.get_or_create()
 82        transformed_df = batch_df
 83        if output_spec.with_batch_id:
 84            transformed_df = transformed_df.withColumn("lhe_batch_id", lit(batch_id))
 85
 86        for transformer in output_spec.streaming_micro_batch_transformers:
 87            transformed_df = transformed_df.transform(
 88                TransformerFactory.get_transformer(transformer, data)
 89            )
 90
 91        return transformed_df
 92
 93    @classmethod
 94    def get_streaming_trigger(cls, output_spec: OutputSpec) -> Dict:
 95        """Define which streaming trigger will be used.
 96
 97        Args:
 98            output_spec: output specification.
 99
100        Returns:
101            A dict containing streaming trigger.
102        """
103        trigger: Dict[str, Any] = {}
104
105        if output_spec.streaming_available_now:
106            trigger["availableNow"] = output_spec.streaming_available_now
107        elif output_spec.streaming_once:
108            trigger["once"] = output_spec.streaming_once
109        elif output_spec.streaming_processing_time:
110            trigger["processingTime"] = output_spec.streaming_processing_time
111        elif output_spec.streaming_continuous:
112            trigger["continuous"] = output_spec.streaming_continuous
113        else:
114            raise NotImplementedError(
115                "The requested output spec streaming trigger is not supported."
116            )
117
118        return trigger
119
120    @staticmethod
121    def run_micro_batch_dq_process(df: DataFrame, dq_spec: List[DQSpec]) -> DataFrame:
122        """Run the data quality process in a streaming micro batch dataframe.
123
124        Iterates over the specs and performs the checks or analysis depending on the
125        data quality specification provided in the configuration.
126
127        Args:
128            df: the dataframe in which to run the dq process on.
129            dq_spec: data quality specification.
130
131        Returns: the validated dataframe.
132        """
133        from lakehouse_engine.dq_processors.dq_factory import DQFactory
134
135        # forcing session to be available inside forEachBatch on
136        # Spark Connect
137        ExecEnv.get_or_create()
138
139        validated_df = df
140        for spec in dq_spec:
141            validated_df = DQFactory.run_dq_process(spec, df)
142
143        return validated_df
class Writer(abc.ABC):

Abstract Writer class.

Writer(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict = None)

Construct Writer instances.

Arguments:
  • output_spec: output specification to write data.
  • df: dataframe to write.
  • data: ordered dict of all dfs generated in previous steps before the writer.
@abstractmethod
def write(self) -> Optional[OrderedDict]:

Abstract write method.
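
Concrete writers implement this method with the actual persistence logic. A minimal, hypothetical sketch (ShowWriter is not part of the library; it only illustrates the contract of persisting self._df and returning the accumulated dataframes):

from typing import Optional, OrderedDict

from lakehouse_engine.io.writer import Writer


class ShowWriter(Writer):
    """Hypothetical writer that prints the dataframe instead of persisting it."""

    def write(self) -> Optional[OrderedDict]:
        # self._output_spec, self._df and self._data are populated by Writer.__init__.
        self._df.show(truncate=False)
        # Returning the accumulated dataframes keeps them available to later steps.
        return self._data

An instance would be built with the output spec and the dataframe produced by the previous step, e.g. ShowWriter(output_spec=spec, df=df).write().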

@staticmethod
def write_transformed_micro_batch(**kwargs: Any) -> Callable:

Define how to write a streaming micro batch after transforming it.

This function must define an inner function that manipulates a streaming batch, and then return that function. Look for concrete implementations of this function for more clarity.

Arguments:
  • kwargs: any keyword arguments.

Returns: A function to be executed in the foreachBatch spark write method.
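
The base implementation in the source above returns a no-op inner function that only logs a warning. As a hedged sketch of the pattern concrete writers follow in their overrides (the kwargs keys, output format and path below are assumptions for illustration, not the library's actual implementation):

from typing import Any, Callable

from pyspark.sql import DataFrame

from lakehouse_engine.io.writer import Writer


def make_foreach_batch_function(**kwargs: Any) -> Callable:
    # Assumed kwargs: the output spec and the dict of previously generated dataframes.
    output_spec = kwargs["output_spec"]
    data = kwargs["data"]

    def inner(batch_df: DataFrame, batch_id: int) -> None:
        # Apply the configured micro batch transformers to this batch...
        transformed = Writer.get_transformed_micro_batch(
            output_spec, batch_df, batch_id, data
        )
        # ...and persist the result (format and location are placeholders).
        transformed.write.format("parquet").mode("append").save("/tmp/example_output")

    return inner

The returned inner function is what gets handed to df.writeStream.foreachBatch(...).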

@classmethod
def get_transformed_micro_batch(cls, output_spec: lakehouse_engine.core.definitions.OutputSpec, batch_df: pyspark.sql.dataframe.DataFrame, batch_id: int, data: OrderedDict) -> pyspark.sql.dataframe.DataFrame:

Get the result of the transformations applied to a micro batch dataframe.

Arguments:
  • output_spec: output specification associated with the writer.
  • batch_df: batch dataframe (given from streaming foreachBatch).
  • batch_id: id of the batch (given from streaming foreachBatch).
  • data: ordered dict of all dfs generated in previous steps before the writer, to be available on micro batch transforms.

Returns: The transformed dataframe.
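
For context, with_batch_id appends a literal lhe_batch_id column, and each entry in streaming_micro_batch_transformers resolves (via TransformerFactory) to a DataFrame-to-DataFrame function applied through DataFrame.transform. The same chaining pattern, shown with plain PySpark and a made-up transformer:

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.master("local[1]").getOrCreate()
batch_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
batch_id = 42

# Equivalent of output_spec.with_batch_id: tag every row with the micro batch id.
transformed_df = batch_df.withColumn("lhe_batch_id", lit(batch_id))


def drop_small_ids(df: DataFrame) -> DataFrame:
    # Illustrative stand-in for a transformer function returned by TransformerFactory.
    return df.filter(col("id") > 1)


transformed_df = transformed_df.transform(drop_small_ids)
transformed_df.show()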

@classmethod
def get_streaming_trigger(cls, output_spec: lakehouse_engine.core.definitions.OutputSpec) -> Dict:

Define which streaming trigger will be used.

Arguments:
  • output_spec: output specification.

Returns: A dict containing streaming trigger.
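
The keys map one-to-one onto DataStreamWriter.trigger keyword arguments (availableNow requires Spark 3.3+), so callers can unpack the dict directly. A small usage sketch, with a rate source and console sink chosen purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
stream_df = spark.readStream.format("rate").load()

# What get_streaming_trigger returns for an output spec with
# streaming_processing_time="30 seconds".
trigger = {"processingTime": "30 seconds"}

query = stream_df.writeStream.format("console").trigger(**trigger).start()
query.awaitTermination(60)  # let a couple of micro batches run, then return
query.stop()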

@staticmethod
def run_micro_batch_dq_process(df: pyspark.sql.dataframe.DataFrame, dq_spec: List[lakehouse_engine.core.definitions.DQSpec]) -> pyspark.sql.dataframe.DataFrame:

Run the data quality process in a streaming micro batch dataframe.

Iterates over the specs and performs the checks or analysis depending on the data quality specification provided in the configuration.

Arguments:
  • df: the dataframe on which to run the dq process.
  • dq_spec: data quality specification.

Returns: the validated dataframe.
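
In practice this is typically called from inside the foreachBatch function, so each micro batch is validated before it is written. A hedged sketch of that wiring (the target format and path are placeholders; dq_specs would come from the parsed data quality configuration):

from typing import Callable, List

from pyspark.sql import DataFrame

from lakehouse_engine.core.definitions import DQSpec
from lakehouse_engine.io.writer import Writer


def make_dq_foreach_batch(dq_specs: List[DQSpec], target_path: str) -> Callable:
    def inner(batch_df: DataFrame, batch_id: int) -> None:
        # Validate the micro batch against every DQ spec before persisting it.
        validated = Writer.run_micro_batch_dq_process(batch_df, dq_specs)
        validated.write.format("parquet").mode("append").save(target_path)

    return inner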