lakehouse_engine.io.writers.delta_merge_writer

Module to define the behaviour of delta merges.
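
A Delta Lake merge (upsert) joins incoming data against the current contents of a Delta table and applies insert, update, and delete actions row by row. As a minimal sketch of the pattern this module automates (assuming a SparkSession `spark` with delta-spark configured, an existing Delta table `db.customers`, and a DataFrame `new_df` of incoming records):

from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "db.customers")  # current data
(
    target.alias("current")
    .merge(new_df.alias("new"), "current.id = new.id")  # merge predicate
    .whenMatchedUpdateAll()  # rows matching the predicate get updated
    .whenNotMatchedInsertAll()  # unmatched rows get inserted
    .execute()
)

DeltaMergeWriter builds this kind of chain from the options in an OutputSpec, for both batch and streaming inputs.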

  1"""Module to define the behaviour of delta merges."""
  2
  3from typing import Callable, Optional, OrderedDict
  4
  5from delta.tables import DeltaMergeBuilder, DeltaTable
  6from pyspark.sql import DataFrame
  7
  8from lakehouse_engine.core.definitions import OutputFormat, OutputSpec
  9from lakehouse_engine.core.exec_env import ExecEnv
 10from lakehouse_engine.io.exceptions import WrongIOFormatException
 11from lakehouse_engine.io.writer import Writer
 12
 13
 14class DeltaMergeWriter(Writer):
 15    """Class to merge data using delta lake."""
 16
 17    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
 18        """Construct DeltaMergeWriter instances.
 19
 20        Args:
 21            output_spec: output specification containing merge options and
 22                relevant information.
 23            df: the dataframe containing the new data to be merged.
 24            data: list of all dfs generated on previous steps before writer.
 25        """
 26        super().__init__(output_spec, df, data)
 27
 28    def write(self) -> None:
 29        """Merge new data with current data."""
 30        delta_table = self._get_delta_table(self._output_spec)
 31        if self._df.isStreaming:
 32            stream_df = (
 33                self._df.writeStream.options(
 34                    **self._output_spec.options if self._output_spec.options else {}
 35                )
 36                .foreachBatch(
 37                    self._write_transformed_micro_batch(
 38                        self._output_spec, self._data, delta_table
 39                    )
 40                )
 41                .trigger(**Writer.get_streaming_trigger(self._output_spec))
 42                .start()
 43            )
 44
 45            if self._output_spec.streaming_await_termination:
 46                stream_df.awaitTermination(
 47                    self._output_spec.streaming_await_termination_timeout
 48                )
 49        else:
 50            DeltaMergeWriter._merge(delta_table, self._output_spec, self._df)
 51
 52    @staticmethod
 53    def _get_delta_table(output_spec: OutputSpec) -> DeltaTable:
 54        """Get the delta table given an output specification w/ table name or location.
 55
 56        Args:
 57            output_spec: output specification.
 58
 59        Returns:
 60            DeltaTable: the delta table instance.
 61        """
 62        if output_spec.db_table:
 63            delta_table = DeltaTable.forName(ExecEnv.SESSION, output_spec.db_table)
 64        elif output_spec.data_format == OutputFormat.DELTAFILES.value:
 65            delta_table = DeltaTable.forPath(ExecEnv.SESSION, output_spec.location)
 66        else:
 67            raise WrongIOFormatException(
 68                f"{output_spec.data_format} is not compatible with Delta Merge "
 69                f"Writer."
 70            )
 71
 72        return delta_table
 73
 74    @staticmethod
 75    def _insert(
 76        delta_merge: DeltaMergeBuilder,
 77        insert_predicate: Optional[str],
 78        insert_column_set: Optional[dict],
 79    ) -> DeltaMergeBuilder:
 80        """Get the builder of merge data with insert predicate and column set.
 81
 82        Args:
 83            delta_merge: builder of the merge data.
 84            insert_predicate: condition of the insert.
 85            insert_column_set: rules for setting the values of
 86                columns that need to be inserted.
 87
 88        Returns:
 89            DeltaMergeBuilder: builder of the merge data with insert.
 90        """
 91        if insert_predicate:
 92            if insert_column_set:
 93                delta_merge = delta_merge.whenNotMatchedInsert(
 94                    condition=insert_predicate,
 95                    values=insert_column_set,
 96                )
 97            else:
 98                delta_merge = delta_merge.whenNotMatchedInsertAll(
 99                    condition=insert_predicate
100                )
101        else:
102            if insert_column_set:
103                delta_merge = delta_merge.whenNotMatchedInsert(values=insert_column_set)
104            else:
105                delta_merge = delta_merge.whenNotMatchedInsertAll()
106
107        return delta_merge
108
109    @staticmethod
110    def _merge(delta_table: DeltaTable, output_spec: OutputSpec, df: DataFrame) -> None:
111        """Perform a delta lake merge according to several merge options.
112
113        Args:
114            delta_table: delta table to which to merge data.
115            output_spec: output specification containing the merge options.
116            df: dataframe with the new data to be merged into the delta table.
117        """
118        delta_merge = delta_table.alias("current").merge(
119            df.alias("new"), output_spec.merge_opts.merge_predicate
120        )
121
122        if not output_spec.merge_opts.insert_only:
123            if output_spec.merge_opts.delete_predicate:
124                delta_merge = delta_merge.whenMatchedDelete(
125                    output_spec.merge_opts.delete_predicate
126                )
127            delta_merge = DeltaMergeWriter._update(
128                delta_merge,
129                output_spec.merge_opts.update_predicate,
130                output_spec.merge_opts.update_column_set,
131            )
132
133        delta_merge = DeltaMergeWriter._insert(
134            delta_merge,
135            output_spec.merge_opts.insert_predicate,
136            output_spec.merge_opts.insert_column_set,
137        )
138
139        delta_merge.execute()
140
141    @staticmethod
142    def _update(
143        delta_merge: DeltaMergeBuilder,
144        update_predicate: Optional[str],
145        update_column_set: Optional[dict],
146    ) -> DeltaMergeBuilder:
147        """Get the builder of merge data with update predicate and column set.
148
149        Args:
150            delta_merge: builder of the merge data.
151            update_predicate: condition of the update.
152            update_column_set: rules for setting the values of
153                columns that need to be updated.
154
155        Returns:
156            DeltaMergeBuilder: builder of the merge data with update.
157        """
158        if update_predicate:
159            if update_column_set:
160                delta_merge = delta_merge.whenMatchedUpdate(
161                    condition=update_predicate,
162                    set=update_column_set,
163                )
164            else:
165                delta_merge = delta_merge.whenMatchedUpdateAll(
166                    condition=update_predicate
167                )
168        else:
169            if update_column_set:
170                delta_merge = delta_merge.whenMatchedUpdate(set=update_column_set)
171            else:
172                delta_merge = delta_merge.whenMatchedUpdateAll()
173
174        return delta_merge
175
176    @staticmethod
177    def _write_transformed_micro_batch(  # type: ignore
178        output_spec: OutputSpec,
179        data: OrderedDict,
180        delta_table: Optional[DeltaTable] = None,
181    ) -> Callable:
182        """Perform the merge in streaming mode by specifying a transform function.
183
184        This function returns a function that will be invoked in the foreachBatch in
185        streaming mode, performing a delta lake merge while streaming the micro batches.
186
187        Args:
188            output_spec: output specification.
189            data: list of all dfs generated on previous steps before writer.
190            delta_table: delta table for which to merge the streaming data
191                with.
192
193        Returns:
194            Function to call in .foreachBatch streaming function.
195        """
196
197        def inner(batch_df: DataFrame, batch_id: int) -> None:
198            transformed_df = Writer.get_transformed_micro_batch(
199                output_spec, batch_df, batch_id, data
200            )
201
202            if output_spec.streaming_micro_batch_dq_processors:
203                transformed_df = Writer.run_micro_batch_dq_process(
204                    transformed_df, output_spec.streaming_micro_batch_dq_processors
205                )
206
207            DeltaMergeWriter._merge(delta_table, output_spec, transformed_df)
208
209        return inner
class DeltaMergeWriter(lakehouse_engine.io.writer.Writer):

Class to merge data using delta lake.

DeltaMergeWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)

Construct DeltaMergeWriter instances.

Arguments:
  • output_spec: output specification containing merge options and relevant information.
  • df: the dataframe containing the new data to be merged.
  • data: all the DataFrames generated in previous steps, before the writer.
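
A hedged construction sketch: the OutputSpec keyword arguments below mirror the attributes this writer reads (db_table, data_format, merge_opts, ...), but the real OutputSpec constructor in lakehouse_engine.core.definitions may differ.

from collections import OrderedDict

# Assumed OutputSpec shape, for illustration only.
output_spec = OutputSpec(
    db_table="db.customers",
    data_format=OutputFormat.DELTAFILES.value,
    merge_opts=merge_options,  # object exposing merge_predicate, insert_only, ...
)
writer = DeltaMergeWriter(output_spec=output_spec, df=new_df, data=OrderedDict())
writer.write()  # merges new_df into db.customers
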
def write(self) -> None:

Merge new data with current data.
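
For a streaming df, write() does not merge directly: it wires _merge into foreachBatch, so every micro batch is merged into the target table with the same logic as the batch path. A simplified hand-written equivalent (the checkpoint location and trigger below are illustrative; in the writer the options come from output_spec.options and the trigger from Writer.get_streaming_trigger):

def merge_micro_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Merge each micro batch into the Delta table.
    DeltaMergeWriter._merge(delta_table, output_spec, batch_df)

query = (
    streaming_df.writeStream
    .options(checkpointLocation="/tmp/checkpoints/merge")  # illustrative path
    .foreachBatch(merge_micro_batch)
    .trigger(availableNow=True)  # illustrative trigger
    .start()
)
query.awaitTermination()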