lakehouse_engine.io.writers.file_writer

Module to define behaviour to write to files.

  1"""Module to define behaviour to write to files."""
  2
  3from typing import Callable, OrderedDict
  4
  5from pyspark.sql import DataFrame
  6
  7from lakehouse_engine.core.definitions import OutputSpec
  8from lakehouse_engine.io.writer import Writer
  9
 10
 11class FileWriter(Writer):
 12    """Class to write data to files."""
 13
 14    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
 15        """Construct FileWriter instances.
 16
 17        Args:
 18            output_spec: output specification
 19            df: dataframe to be written.
 20            data: list of all dfs generated on previous steps before writer.
 21        """
 22        super().__init__(output_spec, df, data)
 23
 24    def write(self) -> None:
 25        """Write data to files."""
 26        if not self._df.isStreaming:
 27            self._write_to_files_in_batch_mode(self._df, self._output_spec)
 28        else:
 29            self._write_to_files_in_streaming_mode(
 30                self._df, self._output_spec, self._data
 31            )
 32
 33    @staticmethod
 34    def _write_to_files_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None:
 35        """Write to files in batch mode.
 36
 37        Args:
 38            df: dataframe to write.
 39            output_spec: output specification.
 40        """
 41        df.write.format(output_spec.data_format).partitionBy(
 42            output_spec.partitions
 43        ).options(**output_spec.options if output_spec.options else {}).mode(
 44            output_spec.write_type
 45        ).save(
 46            output_spec.location
 47        )
 48
 49    @staticmethod
 50    def _write_to_files_in_streaming_mode(
 51        df: DataFrame, output_spec: OutputSpec, data: OrderedDict
 52    ) -> None:
 53        """Write to files in streaming mode.
 54
 55        Args:
 56            df: dataframe to write.
 57            output_spec: output specification.
 58            data: list of all dfs generated on previous steps before writer.
 59        """
 60        df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec))
 61
 62        if (
 63            output_spec.streaming_micro_batch_transformers
 64            or output_spec.streaming_micro_batch_dq_processors
 65        ):
 66            stream_df = (
 67                df_writer.options(**output_spec.options if output_spec.options else {})
 68                .foreachBatch(
 69                    FileWriter._write_transformed_micro_batch(output_spec, data)
 70                )
 71                .start()
 72            )
 73        else:
 74            stream_df = (
 75                df_writer.format(output_spec.data_format)
 76                .partitionBy(output_spec.partitions)
 77                .options(**output_spec.options if output_spec.options else {})
 78                .outputMode(output_spec.write_type)
 79                .start(output_spec.location)
 80            )
 81
 82        if output_spec.streaming_await_termination:
 83            stream_df.awaitTermination(output_spec.streaming_await_termination_timeout)
 84
 85    @staticmethod
 86    def _write_transformed_micro_batch(  # type: ignore
 87        output_spec: OutputSpec, data: OrderedDict
 88    ) -> Callable:
 89        """Define how to write a streaming micro batch after transforming it.
 90
 91        Args:
 92            output_spec: output specification.
 93            data: list of all dfs generated on previous steps before writer.
 94
 95        Returns:
 96            A function to be executed in the foreachBatch spark write method.
 97        """
 98
 99        def inner(batch_df: DataFrame, batch_id: int) -> None:
100            transformed_df = Writer.get_transformed_micro_batch(
101                output_spec, batch_df, batch_id, data
102            )
103
104            if output_spec.streaming_micro_batch_dq_processors:
105                transformed_df = Writer.run_micro_batch_dq_process(
106                    transformed_df, output_spec.streaming_micro_batch_dq_processors
107                )
108
109            FileWriter._write_to_files_in_batch_mode(transformed_df, output_spec)
110
111        return inner
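
When streaming_micro_batch_transformers or streaming_micro_batch_dq_processors are configured, the streaming path above relies on Spark's foreachBatch: each micro batch is handed to a plain batch write after being transformed. The snippet below is a minimal, self-contained sketch of that same pattern, assuming a local SparkSession, a rate source and /tmp paths purely for illustration (none of these values come from lakehouse_engine).

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format("rate").load()


def write_micro_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Each micro batch arrives as a regular (non-streaming) DataFrame, so it can be
    # transformed and persisted with the normal batch writer, just as
    # _write_transformed_micro_batch delegates to _write_to_files_in_batch_mode.
    batch_df.write.format("parquet").mode("append").save("/tmp/example_output")


query = (
    stream_df.writeStream
    .option("checkpointLocation", "/tmp/example_checkpoint")  # illustrative path
    .foreachBatch(write_micro_batch)
    .start()
)
query.awaitTermination(30)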
class FileWriter(lakehouse_engine.io.writer.Writer):

Class to write data to files.

FileWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)

Construct FileWriter instances.

Arguments:
  • output_spec: output specification.
  • df: dataframe to be written.
  • data: list of all dfs generated on previous steps before writer.
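
For reference, a hedged construction sketch follows. The OutputSpec keyword arguments mirror the attributes this writer reads (data_format, partitions, options, write_type, location), but the exact OutputSpec constructor signature, including fields such as spec_id and input_id, should be checked against lakehouse_engine.core.definitions; all concrete values are illustrative.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writers.file_writer import FileWriter

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(2024, 1, 10.0)], ["year", "month", "amount"])

output_spec = OutputSpec(
    spec_id="sales_silver",                  # assumed identifier field
    input_id="sales_bronze",                 # assumed upstream reference
    data_format="delta",                     # any Spark-supported file format
    partitions=["year", "month"],
    options={"mergeSchema": "true"},
    write_type="append",                     # used as the batch write mode
    location="s3://my-bucket/silver/sales",  # illustrative path
)

writer = FileWriter(output_spec=output_spec, df=df, data=OrderedDict())
writer.write()  # df is not streaming, so this performs a batch write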
def write(self) -> None:

Write data to files.
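
The dispatch between the batch and streaming paths is driven solely by DataFrame.isStreaming, as in this minimal sketch (assuming a local SparkSession; the rate source is illustrative only):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

batch_df = spark.range(10)
print(batch_df.isStreaming)   # False -> _write_to_files_in_batch_mode is used

stream_df = spark.readStream.format("rate").load()
print(stream_df.isStreaming)  # True  -> _write_to_files_in_streaming_mode is used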