lakehouse_engine.io.reader_factory

Module providing the reader factory, which selects the reader used to load data for a given input specification.

 1"""Module for reader factory."""
 2
 3from abc import ABC
 4
 5from pyspark.sql import DataFrame
 6
 7from lakehouse_engine.core.definitions import FILE_INPUT_FORMATS, InputFormat, InputSpec
 8from lakehouse_engine.io.readers.dataframe_reader import DataFrameReader
 9from lakehouse_engine.io.readers.file_reader import FileReader
10from lakehouse_engine.io.readers.jdbc_reader import JDBCReader
11from lakehouse_engine.io.readers.kafka_reader import KafkaReader
12from lakehouse_engine.io.readers.query_reader import QueryReader
13from lakehouse_engine.io.readers.sap_b4_reader import SAPB4Reader
14from lakehouse_engine.io.readers.sap_bw_reader import SAPBWReader
15from lakehouse_engine.io.readers.table_reader import TableReader
16
17
class ReaderFactory(ABC):  # noqa: B024
    """Factory that selects and runs the reader matching an input specification."""

    @classmethod
    def get_data(cls, spec: InputSpec) -> DataFrame:
        """Read the data described by an input specification.

        A spec with a truthy ``db_table`` is always read through the table reader.
        Otherwise the reader is chosen from ``spec.data_format``. When
        ``spec.temp_view`` is set, the resulting dataframe is also registered as a
        temporary view under that name.

        Args:
            spec: input specification describing the data to read.

        Returns:
            The dataframe produced by the selected reader.

        Raises:
            NotImplementedError: if no reader handles ``spec.data_format``.
        """
        if spec.db_table:
            # A table reference wins over whatever data format is declared.
            reader_cls = TableReader
        else:
            fmt = spec.data_format
            if fmt == InputFormat.JDBC.value:
                reader_cls = JDBCReader
            elif fmt in FILE_INPUT_FORMATS:
                reader_cls = FileReader
            else:
                # The remaining formats map one-to-one to a reader; they are
                # compared in this exact order.
                one_to_one = (
                    (InputFormat.KAFKA, KafkaReader),
                    (InputFormat.SQL, QueryReader),
                    (InputFormat.SAP_BW, SAPBWReader),
                    (InputFormat.SAP_B4, SAPB4Reader),
                    (InputFormat.DATAFRAME, DataFrameReader),
                )
                reader_cls = next(
                    (reader for member, reader in one_to_one if fmt == member.value),
                    None,
                )
                if reader_cls is None:
                    if fmt == InputFormat.SFTP.value:
                        # Imported lazily: only when an SFTP read is requested.
                        from lakehouse_engine.io.readers.sftp_reader import SFTPReader

                        reader_cls = SFTPReader
                    else:
                        raise NotImplementedError(
                            f"The requested input spec format {spec.data_format} is not supported."
                        )

        loaded = reader_cls(input_spec=spec).read()

        view_name = spec.temp_view
        if view_name:
            loaded.createOrReplaceTempView(view_name)

        return loaded
class ReaderFactory(abc.ABC):
class ReaderFactory(ABC):  # noqa: B024
    """Factory that selects and runs the reader matching an input specification."""

    @classmethod
    def get_data(cls, spec: InputSpec) -> DataFrame:
        """Read the data described by an input specification.

        A spec with a truthy ``db_table`` is always read through the table reader.
        Otherwise the reader is chosen from ``spec.data_format``. When
        ``spec.temp_view`` is set, the resulting dataframe is also registered as a
        temporary view under that name.

        Args:
            spec: input specification describing the data to read.

        Returns:
            The dataframe produced by the selected reader.

        Raises:
            NotImplementedError: if no reader handles ``spec.data_format``.
        """
        if spec.db_table:
            # A table reference wins over whatever data format is declared.
            reader_cls = TableReader
        else:
            fmt = spec.data_format
            if fmt == InputFormat.JDBC.value:
                reader_cls = JDBCReader
            elif fmt in FILE_INPUT_FORMATS:
                reader_cls = FileReader
            else:
                # The remaining formats map one-to-one to a reader; they are
                # compared in this exact order.
                one_to_one = (
                    (InputFormat.KAFKA, KafkaReader),
                    (InputFormat.SQL, QueryReader),
                    (InputFormat.SAP_BW, SAPBWReader),
                    (InputFormat.SAP_B4, SAPB4Reader),
                    (InputFormat.DATAFRAME, DataFrameReader),
                )
                reader_cls = next(
                    (reader for member, reader in one_to_one if fmt == member.value),
                    None,
                )
                if reader_cls is None:
                    if fmt == InputFormat.SFTP.value:
                        # Imported lazily: only when an SFTP read is requested.
                        from lakehouse_engine.io.readers.sftp_reader import SFTPReader

                        reader_cls = SFTPReader
                    else:
                        raise NotImplementedError(
                            f"The requested input spec format {spec.data_format} is not supported."
                        )

        loaded = reader_cls(input_spec=spec).read()

        view_name = spec.temp_view
        if view_name:
            loaded.createOrReplaceTempView(view_name)

        return loaded

Class for reader factory.

@classmethod
def get_data( cls, spec: lakehouse_engine.core.definitions.InputSpec) -> pyspark.sql.dataframe.DataFrame:
22    @classmethod
23    def get_data(cls, spec: InputSpec) -> DataFrame:
24        """Get data according to the input specification following a factory pattern.
25
26        Args:
27            spec: input specification to get the data.
28
29        Returns:
30            A dataframe containing the data.
31        """
32        if spec.db_table:
33            read_df = TableReader(input_spec=spec).read()
34        elif spec.data_format == InputFormat.JDBC.value:
35            read_df = JDBCReader(input_spec=spec).read()
36        elif spec.data_format in FILE_INPUT_FORMATS:
37            read_df = FileReader(input_spec=spec).read()
38        elif spec.data_format == InputFormat.KAFKA.value:
39            read_df = KafkaReader(input_spec=spec).read()
40        elif spec.data_format == InputFormat.SQL.value:
41            read_df = QueryReader(input_spec=spec).read()
42        elif spec.data_format == InputFormat.SAP_BW.value:
43            read_df = SAPBWReader(input_spec=spec).read()
44        elif spec.data_format == InputFormat.SAP_B4.value:
45            read_df = SAPB4Reader(input_spec=spec).read()
46        elif spec.data_format == InputFormat.DATAFRAME.value:
47            read_df = DataFrameReader(input_spec=spec).read()
48        elif spec.data_format == InputFormat.SFTP.value:
49            from lakehouse_engine.io.readers.sftp_reader import SFTPReader
50
51            read_df = SFTPReader(input_spec=spec).read()
52        else:
53            raise NotImplementedError(
54                f"The requested input spec format {spec.data_format} is not supported."
55            )
56
57        if spec.temp_view:
58            read_df.createOrReplaceTempView(spec.temp_view)
59
60        return read_df

Get data according to the input specification following a factory pattern.

Arguments:
  • spec: input specification to get the data.
Returns:
  • A dataframe containing the data.