lakehouse_engine.io.readers.table_reader
Module to define behaviour to read from tables.
1"""Module to define behaviour to read from tables.""" 2 3from pyspark.sql import DataFrame 4 5from lakehouse_engine.core.definitions import InputSpec, ReadType 6from lakehouse_engine.core.exec_env import ExecEnv 7from lakehouse_engine.io.reader import Reader 8 9 10class TableReader(Reader): 11 """Class to read data from a table.""" 12 13 def __init__(self, input_spec: InputSpec): 14 """Construct TableReader instances. 15 16 Args: 17 input_spec: input specification. 18 """ 19 super().__init__(input_spec) 20 21 def read(self) -> DataFrame: 22 """Read data from a table. 23 24 Returns: 25 A dataframe containing the data from the table. 26 """ 27 if self._input_spec.read_type == ReadType.BATCH.value: 28 return ExecEnv.SESSION.read.options( 29 **self._input_spec.options if self._input_spec.options else {} 30 ).table(self._input_spec.db_table) 31 elif self._input_spec.read_type == ReadType.STREAMING.value: 32 return ExecEnv.SESSION.readStream.options( 33 **self._input_spec.options if self._input_spec.options else {} 34 ).table(self._input_spec.db_table) 35 else: 36 self._logger.error("The requested read type is not supported.") 37 raise NotImplementedError
class TableReader(Reader):
Class to read data from a table.
TableReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
Construct TableReader instances.
Arguments:
- input_spec: input specification.
def read(self) -> pyspark.sql.dataframe.DataFrame:
Read data from a table.
Returns:
A dataframe containing the data from the table.
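For orientation, the two supported read types reduce to the standard PySpark entry points sketched below, with a plain `spark` session, a hard-coded table name, and an illustrative options dictionary standing in for `ExecEnv.SESSION`, `input_spec.db_table`, and `input_spec.options`.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
opts = {"mergeSchema": "true"}  # illustrative reader option (assumption)

# ReadType.BATCH: one-off read of the current table contents.
batch_df = spark.read.options(**opts).table("bronze.sales_orders")

# ReadType.STREAMING: incremental read of the same table; the underlying
# table format must support streaming reads (e.g. Delta Lake).
streaming_df = spark.readStream.options(**opts).table("bronze.sales_orders")
```

Any other `read_type` value makes `read()` log an error and raise `NotImplementedError`.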