lakehouse_engine.io.reader_factory
Module for reader factory.
1"""Module for reader factory.""" 2 3from abc import ABC 4 5from pyspark.sql import DataFrame 6 7from lakehouse_engine.core.definitions import FILE_INPUT_FORMATS, InputFormat, InputSpec 8from lakehouse_engine.io.readers.dataframe_reader import DataFrameReader 9from lakehouse_engine.io.readers.file_reader import FileReader 10from lakehouse_engine.io.readers.jdbc_reader import JDBCReader 11from lakehouse_engine.io.readers.kafka_reader import KafkaReader 12from lakehouse_engine.io.readers.query_reader import QueryReader 13from lakehouse_engine.io.readers.sap_b4_reader import SAPB4Reader 14from lakehouse_engine.io.readers.sap_bw_reader import SAPBWReader 15from lakehouse_engine.io.readers.table_reader import TableReader 16 17 18class ReaderFactory(ABC): # noqa: B024 19 """Class for reader factory.""" 20 21 @classmethod 22 def get_data(cls, spec: InputSpec) -> DataFrame: 23 """Get data according to the input specification following a factory pattern. 24 25 Args: 26 spec: input specification to get the data. 27 28 Returns: 29 A dataframe containing the data. 30 """ 31 if spec.db_table: 32 read_df = TableReader(input_spec=spec).read() 33 elif spec.data_format == InputFormat.JDBC.value: 34 read_df = JDBCReader(input_spec=spec).read() 35 elif spec.data_format in FILE_INPUT_FORMATS: 36 read_df = FileReader(input_spec=spec).read() 37 elif spec.data_format == InputFormat.KAFKA.value: 38 read_df = KafkaReader(input_spec=spec).read() 39 elif spec.data_format == InputFormat.SQL.value: 40 read_df = QueryReader(input_spec=spec).read() 41 elif spec.data_format == InputFormat.SAP_BW.value: 42 read_df = SAPBWReader(input_spec=spec).read() 43 elif spec.data_format == InputFormat.SAP_B4.value: 44 read_df = SAPB4Reader(input_spec=spec).read() 45 elif spec.data_format == InputFormat.DATAFRAME.value: 46 read_df = DataFrameReader(input_spec=spec).read() 47 elif spec.data_format == InputFormat.SFTP.value: 48 from lakehouse_engine.io.readers.sftp_reader import SFTPReader 49 50 read_df = SFTPReader(input_spec=spec).read() 51 else: 52 raise NotImplementedError( 53 f"The requested input spec format {spec.data_format} is not supported." 54 ) 55 56 if spec.temp_view: 57 read_df.createOrReplaceTempView(spec.temp_view) 58 59 return read_df
class ReaderFactory(abc.ABC):
Class for reader factory.
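The factory is a single static entry point: a populated db_table routes the read to TableReader; otherwise data_format selects the reader (file formats, JDBC, Kafka, SQL, SAP BW/B4, dataframe, SFTP), and any other format raises NotImplementedError. Below is a minimal usage sketch; the InputSpec fields spec_id, read_type and location are assumptions about lakehouse_engine.core.definitions and may not match the actual constructor.

    from lakehouse_engine.core.definitions import InputSpec
    from lakehouse_engine.io.reader_factory import ReaderFactory

    # Hypothetical spec for a batch read of parquet files. Only data_format,
    # db_table and temp_view are used directly by the factory above; the other
    # field names are assumptions and may differ in the real InputSpec.
    spec = InputSpec(
        spec_id="sales_orders_bronze",
        read_type="batch",
        data_format="parquet",
        location="s3://my-bucket/bronze/sales_orders/",
    )

    # "parquet" is one of the FILE_INPUT_FORMATS, so the factory delegates
    # to FileReader and returns the resulting Spark DataFrame.
    df = ReaderFactory.get_data(spec)
    df.printSchema()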
@classmethod
def get_data(cls, spec: lakehouse_engine.core.definitions.InputSpec) -> pyspark.sql.dataframe.DataFrame:
Get data according to the input specification following a factory pattern.
Arguments:
- spec: input specification to get the data.
Returns:
A dataframe containing the data.
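Besides returning the dataframe, get_data registers it as a Spark temporary view whenever spec.temp_view is set, so downstream steps can query it by name. A sketch under the same assumptions about the InputSpec constructor as above:

    from pyspark.sql import SparkSession

    from lakehouse_engine.core.definitions import InputSpec
    from lakehouse_engine.io.reader_factory import ReaderFactory

    # Hypothetical spec: read CSV files and expose them as the view "customers_raw".
    spec = InputSpec(
        spec_id="customers_csv",
        read_type="batch",
        data_format="csv",
        location="s3://my-bucket/landing/customers/",
        temp_view="customers_raw",  # triggers createOrReplaceTempView on the result
    )

    df = ReaderFactory.get_data(spec)

    # The same data is now reachable through Spark SQL under the view name.
    spark = SparkSession.builder.getOrCreate()
    filtered = spark.sql("SELECT * FROM customers_raw WHERE country = 'PT'")
    filtered.show()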