# lakehouse_engine.io.readers.table_reader

Module to define behaviour to read from tables.
 1"""Module to define behaviour to read from tables."""
 2
 3from pyspark.sql import DataFrame
 4
 5from lakehouse_engine.core.definitions import InputSpec, ReadType
 6from lakehouse_engine.core.exec_env import ExecEnv
 7from lakehouse_engine.io.reader import Reader
 8
 9
class TableReader(Reader):
    """Reader that loads data from a (database) table."""

    def __init__(self, input_spec: InputSpec):
        """Construct TableReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read data from a table.

        Returns:
            A dataframe containing the data from the table.
        """
        # Empty dict keeps .options(**...) a no-op when no options were given.
        options = self._input_spec.options if self._input_spec.options else {}
        read_type = self._input_spec.read_type
        if read_type == ReadType.BATCH.value:
            reader = ExecEnv.SESSION.read
        elif read_type == ReadType.STREAMING.value:
            reader = ExecEnv.SESSION.readStream
        else:
            self._logger.error("The requested read type is not supported.")
            raise NotImplementedError
        return reader.options(**options).table(self._input_spec.db_table)
class TableReader(lakehouse_engine.io.reader.Reader):
class TableReader(Reader):
    """Class to read data from a table."""

    def __init__(self, input_spec: InputSpec):
        """Create a new TableReader.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read data from a table.

        Returns:
            A dataframe containing the data from the table.
        """
        spec = self._input_spec
        opts = spec.options if spec.options else {}
        if spec.read_type == ReadType.BATCH.value:
            return ExecEnv.SESSION.read.options(**opts).table(spec.db_table)
        if spec.read_type == ReadType.STREAMING.value:
            return ExecEnv.SESSION.readStream.options(**opts).table(
                spec.db_table
            )
        # Neither batch nor streaming: log and fail loudly.
        self._logger.error("The requested read type is not supported.")
        raise NotImplementedError

Class to read data from a table.

TableReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
def __init__(self, input_spec: InputSpec):
    """Build a TableReader for the given specification.

    Args:
        input_spec: input specification.
    """
    # All state handling is delegated to the base Reader class.
    super().__init__(input_spec)

Construct TableReader instances.

Arguments:
  - input_spec: input specification.
def read(self) -> pyspark.sql.dataframe.DataFrame:
def read(self) -> DataFrame:
    """Read data from a table.

    Returns:
        A dataframe containing the data from the table.
    """
    opts = self._input_spec.options if self._input_spec.options else {}
    if self._input_spec.read_type == ReadType.BATCH.value:
        return ExecEnv.SESSION.read.options(**opts).table(
            self._input_spec.db_table
        )
    if self._input_spec.read_type == ReadType.STREAMING.value:
        return ExecEnv.SESSION.readStream.options(**opts).table(
            self._input_spec.db_table
        )
    self._logger.error("The requested read type is not supported.")
    raise NotImplementedError

Read data from a table.

Returns:
    A dataframe containing the data from the table.