lakehouse_engine.io.readers.kafka_reader
Module to define behaviour to read from Kafka.
"""Module to define behaviour to read from Kafka."""

from pyspark.sql import DataFrame

from lakehouse_engine.core.definitions import InputFormat, InputSpec
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.io.reader import Reader


class KafkaReader(Reader):
    """Class to read from Kafka.

    Opens a Kafka source through ``ExecEnv.SESSION.readStream`` using the
    Kafka input format and the options declared on the input specification.
    """

    def __init__(self, input_spec: InputSpec):
        """Construct KafkaReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read Kafka data.

        The source is opened with ``readStream`` (i.e. as a streaming
        DataFrame), with ``InputFormat.KAFKA.value`` as the format and every
        entry of ``input_spec.options`` forwarded as a reader option.

        Returns:
            A dataframe containing the data from Kafka.
        """
        # ``options`` may be empty or unset on the spec; fall back to no extra
        # options. The parentheses make explicit that ``**`` unpacks the whole
        # fallback expression, not just ``self._input_spec.options``.
        # NOTE(review): an options key named "format" would clash with the
        # explicit ``format=`` keyword and raise TypeError.
        return ExecEnv.SESSION.readStream.load(
            format=InputFormat.KAFKA.value,
            **(self._input_spec.options or {}),
        )
class KafkaReader(Reader):
    """Class to read from Kafka.

    Opens a Kafka source through ``ExecEnv.SESSION.readStream`` using the
    Kafka input format and the options declared on the input specification.
    """

    def __init__(self, input_spec: InputSpec):
        """Construct KafkaReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read Kafka data.

        The source is opened with ``readStream`` (i.e. as a streaming
        DataFrame), with ``InputFormat.KAFKA.value`` as the format and every
        entry of ``input_spec.options`` forwarded as a reader option.

        Returns:
            A dataframe containing the data from Kafka.
        """
        # ``options`` may be empty or unset on the spec; fall back to no extra
        # options. The parentheses make explicit that ``**`` unpacks the whole
        # fallback expression, not just ``self._input_spec.options``.
        # NOTE(review): an options key named "format" would clash with the
        # explicit ``format=`` keyword and raise TypeError.
        return ExecEnv.SESSION.readStream.load(
            format=InputFormat.KAFKA.value,
            **(self._input_spec.options or {}),
        )
Class to read from Kafka.
KafkaReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
def __init__(self, input_spec: InputSpec):
    """Create a Kafka reader bound to the given input specification.

    Args:
        input_spec: input specification, handed unchanged to the parent
            class constructor.
    """
    super().__init__(input_spec)
Construct KafkaReader instances.
Arguments:
- input_spec: input specification.
def read(self) -> pyspark.sql.dataframe.DataFrame:
def read(self) -> DataFrame:
    """Read Kafka data.

    The source is opened with ``readStream`` (i.e. as a streaming
    DataFrame), with ``InputFormat.KAFKA.value`` as the format and every
    entry of ``input_spec.options`` forwarded as a reader option.

    Returns:
        A dataframe containing the data from Kafka.
    """
    # ``options`` may be empty or unset on the spec; fall back to no extra
    # options. The parentheses make explicit that ``**`` unpacks the whole
    # fallback expression, not just ``self._input_spec.options``.
    # NOTE(review): an options key named "format" would clash with the
    # explicit ``format=`` keyword and raise TypeError.
    return ExecEnv.SESSION.readStream.load(
        format=InputFormat.KAFKA.value,
        **(self._input_spec.options or {}),
    )
Read Kafka data.
Returns:
A dataframe containing the data from Kafka.