lakehouse_engine.io.writers.kafka_writer

Module that defines the behaviour to write to Kafka.

  1"""Module that defines the behaviour to write to Kafka."""
  2
  3from typing import Callable, OrderedDict
  4
  5from pyspark.sql import DataFrame
  6
  7from lakehouse_engine.core.definitions import OutputSpec
  8from lakehouse_engine.io.writer import Writer
  9
 10
 11class KafkaWriter(Writer):
 12    """Class to write to a Kafka target."""
 13
 14    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
 15        """Construct KafkaWriter instances.
 16
 17        Args:
 18            output_spec: output specification.
 19            df: dataframe to be written.
 20            data: list of all dfs generated on previous steps before writer.
 21        """
 22        super().__init__(output_spec, df, data)
 23
 24    def write(self) -> None:
 25        """Write data to Kafka."""
 26        if not self._df.isStreaming:
 27            self._write_to_kafka_in_batch_mode(self._df, self._output_spec)
 28        else:
 29            self._write_to_kafka_in_streaming_mode(
 30                self._df, self._output_spec, self._data
 31            )
 32
 33    @staticmethod
 34    def _write_to_kafka_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None:
 35        """Write to Kafka in batch mode.
 36
 37        Args:
 38            df: dataframe to write.
 39            output_spec: output specification.
 40        """
 41        df.write.format(output_spec.data_format).options(
 42            **output_spec.options if output_spec.options else {}
 43        ).mode(output_spec.write_type).save()
 44
 45    @staticmethod
 46    def _write_to_kafka_in_streaming_mode(
 47        df: DataFrame, output_spec: OutputSpec, data: OrderedDict
 48    ) -> None:
 49        """Write to kafka in streaming mode.
 50
 51        Args:
 52            df: dataframe to write.
 53            output_spec: output specification.
 54            data: list of all dfs generated on previous steps before writer.
 55        """
 56        df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec))
 57
 58        if (
 59            output_spec.streaming_micro_batch_transformers
 60            or output_spec.streaming_micro_batch_dq_processors
 61        ):
 62            stream_df = (
 63                df_writer.options(**output_spec.options if output_spec.options else {})
 64                .foreachBatch(
 65                    KafkaWriter._write_transformed_micro_batch(output_spec, data)
 66                )
 67                .start()
 68            )
 69        else:
 70            stream_df = (
 71                df_writer.format(output_spec.data_format)
 72                .options(**output_spec.options if output_spec.options else {})
 73                .start()
 74            )
 75
 76        if output_spec.streaming_await_termination:
 77            stream_df.awaitTermination(output_spec.streaming_await_termination_timeout)
 78
 79    @staticmethod
 80    def _write_transformed_micro_batch(  # type: ignore
 81        output_spec: OutputSpec, data: OrderedDict
 82    ) -> Callable:
 83        """Define how to write a streaming micro batch after transforming it.
 84
 85        Args:
 86            output_spec: output specification.
 87            data: list of all dfs generated on previous steps before writer.
 88
 89        Returns:
 90            A function to be executed in the foreachBatch spark write method.
 91        """
 92
 93        def inner(batch_df: DataFrame, batch_id: int) -> None:
 94            transformed_df = Writer.get_transformed_micro_batch(
 95                output_spec, batch_df, batch_id, data
 96            )
 97
 98            if output_spec.streaming_micro_batch_dq_processors:
 99                transformed_df = Writer.run_micro_batch_dq_process(
100                    transformed_df, output_spec.streaming_micro_batch_dq_processors
101                )
102
103            KafkaWriter._write_to_kafka_in_batch_mode(transformed_df, output_spec)
104
105        return inner
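
When the DataFrame is streaming and the output specification carries micro-batch transformers or DQ processors, the writer routes each micro batch through foreachBatch and reuses the batch writer inside it. The following is a minimal plain-Spark sketch of that pattern, assuming a hypothetical rate source, broker, topic, and checkpoint path; the engine itself derives the format, options, and trigger from the OutputSpec instead of hard-coding them.

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder streaming source; the Kafka sink requires a "value" column.
stream_df = spark.readStream.format("rate").load().selectExpr(
    "CAST(value AS STRING) AS value"
)

def write_micro_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Transformations and DQ checks would run here, mirroring inner() above.
    batch_df.write.format("kafka").options(
        **{
            "kafka.bootstrap.servers": "host:9092",  # hypothetical broker
            "topic": "my-topic",  # hypothetical topic
        }
    ).save()

query = (
    stream_df.writeStream.trigger(processingTime="30 seconds")
    .option("checkpointLocation", "/tmp/kafka_sink_checkpoint")  # hypothetical path
    .foreachBatch(write_micro_batch)
    .start()
)
query.awaitTermination()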
class KafkaWriter(lakehouse_engine.io.writer.Writer):

Class to write to a Kafka target.

KafkaWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)

Construct KafkaWriter instances.

Arguments:
  • output_spec: output specification.
  • df: dataframe to be written.
  • data: ordered dict of all DataFrames generated in previous steps, before the writer.
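
A minimal construction sketch follows. Only the OutputSpec fields actually read by this writer (data_format, write_type, options) are shown; OutputSpec may require further arguments, which are omitted here as an assumption, and the broker and topic values are hypothetical.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writers.kafka_writer import KafkaWriter

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("id-1", '{"amount": 10}')], ["key", "value"])

# Assumption: OutputSpec accepts these fields as keyword arguments.
output_spec = OutputSpec(
    data_format="kafka",
    write_type="append",
    options={
        "kafka.bootstrap.servers": "host:9092",  # hypothetical broker
        "topic": "my-topic",  # hypothetical topic
    },
)

data = OrderedDict({"previous_step": df})  # DataFrames from earlier steps
KafkaWriter(output_spec=output_spec, df=df, data=data).write()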
def write(self) -> None:

Write data to Kafka.
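
For a non-streaming DataFrame, write() delegates to Spark's Kafka batch sink, which expects a "value" column (string or binary) and, optionally, "key" and "topic" columns. A minimal stand-alone sketch with hypothetical broker and topic values:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The Kafka sink reads the "key" and "value" columns from the DataFrame.
df = spark.createDataFrame([("id-1", '{"amount": 10}')], ["key", "value"])

(
    df.write.format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")  # hypothetical broker
    .option("topic", "my-topic")  # hypothetical topic
    .mode("append")
    .save()
)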