Skip to content

File reader

Module to define behaviour to read from files.

FileReader

Bases: Reader

Class to read from files.

Source code in mkdocs/lakehouse_engine/packages/io/readers/file_reader.py
class FileReader(Reader):
    """Class to read from files, in either batch or streaming mode."""

    def __init__(self, input_spec: InputSpec):
        """Construct FileReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read file data.

        The batch and streaming paths share the same load arguments; only the
        session reader (``read`` vs ``readStream``) differs between them.

        Returns:
            A dataframe containing the data from the files. When the input
            spec sets ``with_filepath``, the dataframe carries an extra
            ``lhe_extraction_filepath`` column taken from
            ``_metadata.file_path``.

        Raises:
            NotImplementedError: if the read type is neither batch nor
                streaming, or the data format is not in FILE_INPUT_FORMATS.
        """
        spec = self._input_spec

        is_batch = spec.read_type == ReadType.BATCH.value
        is_streaming = spec.read_type == ReadType.STREAMING.value

        # The read type is validated before the format, so an unknown read
        # type never triggers the format lookup.
        if not (is_batch or is_streaming) or (
            spec.data_format not in FILE_INPUT_FORMATS
        ):
            raise NotImplementedError(
                "The requested read type and format combination is not supported."
            )

        # The session is only touched once the spec is known to be supported.
        reader = ExecEnv.SESSION.read if is_batch else ExecEnv.SESSION.readStream

        df = reader.load(
            path=spec.location,
            format=spec.data_format,
            schema=SchemaUtils.from_input_spec(spec),
            **(spec.options or {}),
        )

        if spec.with_filepath:
            # _metadata contains hidden columns, so it has to be selected
            # explicitly to surface the source file path.
            df = df.selectExpr("*", "_metadata.file_path as lhe_extraction_filepath")

        return df

__init__(input_spec)

Construct FileReader instances.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_spec | InputSpec | input specification. | required |
Source code in mkdocs/lakehouse_engine/packages/io/readers/file_reader.py
def __init__(self, input_spec: InputSpec):
    """Construct FileReader instances.

    No reader-specific setup happens here: the input specification is handed
    unchanged to the parent initializer.

    Args:
        input_spec: input specification.
    """
    super().__init__(input_spec)

read()

Read file data.

Returns:

| Type | Description |
| --- | --- |
| DataFrame | A dataframe containing the data from the files. |

Source code in mkdocs/lakehouse_engine/packages/io/readers/file_reader.py
def read(self) -> DataFrame:
    """Read file data.

    The batch and streaming paths share the same load arguments; only the
    session reader (``read`` vs ``readStream``) differs between them.

    Returns:
        A dataframe containing the data from the files. When the input spec
        sets ``with_filepath``, the dataframe carries an extra
        ``lhe_extraction_filepath`` column taken from ``_metadata.file_path``.

    Raises:
        NotImplementedError: if the read type is neither batch nor streaming,
            or the data format is not in FILE_INPUT_FORMATS.
    """
    spec = self._input_spec

    is_batch = spec.read_type == ReadType.BATCH.value
    is_streaming = spec.read_type == ReadType.STREAMING.value

    # The read type is validated before the format, so an unknown read type
    # never triggers the format lookup.
    if not (is_batch or is_streaming) or (
        spec.data_format not in FILE_INPUT_FORMATS
    ):
        raise NotImplementedError(
            "The requested read type and format combination is not supported."
        )

    # The session is only touched once the spec is known to be supported.
    reader = ExecEnv.SESSION.read if is_batch else ExecEnv.SESSION.readStream

    df = reader.load(
        path=spec.location,
        format=spec.data_format,
        schema=SchemaUtils.from_input_spec(spec),
        **(spec.options or {}),
    )

    if spec.with_filepath:
        # _metadata contains hidden columns, so it has to be selected
        # explicitly to surface the source file path.
        df = df.selectExpr("*", "_metadata.file_path as lhe_extraction_filepath")

    return df