lakehouse_engine.io.writers.jdbc_writer
Module that defines the behaviour to write to JDBC targets.
1"""Module that defines the behaviour to write to JDBC targets.""" 2 3from typing import Callable, OrderedDict 4 5from pyspark.sql import DataFrame 6 7from lakehouse_engine.core.definitions import OutputSpec 8from lakehouse_engine.io.writer import Writer 9 10 11class JDBCWriter(Writer): 12 """Class to write to JDBC targets.""" 13 14 def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict): 15 """Construct JDBCWriter instances. 16 17 Args: 18 output_spec: output specification. 19 df: dataframe to be writen. 20 data: list of all dfs generated on previous steps before writer. 21 """ 22 super().__init__(output_spec, df, data) 23 24 def write(self) -> None: 25 """Write data into JDBC target.""" 26 if not self._df.isStreaming: 27 self._write_to_jdbc_in_batch_mode(self._df, self._output_spec) 28 else: 29 stream_df = ( 30 self._df.writeStream.trigger( 31 **Writer.get_streaming_trigger(self._output_spec) 32 ) 33 .options( 34 **self._output_spec.options if self._output_spec.options else {} 35 ) 36 .foreachBatch( 37 self._write_transformed_micro_batch(self._output_spec, self._data) 38 ) 39 .start() 40 ) 41 42 if self._output_spec.streaming_await_termination: 43 stream_df.awaitTermination( 44 self._output_spec.streaming_await_termination_timeout 45 ) 46 47 @staticmethod 48 def _write_to_jdbc_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None: 49 """Write to jdbc in batch mode. 50 51 Args: 52 df: dataframe to write. 53 output_spec: output specification. 54 """ 55 df.write.format(output_spec.data_format).partitionBy( 56 output_spec.partitions 57 ).options(**output_spec.options if output_spec.options else {}).mode( 58 output_spec.write_type 59 ).save( 60 output_spec.location 61 ) 62 63 @staticmethod 64 def _write_transformed_micro_batch( # type: ignore 65 output_spec: OutputSpec, data: OrderedDict 66 ) -> Callable: 67 """Define how to write a streaming micro batch after transforming it. 68 69 Args: 70 output_spec: output specification. 71 data: list of all dfs generated on previous steps before writer. 72 73 Returns: 74 A function to be executed in the foreachBatch spark write method. 75 """ 76 77 def inner(batch_df: DataFrame, batch_id: int) -> None: 78 transformed_df = Writer.get_transformed_micro_batch( 79 output_spec, batch_df, batch_id, data 80 ) 81 82 if output_spec.streaming_micro_batch_dq_processors: 83 transformed_df = Writer.run_micro_batch_dq_process( 84 transformed_df, output_spec.streaming_micro_batch_dq_processors 85 ) 86 87 JDBCWriter._write_to_jdbc_in_batch_mode(transformed_df, output_spec) 88 89 return inner
class JDBCWriter(Writer):
    """Class to write to JDBC targets."""

    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
        """Construct JDBCWriter instances.

        Args:
            output_spec: output specification.
            df: dataframe to be written.
            data: all dfs generated in previous steps before the writer.
        """
        super().__init__(output_spec, df, data)

    def write(self) -> None:
        """Write data into JDBC target."""
        if not self._df.isStreaming:
            self._write_to_jdbc_in_batch_mode(self._df, self._output_spec)
        else:
            stream_df = (
                self._df.writeStream.trigger(
                    **Writer.get_streaming_trigger(self._output_spec)
                )
                .options(
                    **self._output_spec.options if self._output_spec.options else {}
                )
                .foreachBatch(
                    self._write_transformed_micro_batch(self._output_spec, self._data)
                )
                .start()
            )

            if self._output_spec.streaming_await_termination:
                stream_df.awaitTermination(
                    self._output_spec.streaming_await_termination_timeout
                )

    @staticmethod
    def _write_to_jdbc_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None:
        """Write to jdbc in batch mode.

        Args:
            df: dataframe to write.
            output_spec: output specification.
        """
        df.write.format(output_spec.data_format).partitionBy(
            output_spec.partitions
        ).options(**output_spec.options if output_spec.options else {}).mode(
            output_spec.write_type
        ).save(
            output_spec.location
        )

    @staticmethod
    def _write_transformed_micro_batch(  # type: ignore
        output_spec: OutputSpec, data: OrderedDict
    ) -> Callable:
        """Define how to write a streaming micro batch after transforming it.

        Args:
            output_spec: output specification.
            data: all dfs generated in previous steps before the writer.

        Returns:
            A function to be executed in the foreachBatch spark write method.
        """

        def inner(batch_df: DataFrame, batch_id: int) -> None:
            transformed_df = Writer.get_transformed_micro_batch(
                output_spec, batch_df, batch_id, data
            )

            if output_spec.streaming_micro_batch_dq_processors:
                transformed_df = Writer.run_micro_batch_dq_process(
                    transformed_df, output_spec.streaming_micro_batch_dq_processors
                )

            JDBCWriter._write_to_jdbc_in_batch_mode(transformed_df, output_spec)

        return inner
Class to write to JDBC targets.
JDBCWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
        """Construct JDBCWriter instances.

        Args:
            output_spec: output specification.
            df: dataframe to be written.
            data: all dfs generated in previous steps before the writer.
        """
        super().__init__(output_spec, df, data)
Construct JDBCWriter instances.
Arguments:
- output_spec: output specification.
- df: dataframe to be written.
- data: all dataframes generated in previous steps before the writer (a shape sketch follows below).
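Although described as a list, data is typed as an OrderedDict. A plausible shape, with invented step names (the keys and their meaning are assumptions, not confirmed by this module):

from collections import OrderedDict

# Hypothetical step ids mapping to the dataframes those earlier steps
# produced; raw_sales_df and transformed_sales_df stand in for real results.
data = OrderedDict(
    [
        ("read_sales", raw_sales_df),
        ("transform_sales", transformed_sales_df),
    ]
)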
def write(self) -> None:
    def write(self) -> None:
        """Write data into JDBC target."""
        if not self._df.isStreaming:
            self._write_to_jdbc_in_batch_mode(self._df, self._output_spec)
        else:
            stream_df = (
                self._df.writeStream.trigger(
                    **Writer.get_streaming_trigger(self._output_spec)
                )
                .options(
                    **self._output_spec.options if self._output_spec.options else {}
                )
                .foreachBatch(
                    self._write_transformed_micro_batch(self._output_spec, self._data)
                )
                .start()
            )

            if self._output_spec.streaming_await_termination:
                stream_df.awaitTermination(
                    self._output_spec.streaming_await_termination_timeout
                )
Write data into the JDBC target. Batch dataframes are written directly through the JDBC batch path; streaming dataframes are written micro-batch by micro-batch via foreachBatch, optionally awaiting termination.
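Stripped of the spec plumbing, the batch path is a standard Spark JDBC write, and the streaming path wraps that same routine in foreachBatch, since streaming dataframes cannot use df.write directly. A rough plain-Spark equivalent, with placeholder connection details and an assumed streaming_df input:

from pyspark.sql import DataFrame


def write_batch(df: DataFrame) -> None:
    # Roughly what _write_to_jdbc_in_batch_mode does, minus the OutputSpec object.
    (
        df.write.format("jdbc")
        .options(
            url="jdbc:postgresql://host:5432/db",  # placeholder
            dbtable="sales",
            user="user",
            password="password",
        )
        .mode("append")
        .save()
    )


# streaming_df: any streaming DataFrame, e.g. obtained from spark.readStream.
# Each micro-batch is handed to the batch routine, mirroring the foreachBatch
# wiring in write() above.
query = (
    streaming_df.writeStream.trigger(processingTime="30 seconds")
    .foreachBatch(lambda batch_df, batch_id: write_batch(batch_df))
    .start()
)
query.awaitTermination()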