lakehouse_engine.io.writers.delta_merge_writer
Module to define the behaviour of delta merges.
1"""Module to define the behaviour of delta merges.""" 2 3from typing import Callable, Optional, OrderedDict 4 5from delta.tables import DeltaMergeBuilder, DeltaTable 6from pyspark.sql import DataFrame 7 8from lakehouse_engine.core.definitions import OutputFormat, OutputSpec 9from lakehouse_engine.core.exec_env import ExecEnv 10from lakehouse_engine.io.exceptions import WrongIOFormatException 11from lakehouse_engine.io.writer import Writer 12 13 14class DeltaMergeWriter(Writer): 15 """Class to merge data using delta lake.""" 16 17 def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict): 18 """Construct DeltaMergeWriter instances. 19 20 Args: 21 output_spec: output specification containing merge options and 22 relevant information. 23 df: the dataframe containing the new data to be merged. 24 data: list of all dfs generated on previous steps before writer. 25 """ 26 super().__init__(output_spec, df, data) 27 28 def write(self) -> None: 29 """Merge new data with current data.""" 30 delta_table = self._get_delta_table(self._output_spec) 31 if self._df.isStreaming: 32 stream_df = ( 33 self._df.writeStream.options( 34 **self._output_spec.options if self._output_spec.options else {} 35 ) 36 .foreachBatch( 37 self._write_transformed_micro_batch( 38 self._output_spec, self._data, delta_table 39 ) 40 ) 41 .trigger(**Writer.get_streaming_trigger(self._output_spec)) 42 .start() 43 ) 44 45 if self._output_spec.streaming_await_termination: 46 stream_df.awaitTermination( 47 self._output_spec.streaming_await_termination_timeout 48 ) 49 else: 50 DeltaMergeWriter._merge(delta_table, self._output_spec, self._df) 51 52 @staticmethod 53 def _get_delta_table(output_spec: OutputSpec) -> DeltaTable: 54 """Get the delta table given an output specification w/ table name or location. 55 56 Args: 57 output_spec: output specification. 58 59 Returns: 60 DeltaTable: the delta table instance. 61 """ 62 if output_spec.db_table: 63 delta_table = DeltaTable.forName(ExecEnv.SESSION, output_spec.db_table) 64 elif output_spec.data_format == OutputFormat.DELTAFILES.value: 65 delta_table = DeltaTable.forPath(ExecEnv.SESSION, output_spec.location) 66 else: 67 raise WrongIOFormatException( 68 f"{output_spec.data_format} is not compatible with Delta Merge " 69 f"Writer." 70 ) 71 72 return delta_table 73 74 @staticmethod 75 def _insert( 76 delta_merge: DeltaMergeBuilder, 77 insert_predicate: Optional[str], 78 insert_column_set: Optional[dict], 79 ) -> DeltaMergeBuilder: 80 """Get the builder of merge data with insert predicate and column set. 81 82 Args: 83 delta_merge: builder of the merge data. 84 insert_predicate: condition of the insert. 85 insert_column_set: rules for setting the values of 86 columns that need to be inserted. 87 88 Returns: 89 DeltaMergeBuilder: builder of the merge data with insert. 90 """ 91 if insert_predicate: 92 if insert_column_set: 93 delta_merge = delta_merge.whenNotMatchedInsert( 94 condition=insert_predicate, 95 values=insert_column_set, 96 ) 97 else: 98 delta_merge = delta_merge.whenNotMatchedInsertAll( 99 condition=insert_predicate 100 ) 101 else: 102 if insert_column_set: 103 delta_merge = delta_merge.whenNotMatchedInsert(values=insert_column_set) 104 else: 105 delta_merge = delta_merge.whenNotMatchedInsertAll() 106 107 return delta_merge 108 109 @staticmethod 110 def _merge(delta_table: DeltaTable, output_spec: OutputSpec, df: DataFrame) -> None: 111 """Perform a delta lake merge according to several merge options. 
112 113 Args: 114 delta_table: delta table to which to merge data. 115 output_spec: output specification containing the merge options. 116 df: dataframe with the new data to be merged into the delta table. 117 """ 118 delta_merge = delta_table.alias("current").merge( 119 df.alias("new"), output_spec.merge_opts.merge_predicate 120 ) 121 122 if not output_spec.merge_opts.insert_only: 123 if output_spec.merge_opts.delete_predicate: 124 delta_merge = delta_merge.whenMatchedDelete( 125 output_spec.merge_opts.delete_predicate 126 ) 127 delta_merge = DeltaMergeWriter._update( 128 delta_merge, 129 output_spec.merge_opts.update_predicate, 130 output_spec.merge_opts.update_column_set, 131 ) 132 133 delta_merge = DeltaMergeWriter._insert( 134 delta_merge, 135 output_spec.merge_opts.insert_predicate, 136 output_spec.merge_opts.insert_column_set, 137 ) 138 139 delta_merge.execute() 140 141 @staticmethod 142 def _update( 143 delta_merge: DeltaMergeBuilder, 144 update_predicate: Optional[str], 145 update_column_set: Optional[dict], 146 ) -> DeltaMergeBuilder: 147 """Get the builder of merge data with update predicate and column set. 148 149 Args: 150 delta_merge: builder of the merge data. 151 update_predicate: condition of the update. 152 update_column_set: rules for setting the values of 153 columns that need to be updated. 154 155 Returns: 156 DeltaMergeBuilder: builder of the merge data with update. 157 """ 158 if update_predicate: 159 if update_column_set: 160 delta_merge = delta_merge.whenMatchedUpdate( 161 condition=update_predicate, 162 set=update_column_set, 163 ) 164 else: 165 delta_merge = delta_merge.whenMatchedUpdateAll( 166 condition=update_predicate 167 ) 168 else: 169 if update_column_set: 170 delta_merge = delta_merge.whenMatchedUpdate(set=update_column_set) 171 else: 172 delta_merge = delta_merge.whenMatchedUpdateAll() 173 174 return delta_merge 175 176 @staticmethod 177 def _write_transformed_micro_batch( # type: ignore 178 output_spec: OutputSpec, 179 data: OrderedDict, 180 delta_table: Optional[DeltaTable] = None, 181 ) -> Callable: 182 """Perform the merge in streaming mode by specifying a transform function. 183 184 This function returns a function that will be invoked in the foreachBatch in 185 streaming mode, performing a delta lake merge while streaming the micro batches. 186 187 Args: 188 output_spec: output specification. 189 data: list of all dfs generated on previous steps before writer. 190 delta_table: delta table for which to merge the streaming data 191 with. 192 193 Returns: 194 Function to call in .foreachBatch streaming function. 195 """ 196 197 def inner(batch_df: DataFrame, batch_id: int) -> None: 198 transformed_df = Writer.get_transformed_micro_batch( 199 output_spec, batch_df, batch_id, data 200 ) 201 202 if output_spec.streaming_micro_batch_dq_processors: 203 transformed_df = Writer.run_micro_batch_dq_process( 204 transformed_df, output_spec.streaming_micro_batch_dq_processors 205 ) 206 207 DeltaMergeWriter._merge(delta_table, output_spec, transformed_df) 208 209 return inner
class DeltaMergeWriter(Writer):
Class to merge data using delta lake.
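For reference, the chain below is a minimal sketch of the Delta Lake merge this writer assembles in batch mode, using only the standard delta-spark API. The table name, source path and merge predicate are hypothetical placeholders; in the writer they come from the OutputSpec and its merge options.

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical target table and new data; the writer resolves these from the
# OutputSpec (db_table/location) and the incoming DataFrame.
target = DeltaTable.forName(spark, "my_db.sales")
new_df = spark.read.format("delta").load("/tmp/new_sales")

(
    target.alias("current")
    .merge(new_df.alias("new"), "current.sales_id = new.sales_id")  # merge predicate
    .whenMatchedUpdateAll()     # no update_column_set -> update all columns
    .whenNotMatchedInsertAll()  # no insert_column_set -> insert all columns
    .execute()
)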
DeltaMergeWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
Construct DeltaMergeWriter instances.
Arguments:
- output_spec: output specification containing merge options and relevant information.
- df: the dataframe containing the new data to be merged.
- data: ordered dict with all the DataFrames generated in previous steps, before the writer (see the usage sketch below).
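A minimal, illustrative usage sketch of the constructor. Here merge_output_spec is a placeholder for an OutputSpec with merge options that a real job builds from its configuration (it is not constructed here), and the DataFrame contents and dictionary key are hypothetical.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.io.writers.delta_merge_writer import DeltaMergeWriter

spark = SparkSession.builder.getOrCreate()
new_data_df = spark.createDataFrame(
    [(1, "updated"), (2, "new")], ["sales_id", "status"]
)

# DataFrames produced by previous steps, keyed by hypothetical spec ids.
previous_dfs = OrderedDict()
previous_dfs["sales_bronze"] = new_data_df

writer = DeltaMergeWriter(
    output_spec=merge_output_spec,  # placeholder: an OutputSpec with merge_opts set
    df=new_data_df,
    data=previous_dfs,
)
writer.write()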
def write(self) -> None:
Merge new data with current data.
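In streaming mode, write() does not merge directly; it registers a per-micro-batch function through foreachBatch. The snippet below is a minimal sketch of that pattern using plain Spark and delta-spark APIs. The source path, checkpoint location, table name, predicate and column set are hypothetical, and the writer additionally applies micro-batch transformers and data-quality processors before merging.

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
target = DeltaTable.forName(spark, "my_db.sales")  # hypothetical target table


def merge_micro_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Merge each micro batch into the target delta table.
    (
        target.alias("current")
        .merge(batch_df.alias("new"), "current.sales_id = new.sales_id")
        .whenMatchedUpdate(set={"status": "new.status"})  # explicit update column set
        .whenNotMatchedInsertAll()
        .execute()
    )


stream = (
    spark.readStream.format("delta")
    .load("/tmp/sales_updates")  # hypothetical streaming source
    .writeStream.option("checkpointLocation", "/tmp/sales_merge_checkpoint")
    .foreachBatch(merge_micro_batch)
    .trigger(availableNow=True)
    .start()
)
stream.awaitTermination()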