Skip to content

Reader factory

Module for reader factory.

ReaderFactory

Bases: ABC

Class for reader factory.

Source code in mkdocs/lakehouse_engine/packages/io/reader_factory.py
class ReaderFactory(ABC):  # noqa: B024
    """Factory that selects and runs the reader matching an input specification."""

    @classmethod
    def get_data(cls, spec: InputSpec) -> DataFrame:
        """Read the data described by an input specification.

        The reader is picked in this order: a spec with ``db_table`` set is read
        with ``TableReader`` regardless of its format; otherwise
        ``spec.data_format`` decides which reader is used.

        Args:
            spec: input specification to get the data.

        Returns:
            A dataframe containing the data. When ``spec.temp_view`` is set,
            ``createOrReplaceTempView`` is also called on it with that name.

        Raises:
            NotImplementedError: if ``spec.data_format`` matches no known reader.
        """
        reader_type = None

        if spec.db_table:
            reader_type = TableReader
        elif spec.data_format == InputFormat.JDBC.value:
            reader_type = JDBCReader
        elif spec.data_format in FILE_INPUT_FORMATS:
            reader_type = FileReader
        else:
            # Formats resolved by plain equality, checked in this exact order.
            plain_format_readers = (
                (InputFormat.KAFKA.value, KafkaReader),
                (InputFormat.SQL.value, QueryReader),
                (InputFormat.SAP_BW.value, SAPBWReader),
                (InputFormat.SAP_B4.value, SAPB4Reader),
                (InputFormat.DATAFRAME.value, DataFrameReader),
            )
            reader_type = next(
                (
                    candidate
                    for known_format, candidate in plain_format_readers
                    if spec.data_format == known_format
                ),
                None,
            )
            if reader_type is None and spec.data_format == InputFormat.SFTP.value:
                # Imported only on demand, so the SFTP reader module is loaded
                # solely when an SFTP input is actually requested.
                from lakehouse_engine.io.readers.sftp_reader import SFTPReader

                reader_type = SFTPReader

        if reader_type is None:
            raise NotImplementedError(
                f"The requested input spec format {spec.data_format} is not supported."
            )

        loaded_df = reader_type(input_spec=spec).read()

        view_name = spec.temp_view
        if view_name:
            loaded_df.createOrReplaceTempView(view_name)

        return loaded_df

get_data(spec) classmethod

Get data according to the input specification following a factory pattern.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `spec` | `InputSpec` | input specification to get the data. | required |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | A dataframe containing the data. |

Source code in mkdocs/lakehouse_engine/packages/io/reader_factory.py
@classmethod
def get_data(cls, spec: InputSpec) -> DataFrame:
    """Read the data described by an input specification.

    The reader is picked in this order: a spec with ``db_table`` set is read
    with ``TableReader`` regardless of its format; otherwise
    ``spec.data_format`` decides which reader is used.

    Args:
        spec: input specification to get the data.

    Returns:
        A dataframe containing the data. When ``spec.temp_view`` is set,
        ``createOrReplaceTempView`` is also called on it with that name.

    Raises:
        NotImplementedError: if ``spec.data_format`` matches no known reader.
    """
    reader_type = None

    if spec.db_table:
        reader_type = TableReader
    elif spec.data_format == InputFormat.JDBC.value:
        reader_type = JDBCReader
    elif spec.data_format in FILE_INPUT_FORMATS:
        reader_type = FileReader
    else:
        # Formats resolved by plain equality, checked in this exact order.
        plain_format_readers = (
            (InputFormat.KAFKA.value, KafkaReader),
            (InputFormat.SQL.value, QueryReader),
            (InputFormat.SAP_BW.value, SAPBWReader),
            (InputFormat.SAP_B4.value, SAPB4Reader),
            (InputFormat.DATAFRAME.value, DataFrameReader),
        )
        reader_type = next(
            (
                candidate
                for known_format, candidate in plain_format_readers
                if spec.data_format == known_format
            ),
            None,
        )
        if reader_type is None and spec.data_format == InputFormat.SFTP.value:
            # Imported only on demand, so the SFTP reader module is loaded
            # solely when an SFTP input is actually requested.
            from lakehouse_engine.io.readers.sftp_reader import SFTPReader

            reader_type = SFTPReader

    if reader_type is None:
        raise NotImplementedError(
            f"The requested input spec format {spec.data_format} is not supported."
        )

    loaded_df = reader_type(input_spec=spec).read()

    view_name = spec.temp_view
    if view_name:
        loaded_df.createOrReplaceTempView(view_name)

    return loaded_df