lakehouse_engine.io.writers.table_writer

Module that defines the behaviour to write to tables.

  1"""Module that defines the behaviour to write to tables."""
  2
  3from typing import Any, Callable, OrderedDict
  4
  5from pyspark.sql import DataFrame
  6
  7from lakehouse_engine.core.definitions import OutputFormat, OutputSpec
  8from lakehouse_engine.core.exec_env import ExecEnv
  9from lakehouse_engine.io.writer import Writer
 10
 11
 12class TableWriter(Writer):
 13    """Class to write to a table."""
 14
 15    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
 16        """Construct TableWriter instances.
 17
 18        Args:
 19            output_spec: output specification.
 20            df: dataframe to be written.
 21            data: list of all dfs generated on previous steps before writer.
 22        """
 23        super().__init__(output_spec, df, data)
 24
 25    def write(self) -> None:
 26        """Write data to a table.
 27
 28        After the write operation we repair the table (e.g., update partitions).
 29        However, there's a caveat to this, which is the fact that this repair
 30        operation is not reachable if we are running long-running streaming mode.
 31        Therefore, we recommend not using the TableWriter with formats other than
 32        delta lake for those scenarios (as delta lake does not need msck repair).
 33        So, you can: 1) use delta lake format for the table; 2) use the FileWriter
 34        and run the repair with a certain frequency in a separate task of your
 35        pipeline.
 36        """
 37        if not self._df.isStreaming:
 38            self._write_to_table_in_batch_mode(self._df, self._output_spec)
 39        else:
 40            df_writer = self._df.writeStream.trigger(
 41                **Writer.get_streaming_trigger(self._output_spec)
 42            )
 43
 44            if (
 45                self._output_spec.streaming_micro_batch_transformers
 46                or self._output_spec.streaming_micro_batch_dq_processors
 47            ):
 48                stream_df = (
 49                    df_writer.options(
 50                        **self._output_spec.options if self._output_spec.options else {}
 51                    )
 52                    .foreachBatch(
 53                        self._write_transformed_micro_batch(
 54                            self._output_spec, self._data
 55                        )
 56                    )
 57                    .start()
 58                )
 59
 60                if self._output_spec.streaming_await_termination:
 61                    stream_df.awaitTermination(
 62                        self._output_spec.streaming_await_termination_timeout
 63                    )
 64            else:
 65                self._write_to_table_in_streaming_mode(df_writer, self._output_spec)
 66
 67        if (
 68            self._output_spec.data_format != OutputFormat.DELTAFILES.value
 69            and self._output_spec.partitions
 70        ):
 71            ExecEnv.SESSION.sql(f"MSCK REPAIR TABLE {self._output_spec.db_table}")
 72
 73    @staticmethod
 74    def _write_to_table_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None:
 75        """Write to a metastore table in batch mode.
 76
 77        Args:
 78            df: dataframe to write.
 79            output_spec: output specification.
 80        """
 81        df_writer = df.write.format(output_spec.data_format)
 82
 83        if output_spec.partitions:
 84            df_writer = df_writer.partitionBy(output_spec.partitions)
 85
 86        if output_spec.location:
 87            df_writer = df_writer.options(
 88                path=output_spec.location,
 89                **output_spec.options if output_spec.options else {},
 90            )
 91        else:
 92            df_writer = df_writer.options(
 93                **output_spec.options if output_spec.options else {}
 94            )
 95
 96        df_writer.mode(output_spec.write_type).saveAsTable(output_spec.db_table)
 97
 98    @staticmethod
 99    def _write_to_table_in_streaming_mode(
100        df_writer: Any, output_spec: OutputSpec
101    ) -> None:
102        """Write to a metastore table in streaming mode.
103
104        Args:
105            df_writer: dataframe writer.
106            output_spec: output specification.
107        """
108        df_writer = df_writer.outputMode(output_spec.write_type).format(
109            output_spec.data_format
110        )
111
112        if output_spec.partitions:
113            df_writer = df_writer.partitionBy(output_spec.partitions)
114
115        if output_spec.location:
116            df_writer = df_writer.options(
117                path=output_spec.location,
118                **output_spec.options if output_spec.options else {},
119            )
120        else:
121            df_writer = df_writer.options(
122                **output_spec.options if output_spec.options else {}
123            )
124
125        if output_spec.streaming_await_termination:
126            df_writer.toTable(output_spec.db_table).awaitTermination(
127                output_spec.streaming_await_termination_timeout
128            )
129        else:
130            df_writer.toTable(output_spec.db_table)
131
132    @staticmethod
133    def _write_transformed_micro_batch(  # type: ignore
134        output_spec: OutputSpec, data: OrderedDict
135    ) -> Callable:
136        """Define how to write a streaming micro batch after transforming it.
137
138        Args:
139            output_spec: output specification.
140            data: list of all dfs generated on previous steps before writer.
141
142        Returns:
143            A function to be executed in the foreachBatch spark write method.
144        """
145
146        def inner(batch_df: DataFrame, batch_id: int) -> None:
147            transformed_df = Writer.get_transformed_micro_batch(
148                output_spec, batch_df, batch_id, data
149            )
150
151            if output_spec.streaming_micro_batch_dq_processors:
152                transformed_df = Writer.run_micro_batch_dq_process(
153                    transformed_df, output_spec.streaming_micro_batch_dq_processors
154                )
155
156            TableWriter._write_to_table_in_batch_mode(transformed_df, output_spec)
157
158        return inner
class TableWriter(lakehouse_engine.io.writer.Writer):

Class to write to a table.

TableWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)

Construct TableWriter instances.

Arguments:
  • output_spec: output specification.
  • df: dataframe to be written.
  • data: ordered dict of all dataframes generated in previous steps, before the writer.
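
For orientation, a minimal usage sketch follows. It is an assumption-based illustration, not part of this module: the OutputSpec field names and values shown (spec_id, input_id, db_table, data_format, write_type, partitions, options) and the input path are hypothetical, so check lakehouse_engine.core.definitions.OutputSpec for the exact fields supported by your version.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writers.table_writer import TableWriter

spark = SparkSession.builder.getOrCreate()

# Hypothetical dataframe produced by previous steps of the pipeline.
df = spark.read.parquet("s3://my-bucket/bronze/sales/")  # assumed input location

# Illustrative OutputSpec: the attributes mirror the ones this module reads
# (db_table, data_format, write_type, partitions, options), but the constructor
# arguments shown here are assumptions.
output_spec = OutputSpec(
    spec_id="sales_silver",
    input_id="sales_bronze",
    db_table="silver.sales",
    data_format="delta",
    write_type="overwrite",
    partitions=["order_date"],
    options={"mergeSchema": "true"},
)

# The writer also receives the ordered dict of dataframes from previous steps.
TableWriter(output_spec=output_spec, df=df, data=OrderedDict(sales_bronze=df)).write()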
def write(self) -> None:

Write data to a table.

After the write operation we repair the table (e.g., update partitions). However, this repair operation is never reached when the writer runs in long-running streaming mode. Therefore, for those scenarios we recommend not using the TableWriter with formats other than delta lake (delta lake does not need MSCK REPAIR). You can either: 1) use the delta lake format for the table; or 2) use the FileWriter and run the repair at a certain frequency in a separate task of your pipeline.
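
As a concrete illustration of option 2, the sketch below shows the kind of periodic repair task that could run alongside a long-running stream writing a non-delta table. It is an assumption-based example: the table name and the way the task gets scheduled come from your pipeline/orchestrator, not from this module.

from pyspark.sql import SparkSession


def repair_table_partitions(table: str) -> None:
    """Refresh partition metadata for a non-delta table fed by a long-running stream.

    This mirrors the MSCK REPAIR TABLE statement that TableWriter.write() issues
    after batch writes; in long-running streaming mode that statement is never
    reached, so a scheduled task like this one can take over.
    """
    spark = SparkSession.builder.getOrCreate()
    spark.sql(f"MSCK REPAIR TABLE {table}")


# Hypothetical usage: trigger this on a fixed schedule from your orchestrator.
repair_table_partitions("silver.sales_events")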