lakehouse_engine.io.writer
Defines abstract writer behaviour.
1"""Defines abstract writer behaviour.""" 2 3from abc import ABC, abstractmethod 4from typing import Any, Callable, Dict, List, Optional, OrderedDict 5 6from pyspark.sql import DataFrame 7from pyspark.sql.functions import lit 8 9from lakehouse_engine.core.definitions import DQSpec, OutputSpec 10from lakehouse_engine.core.exec_env import ExecEnv 11from lakehouse_engine.transformers.transformer_factory import TransformerFactory 12from lakehouse_engine.utils.logging_handler import LoggingHandler 13 14 15class Writer(ABC): 16 """Abstract Writer class.""" 17 18 def __init__( 19 self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict = None 20 ): 21 """Construct Writer instances. 22 23 Args: 24 output_spec: output specification to write data. 25 df: dataframe to write. 26 data: list of all dfs generated on previous steps before writer. 27 """ 28 self._logger = LoggingHandler(self.__class__.__name__).get_logger() 29 self._output_spec = output_spec 30 self._df = df 31 self._data = data 32 33 @abstractmethod 34 def write(self) -> Optional[OrderedDict]: 35 """Abstract write method.""" 36 raise NotImplementedError 37 38 @staticmethod 39 def write_transformed_micro_batch(**kwargs: Any) -> Callable: 40 """Define how to write a streaming micro batch after transforming it. 41 42 This function must define an inner function that manipulates a streaming batch, 43 and then return that function. Look for concrete implementations of this 44 function for more clarity. 45 46 Args: 47 kwargs: any keyword arguments. 48 49 Returns: 50 A function to be executed in the foreachBatch spark write method. 51 """ 52 53 def inner(batch_df: DataFrame, batch_id: int) -> None: 54 logger = LoggingHandler(__name__).get_logger() 55 logger.warning("Skipping transform micro batch... nothing to do.") 56 57 return inner 58 59 @classmethod 60 def get_transformed_micro_batch( 61 cls, 62 output_spec: OutputSpec, 63 batch_df: DataFrame, 64 batch_id: int, 65 data: OrderedDict, 66 ) -> DataFrame: 67 """Get the result of the transformations applied to a micro batch dataframe. 68 69 Args: 70 output_spec: output specification associated with the writer. 71 batch_df: batch dataframe (given from streaming foreachBatch). 72 batch_id: if of the batch (given from streaming foreachBatch). 73 data: list of all dfs generated on previous steps before writer 74 to be available on micro batch transforms. 75 76 Returns: 77 The transformed dataframe. 78 """ 79 # forcing session to be available inside forEachBatch on 80 # Spark Connect 81 ExecEnv.get_or_create() 82 transformed_df = batch_df 83 if output_spec.with_batch_id: 84 transformed_df = transformed_df.withColumn("lhe_batch_id", lit(batch_id)) 85 86 for transformer in output_spec.streaming_micro_batch_transformers: 87 transformed_df = transformed_df.transform( 88 TransformerFactory.get_transformer(transformer, data) 89 ) 90 91 return transformed_df 92 93 @classmethod 94 def get_streaming_trigger(cls, output_spec: OutputSpec) -> Dict: 95 """Define which streaming trigger will be used. 96 97 Args: 98 output_spec: output specification. 99 100 Returns: 101 A dict containing streaming trigger. 
102 """ 103 trigger: Dict[str, Any] = {} 104 105 if output_spec.streaming_available_now: 106 trigger["availableNow"] = output_spec.streaming_available_now 107 elif output_spec.streaming_once: 108 trigger["once"] = output_spec.streaming_once 109 elif output_spec.streaming_processing_time: 110 trigger["processingTime"] = output_spec.streaming_processing_time 111 elif output_spec.streaming_continuous: 112 trigger["continuous"] = output_spec.streaming_continuous 113 else: 114 raise NotImplementedError( 115 "The requested output spec streaming trigger is not supported." 116 ) 117 118 return trigger 119 120 @staticmethod 121 def run_micro_batch_dq_process(df: DataFrame, dq_spec: List[DQSpec]) -> DataFrame: 122 """Run the data quality process in a streaming micro batch dataframe. 123 124 Iterates over the specs and performs the checks or analysis depending on the 125 data quality specification provided in the configuration. 126 127 Args: 128 df: the dataframe in which to run the dq process on. 129 dq_spec: data quality specification. 130 131 Returns: the validated dataframe. 132 """ 133 from lakehouse_engine.dq_processors.dq_factory import DQFactory 134 135 # forcing session to be available inside forEachBatch on 136 # Spark Connect 137 ExecEnv.get_or_create() 138 139 validated_df = df 140 for spec in dq_spec: 141 validated_df = DQFactory.run_dq_process(spec, df) 142 143 return validated_df
class Writer(ABC)
Abstract Writer class.
def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict = None)
Construct Writer instances.
Arguments:
- output_spec: output specification to write data.
- df: dataframe to write.
- data: ordered dict of all dataframes generated in previous steps, before the writer.
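For illustration, a minimal sketch of a concrete writer built on this contract. The subclass name and the show-based sink are hypothetical and not part of the lakehouse engine; only the Writer base class and OutputSpec come from this module.

from typing import Optional, OrderedDict

from pyspark.sql import DataFrame

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writer import Writer


class ConsoleWriter(Writer):
    """Hypothetical writer used only to illustrate the Writer contract."""

    def __init__(
        self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict = None
    ):
        super().__init__(output_spec, df, data)

    def write(self) -> Optional[OrderedDict]:
        # Minimal batch sink: print the dataframe and return nothing.
        self._df.show(truncate=False)
        return None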
@abstractmethod
def write(self) -> Optional[OrderedDict]
Abstract write method.
@staticmethod
def write_transformed_micro_batch(**kwargs: Any) -> Callable
Define how to write a streaming micro batch after transforming it.
This function must define an inner function that manipulates a streaming micro batch and then return that function. See the concrete implementations of this function for more clarity.
Arguments:
- kwargs: any keyword arguments.
Returns:
A function to be executed by Spark's foreachBatch write method.
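As a rough sketch of the intended override pattern (the kwargs keys, the parquet sink, and the target path are assumptions for illustration), a concrete writer returns an inner function that Spark then calls once per micro batch:

@staticmethod
def write_transformed_micro_batch(**kwargs: Any) -> Callable:
    # Assumed keyword arguments: the output spec and the dict of previous dataframes.
    output_spec = kwargs["output_spec"]
    data = kwargs["data"]

    def inner(batch_df: DataFrame, batch_id: int) -> None:
        # Apply the configured micro batch transformers, then write the result.
        transformed = Writer.get_transformed_micro_batch(
            output_spec, batch_df, batch_id, data
        )
        transformed.write.mode("append").parquet("/tmp/illustrative/output")

    return inner

The returned inner function is what ultimately gets passed to df.writeStream.foreachBatch(...).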
@classmethod
def get_transformed_micro_batch(cls, output_spec: OutputSpec, batch_df: DataFrame, batch_id: int, data: OrderedDict) -> DataFrame
Get the result of the transformations applied to a micro batch dataframe.
Arguments:
- output_spec: output specification associated with the writer.
- batch_df: batch dataframe (given from streaming foreachBatch).
- batch_id: id of the batch (given from streaming foreachBatch).
- data: ordered dict of all dataframes generated in previous steps, made available to the micro batch transforms.
Returns:
The transformed dataframe.
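A small usage sketch inside a hypothetical foreachBatch handler, assuming output_spec.with_batch_id is enabled and streaming_micro_batch_transformers is an empty list; output_spec itself is a placeholder configured elsewhere:

def handle_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Adds the literal "lhe_batch_id" column and would then apply any
    # configured micro batch transformers in order (none in this sketch).
    transformed = Writer.get_transformed_micro_batch(
        output_spec, batch_df, batch_id, data=None
    )
    assert "lhe_batch_id" in transformed.columns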
@classmethod
def get_streaming_trigger(cls, output_spec: OutputSpec) -> Dict
Define which streaming trigger will be used.
Arguments:
- output_spec: output specification.
Returns:
A dict containing the streaming trigger configuration.
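The returned dict is meant to be unpacked into Spark's trigger() call. A sketch, reusing the hypothetical handle_batch handler from the previous example (the writeStream chain and streaming_df are illustrative):

trigger = Writer.get_streaming_trigger(output_spec)
# e.g. {"availableNow": True} or {"processingTime": "30 seconds"}

query = (
    streaming_df.writeStream.trigger(**trigger)
    .foreachBatch(handle_batch)
    .start()
)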
@staticmethod
def run_micro_batch_dq_process(df: DataFrame, dq_spec: List[DQSpec]) -> DataFrame
Run the data quality process in a streaming micro batch dataframe.
Iterates over the specs and performs the checks or analysis depending on the data quality specification provided in the configuration.
Arguments:
- df: the dataframe to run the dq process on.
- dq_spec: data quality specification.
Returns:
The validated dataframe.
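A sketch of wiring the DQ process into a micro batch handler; the handler, the dq_specs list, the streaming_df variable, and the table name are illustrative placeholders:

def handle_dq_batch(batch_df: DataFrame, batch_id: int) -> None:
    # dq_specs: a list of DQSpec objects configured elsewhere (assumed to exist).
    validated = Writer.run_micro_batch_dq_process(batch_df, dq_specs)
    validated.write.mode("append").saveAsTable("illustrative_schema.illustrative_table")

streaming_df.writeStream.foreachBatch(handle_dq_batch).start()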