lakehouse_engine.io.writers.kafka_writer
Module that defines the behaviour to write to Kafka.
1"""Module that defines the behaviour to write to Kafka.""" 2 3from typing import Callable, OrderedDict 4 5from pyspark.sql import DataFrame 6 7from lakehouse_engine.core.definitions import OutputSpec 8from lakehouse_engine.io.writer import Writer 9 10 11class KafkaWriter(Writer): 12 """Class to write to a Kafka target.""" 13 14 def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict): 15 """Construct KafkaWriter instances. 16 17 Args: 18 output_spec: output specification. 19 df: dataframe to be written. 20 data: list of all dfs generated on previous steps before writer. 21 """ 22 super().__init__(output_spec, df, data) 23 24 def write(self) -> None: 25 """Write data to Kafka.""" 26 if not self._df.isStreaming: 27 self._write_to_kafka_in_batch_mode(self._df, self._output_spec) 28 else: 29 self._write_to_kafka_in_streaming_mode( 30 self._df, self._output_spec, self._data 31 ) 32 33 @staticmethod 34 def _write_to_kafka_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None: 35 """Write to Kafka in batch mode. 36 37 Args: 38 df: dataframe to write. 39 output_spec: output specification. 40 """ 41 df.write.format(output_spec.data_format).options( 42 **output_spec.options if output_spec.options else {} 43 ).mode(output_spec.write_type).save() 44 45 @staticmethod 46 def _write_to_kafka_in_streaming_mode( 47 df: DataFrame, output_spec: OutputSpec, data: OrderedDict 48 ) -> None: 49 """Write to kafka in streaming mode. 50 51 Args: 52 df: dataframe to write. 53 output_spec: output specification. 54 data: list of all dfs generated on previous steps before writer. 55 """ 56 df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec)) 57 58 if ( 59 output_spec.streaming_micro_batch_transformers 60 or output_spec.streaming_micro_batch_dq_processors 61 ): 62 stream_df = ( 63 df_writer.options(**output_spec.options if output_spec.options else {}) 64 .foreachBatch( 65 KafkaWriter._write_transformed_micro_batch(output_spec, data) 66 ) 67 .start() 68 ) 69 else: 70 stream_df = ( 71 df_writer.format(output_spec.data_format) 72 .options(**output_spec.options if output_spec.options else {}) 73 .start() 74 ) 75 76 if output_spec.streaming_await_termination: 77 stream_df.awaitTermination(output_spec.streaming_await_termination_timeout) 78 79 @staticmethod 80 def _write_transformed_micro_batch( # type: ignore 81 output_spec: OutputSpec, data: OrderedDict 82 ) -> Callable: 83 """Define how to write a streaming micro batch after transforming it. 84 85 Args: 86 output_spec: output specification. 87 data: list of all dfs generated on previous steps before writer. 88 89 Returns: 90 A function to be executed in the foreachBatch spark write method. 91 """ 92 93 def inner(batch_df: DataFrame, batch_id: int) -> None: 94 transformed_df = Writer.get_transformed_micro_batch( 95 output_spec, batch_df, batch_id, data 96 ) 97 98 if output_spec.streaming_micro_batch_dq_processors: 99 transformed_df = Writer.run_micro_batch_dq_process( 100 transformed_df, output_spec.streaming_micro_batch_dq_processors 101 ) 102 103 KafkaWriter._write_to_kafka_in_batch_mode(transformed_df, output_spec) 104 105 return inner
Class to write to a Kafka target.
KafkaWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
Construct KafkaWriter instances.
Arguments:
- output_spec: output specification.
- df: dataframe to be written.
- data: ordered dict of all dataframes generated in previous steps, before the writer (see the sketch below).
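In practice, data maps each upstream step's identifier to the DataFrame it produced; it is only consulted when micro-batch transformers reference earlier steps, so it can be empty otherwise. A hypothetical sketch (the step names and dataframes are illustrative, not engine conventions):

    from collections import OrderedDict

    # Illustrative step identifiers and dataframes from earlier pipeline stages.
    data = OrderedDict([("read_orders", orders_df), ("transform_orders", enriched_df)])
    writer = KafkaWriter(output_spec=output_spec, df=enriched_df, data=data)
    writer.write()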
def write(self) -> None:
Write data to Kafka.
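For streaming input, write() delegates to _write_to_kafka_in_streaming_mode, which branches: if the spec carries micro-batch transformers or DQ processors, every micro batch is routed through foreachBatch and rewritten with the batch writer; otherwise the stream goes straight to the Kafka sink. A rough plain-PySpark sketch of the two branches, assuming df and output_spec exist (the fixed trigger and the transformer hook are illustrative stand-ins for the engine's actual helpers):

    from pyspark.sql import DataFrame

    options = output_spec.options or {}
    # The real code derives the trigger via Writer.get_streaming_trigger;
    # a fixed processing-time trigger stands in for it here.
    stream_writer = df.writeStream.trigger(processingTime="30 seconds")

    if output_spec.streaming_micro_batch_transformers:
        # Transformed path: each micro batch is rewritten in batch mode.
        def handle_batch(batch_df: DataFrame, batch_id: int) -> None:
            transformed = batch_df  # transformers/DQ processors would run here
            transformed.write.format("kafka").options(**options).mode(
                output_spec.write_type
            ).save()

        query = stream_writer.options(**options).foreachBatch(handle_batch).start()
    else:
        # Direct path: stream straight into the Kafka sink.
        query = stream_writer.format("kafka").options(**options).start()

    # The engine only blocks like this when streaming_await_termination is set.
    query.awaitTermination()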