lakehouse_engine.io.writers.table_writer
Module that defines the behaviour to write to tables.
"""Module that defines the behaviour to write to tables."""

from typing import Any, Callable, OrderedDict

from pyspark.sql import DataFrame

from lakehouse_engine.core.definitions import OutputFormat, OutputSpec
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.io.writer import Writer


class TableWriter(Writer):
    """Class to write to a table."""

    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
        """Construct TableWriter instances.

        Args:
            output_spec: output specification.
            df: dataframe to be written.
            data: list of all dfs generated on previous steps before writer.
        """
        super().__init__(output_spec, df, data)

    def write(self) -> None:
        """Write data to a table.

        After the write operation we repair the table (e.g., update partitions).
        However, there's a caveat to this, which is the fact that this repair
        operation is not reachable if we are running long-running streaming mode.
        Therefore, we recommend not using the TableWriter with formats other than
        delta lake for those scenarios (as delta lake does not need msck repair).
        So, you can: 1) use delta lake format for the table; 2) use the FileWriter
        and run the repair with a certain frequency in a separate task of your
        pipeline.
        """
        if not self._df.isStreaming:
            self._write_to_table_in_batch_mode(self._df, self._output_spec)
        else:
            df_writer = self._df.writeStream.trigger(
                **Writer.get_streaming_trigger(self._output_spec)
            )

            if (
                self._output_spec.streaming_micro_batch_transformers
                or self._output_spec.streaming_micro_batch_dq_processors
            ):
                stream_df = (
                    df_writer.options(
                        **self._output_spec.options if self._output_spec.options else {}
                    )
                    .foreachBatch(
                        self._write_transformed_micro_batch(
                            self._output_spec, self._data
                        )
                    )
                    .start()
                )

                if self._output_spec.streaming_await_termination:
                    stream_df.awaitTermination(
                        self._output_spec.streaming_await_termination_timeout
                    )
            else:
                self._write_to_table_in_streaming_mode(df_writer, self._output_spec)

        if (
            self._output_spec.data_format != OutputFormat.DELTAFILES.value
            and self._output_spec.partitions
        ):
            ExecEnv.SESSION.sql(f"MSCK REPAIR TABLE {self._output_spec.db_table}")

    @staticmethod
    def _write_to_table_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None:
        """Write to a metastore table in batch mode.

        Args:
            df: dataframe to write.
            output_spec: output specification.
        """
        df_writer = df.write.format(output_spec.data_format)

        if output_spec.partitions:
            df_writer = df_writer.partitionBy(output_spec.partitions)

        if output_spec.location:
            df_writer = df_writer.options(
                path=output_spec.location,
                **output_spec.options if output_spec.options else {},
            )
        else:
            df_writer = df_writer.options(
                **output_spec.options if output_spec.options else {}
            )

        df_writer.mode(output_spec.write_type).saveAsTable(output_spec.db_table)

    @staticmethod
    def _write_to_table_in_streaming_mode(
        df_writer: Any, output_spec: OutputSpec
    ) -> None:
        """Write to a metastore table in streaming mode.

        Args:
            df_writer: dataframe writer.
            output_spec: output specification.
        """
        df_writer = df_writer.outputMode(output_spec.write_type).format(
            output_spec.data_format
        )

        if output_spec.partitions:
            df_writer = df_writer.partitionBy(output_spec.partitions)

        if output_spec.location:
            df_writer = df_writer.options(
                path=output_spec.location,
                **output_spec.options if output_spec.options else {},
            )
        else:
            df_writer = df_writer.options(
                **output_spec.options if output_spec.options else {}
            )

        if output_spec.streaming_await_termination:
            df_writer.toTable(output_spec.db_table).awaitTermination(
                output_spec.streaming_await_termination_timeout
            )
        else:
            df_writer.toTable(output_spec.db_table)

    @staticmethod
    def _write_transformed_micro_batch(  # type: ignore
        output_spec: OutputSpec, data: OrderedDict
    ) -> Callable:
        """Define how to write a streaming micro batch after transforming it.

        Args:
            output_spec: output specification.
            data: list of all dfs generated on previous steps before writer.

        Returns:
            A function to be executed in the foreachBatch spark write method.
        """

        def inner(batch_df: DataFrame, batch_id: int) -> None:
            transformed_df = Writer.get_transformed_micro_batch(
                output_spec, batch_df, batch_id, data
            )

            if output_spec.streaming_micro_batch_dq_processors:
                transformed_df = Writer.run_micro_batch_dq_process(
                    transformed_df, output_spec.streaming_micro_batch_dq_processors
                )

            TableWriter._write_to_table_in_batch_mode(transformed_df, output_spec)

        return inner
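For reference, the batch branch above is a standard DataFrameWriter chain. Below is a minimal, self-contained sketch of the equivalent plain PySpark calls; the database, table, location, partitions and options are illustrative stand-ins for what would normally come from the OutputSpec, and parquet is used instead of delta so the snippet runs without extra packages.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS my_db")  # hypothetical database

# Illustrative stand-ins for OutputSpec fields.
db_table = "my_db.my_table"
location = "/tmp/lakehouse/my_table"
partitions = ["year"]
options = {"compression": "snappy"}
write_type = "overwrite"

df = spark.range(10).withColumn("year", F.lit(2024))

# Mirrors _write_to_table_in_batch_mode: format -> partitionBy -> options -> mode -> saveAsTable.
writer = df.write.format("parquet")
if partitions:
    writer = writer.partitionBy(partitions)
if location:
    writer = writer.options(path=location, **options)
else:
    writer = writer.options(**options)
writer.mode(write_type).saveAsTable(db_table)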
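The plain streaming branch (no micro-batch transformers or data quality processors) maps to a DataStreamWriter chain that ends in toTable. A minimal sketch, assuming Spark 3.1+ and using the built-in rate source with illustrative names and paths:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical streaming source; in the engine the streaming dataframe is built upstream.
stream_df = spark.readStream.format("rate").load()

# Mirrors write()'s trigger plus _write_to_table_in_streaming_mode:
# trigger -> outputMode (write_type) -> format -> options -> toTable.
query = (
    stream_df.writeStream.trigger(processingTime="30 seconds")
    .outputMode("append")
    .format("parquet")
    .options(
        checkpointLocation="/tmp/checkpoints/rate_table",  # illustrative paths
        path="/tmp/lakehouse/rate_table",
    )
    .toTable("rate_table")
)

# With streaming_await_termination set, the engine blocks here (optionally with a timeout).
query.awaitTermination(60)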
Class to write to a table.
TableWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
Construct TableWriter instances.
Arguments:
- output_spec: output specification.
- df: dataframe to be written.
- data: all the dataframes generated in previous steps, before the writer.
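For illustration, a minimal construction sketch. The OutputSpec keyword arguments are assumptions inferred from the attributes the writer accesses (data_format, db_table, partitions, write_type, ...); check OutputSpec in lakehouse_engine.core.definitions for the actual constructor fields.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writers.table_writer import TableWriter

spark = SparkSession.builder.getOrCreate()
sales_df = spark.createDataFrame([(1, "2024-01-01")], ["order_id", "order_date"])

# Keyword arguments below are assumed, based on the attributes used in write().
output_spec = OutputSpec(
    spec_id="sales_bronze",        # hypothetical identifier
    input_id="sales_transformed",  # hypothetical upstream spec id
    data_format="delta",
    db_table="bronze.sales",
    partitions=["order_date"],
    write_type="append",
)

# data carries the dataframes produced by previous steps; empty here for brevity.
TableWriter(output_spec=output_spec, df=sales_df, data=OrderedDict()).write()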
def write(self) -> None:
Write data to a table.
After the write operation, the table is repaired (e.g., its partitions are updated). There is a caveat, however: the repair step is never reached when running in long-running streaming mode. For those scenarios we therefore recommend not using the TableWriter with formats other than delta lake (delta lake does not need MSCK REPAIR). You can either: 1) use the delta lake format for the table; or 2) use the FileWriter and run the repair at a certain frequency in a separate task of your pipeline.
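A minimal sketch of option 2, assuming the repair runs as a separately scheduled task (e.g., triggered by your orchestrator) with its own SparkSession; the table name is illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Run periodically so partition directories written by the long-running
# streaming job become visible in the metastore.
spark.sql("MSCK REPAIR TABLE bronze.sales")  # hypothetical table name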