lakehouse_engine.io.readers.kafka_reader

Module to define behaviour to read from Kafka.

 1"""Module to define behaviour to read from Kafka."""
 2
 3from pyspark.sql import DataFrame
 4
 5from lakehouse_engine.core.definitions import InputFormat, InputSpec
 6from lakehouse_engine.core.exec_env import ExecEnv
 7from lakehouse_engine.io.reader import Reader
 8
 9
10class KafkaReader(Reader):
11    """Class to read from Kafka."""
12
13    def __init__(self, input_spec: InputSpec):
14        """Construct KafkaReader instances.
15
16        Args:
17            input_spec: input specification.
18        """
19        super().__init__(input_spec)
20
21    def read(self) -> DataFrame:
22        """Read Kafka data.
23
24        Returns:
25            A dataframe containing the data from Kafka.
26        """
27        df = ExecEnv.SESSION.readStream.load(
28            format=InputFormat.KAFKA.value,
29            **self._input_spec.options if self._input_spec.options else {},
30        )
31
32        return df
class KafkaReader(lakehouse_engine.io.reader.Reader):

Class to read from Kafka.

KafkaReader(input_spec: lakehouse_engine.core.definitions.InputSpec)

Construct KafkaReader instances.

Arguments:
  • input_spec: input specification.
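
For illustration, a minimal sketch of building a KafkaReader follows. The InputSpec field names (spec_id, read_type, data_format, options) and the specific Kafka options shown are assumptions based on common Spark Structured Streaming settings, not taken from this page.

from lakehouse_engine.core.definitions import InputSpec
from lakehouse_engine.io.readers.kafka_reader import KafkaReader

# Hypothetical input specification for a Kafka source; field names and
# option values are illustrative assumptions.
kafka_spec = InputSpec(
    spec_id="sales_kafka_source",
    read_type="streaming",
    data_format="kafka",
    options={
        "kafka.bootstrap.servers": "broker1:9092,broker2:9092",
        "subscribe": "sales_topic",
        "startingOffsets": "earliest",
    },
)

reader = KafkaReader(input_spec=kafka_spec)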
def read(self) -> pyspark.sql.dataframe.DataFrame:

Read Kafka data.

Returns:
  A dataframe containing the data from Kafka.
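
As a usage sketch, the streaming DataFrame returned by read() can be consumed with any Structured Streaming sink; the console sink below is purely illustrative, and it assumes ExecEnv.SESSION holds an active SparkSession as in the source above.

# Continuing the sketch: read() builds a streaming DataFrame via
# spark.readStream with format "kafka" and the options from the spec.
df = reader.read()

# Kafka rows expose key/value as binary; casting value to string is a
# common first step (assumption: the payload is UTF-8 text).
query = (
    df.selectExpr("CAST(value AS STRING) AS value", "timestamp")
    .writeStream.format("console")
    .outputMode("append")
    .start()
)
query.awaitTermination()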