lakehouse_engine.io.writers.file_writer
Module to define behaviour to write to files.
1"""Module to define behaviour to write to files.""" 2 3from typing import Callable, OrderedDict 4 5from pyspark.sql import DataFrame 6 7from lakehouse_engine.core.definitions import OutputSpec 8from lakehouse_engine.io.writer import Writer 9 10 11class FileWriter(Writer): 12 """Class to write data to files.""" 13 14 def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict): 15 """Construct FileWriter instances. 16 17 Args: 18 output_spec: output specification 19 df: dataframe to be written. 20 data: list of all dfs generated on previous steps before writer. 21 """ 22 super().__init__(output_spec, df, data) 23 24 def write(self) -> None: 25 """Write data to files.""" 26 if not self._df.isStreaming: 27 self._write_to_files_in_batch_mode(self._df, self._output_spec) 28 else: 29 self._write_to_files_in_streaming_mode( 30 self._df, self._output_spec, self._data 31 ) 32 33 @staticmethod 34 def _write_to_files_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None: 35 """Write to files in batch mode. 36 37 Args: 38 df: dataframe to write. 39 output_spec: output specification. 40 """ 41 df.write.format(output_spec.data_format).partitionBy( 42 output_spec.partitions 43 ).options(**output_spec.options if output_spec.options else {}).mode( 44 output_spec.write_type 45 ).save( 46 output_spec.location 47 ) 48 49 @staticmethod 50 def _write_to_files_in_streaming_mode( 51 df: DataFrame, output_spec: OutputSpec, data: OrderedDict 52 ) -> None: 53 """Write to files in streaming mode. 54 55 Args: 56 df: dataframe to write. 57 output_spec: output specification. 58 data: list of all dfs generated on previous steps before writer. 59 """ 60 df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec)) 61 62 if ( 63 output_spec.streaming_micro_batch_transformers 64 or output_spec.streaming_micro_batch_dq_processors 65 ): 66 stream_df = ( 67 df_writer.options(**output_spec.options if output_spec.options else {}) 68 .foreachBatch( 69 FileWriter._write_transformed_micro_batch(output_spec, data) 70 ) 71 .start() 72 ) 73 else: 74 stream_df = ( 75 df_writer.format(output_spec.data_format) 76 .partitionBy(output_spec.partitions) 77 .options(**output_spec.options if output_spec.options else {}) 78 .outputMode(output_spec.write_type) 79 .start(output_spec.location) 80 ) 81 82 if output_spec.streaming_await_termination: 83 stream_df.awaitTermination(output_spec.streaming_await_termination_timeout) 84 85 @staticmethod 86 def _write_transformed_micro_batch( # type: ignore 87 output_spec: OutputSpec, data: OrderedDict 88 ) -> Callable: 89 """Define how to write a streaming micro batch after transforming it. 90 91 Args: 92 output_spec: output specification. 93 data: list of all dfs generated on previous steps before writer. 94 95 Returns: 96 A function to be executed in the foreachBatch spark write method. 97 """ 98 99 def inner(batch_df: DataFrame, batch_id: int) -> None: 100 transformed_df = Writer.get_transformed_micro_batch( 101 output_spec, batch_df, batch_id, data 102 ) 103 104 if output_spec.streaming_micro_batch_dq_processors: 105 transformed_df = Writer.run_micro_batch_dq_process( 106 transformed_df, output_spec.streaming_micro_batch_dq_processors 107 ) 108 109 FileWriter._write_to_files_in_batch_mode(transformed_df, output_spec) 110 111 return inner
class FileWriter(Writer):
Class to write data to files.
FileWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
    """Construct FileWriter instances.

    Args:
        output_spec: output specification
        df: dataframe to be written.
        data: list of all dfs generated on previous steps before writer.
    """
    super().__init__(output_spec, df, data)
Construct FileWriter instances.
Arguments:
- output_spec: output specification.
- df: dataframe to be written.
- data: list of all dataframes generated in previous steps before the writer.
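A hedged usage sketch follows. The OutputSpec fields data_format, write_type, partitions, options and location are the ones this writer reads; spec_id and input_id are assumed identifier fields (check lakehouse_engine.core.definitions.OutputSpec for the exact constructor), and all values and paths are hypothetical. Within the engine this class is typically constructed by the framework rather than by hand.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writers.file_writer import FileWriter

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "2024-01-01"), (2, "2024-01-02")], ["id", "day"])

output_spec = OutputSpec(
    spec_id="sales_silver",           # assumed identifier field
    input_id="sales_bronze",          # assumed identifier field
    write_type="overwrite",           # used as the batch save mode
    data_format="parquet",
    location="/tmp/lakehouse/sales",  # hypothetical path
    partitions=["day"],
    options={"compression": "snappy"},
)

# No upstream dataframes in this toy example, hence the empty OrderedDict.
FileWriter(output_spec=output_spec, df=df, data=OrderedDict()).write()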
def write(self) -> None:
def write(self) -> None:
    """Write data to files."""
    if not self._df.isStreaming:
        self._write_to_files_in_batch_mode(self._df, self._output_spec)
    else:
        self._write_to_files_in_streaming_mode(
            self._df, self._output_spec, self._data
        )
Write data to files.
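For a non-streaming dataframe, write() delegates to _write_to_files_in_batch_mode, which amounts to a standard PySpark batch write like the sketch below (format, partition column, options, mode and path are hypothetical values standing in for the corresponding OutputSpec attributes). For a streaming dataframe it instead builds a writeStream query with the configured trigger and either starts a direct file sink or routes through foreachBatch when micro-batch transformers or DQ processors are configured.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "2024-01-01"), (2, "2024-01-02")], ["id", "day"])

(
    df.write.format("parquet")        # output_spec.data_format
    .partitionBy(["day"])             # output_spec.partitions
    .options(compression="snappy")    # output_spec.options
    .mode("overwrite")                # output_spec.write_type
    .save("/tmp/lakehouse/sales")     # output_spec.location
)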