lakehouse_engine.io.readers.file_reader

Module to define behaviour to read from files.

 1"""Module to define behaviour to read from files."""
 2
 3from pyspark.sql import DataFrame
 4
 5from lakehouse_engine.core.definitions import FILE_INPUT_FORMATS, InputSpec, ReadType
 6from lakehouse_engine.core.exec_env import ExecEnv
 7from lakehouse_engine.io.reader import Reader
 8from lakehouse_engine.utils.schema_utils import SchemaUtils
 9
10
class FileReader(Reader):
    """Class to read from files."""

    def __init__(self, input_spec: InputSpec):
        """Construct FileReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read file data.

        Returns:
            A dataframe containing the data from the files.

        Raises:
            NotImplementedError: if the combination of read type and data
                format is not a supported file read.
        """
        # Both branches previously duplicated the whole load + with_filepath
        # logic; only the session entry point (read vs readStream) differs,
        # so resolve that first and share the rest.
        if self._input_spec.data_format not in FILE_INPUT_FORMATS:
            raise NotImplementedError(
                "The requested read type and format combination is not supported."
            )

        if self._input_spec.read_type == ReadType.BATCH.value:
            reader = ExecEnv.SESSION.read
        elif self._input_spec.read_type == ReadType.STREAMING.value:
            reader = ExecEnv.SESSION.readStream
        else:
            raise NotImplementedError(
                "The requested read type and format combination is not supported."
            )

        df = reader.load(
            path=self._input_spec.location,
            format=self._input_spec.data_format,
            schema=SchemaUtils.from_input_spec(self._input_spec),
            **self._input_spec.options if self._input_spec.options else {},
        )

        if self._input_spec.with_filepath:
            # _metadata contains hidden columns (file_path among them).
            df = df.selectExpr(
                "*", "_metadata.file_path as lhe_extraction_filepath"
            )

        return df
class FileReader(Reader):
    """Reader implementation for file-based data sources."""

    def __init__(self, input_spec: InputSpec):
        """Instantiate a file reader.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Load the files described by the input specification.

        Returns:
            A dataframe containing the data from the files.
        """
        spec = self._input_spec
        is_file_format = spec.data_format in FILE_INPUT_FORMATS

        if spec.read_type == ReadType.BATCH.value and is_file_format:
            session_reader = ExecEnv.SESSION.read
        elif spec.read_type == ReadType.STREAMING.value and is_file_format:
            session_reader = ExecEnv.SESSION.readStream
        else:
            raise NotImplementedError(
                "The requested read type and format combination is not supported."
            )

        options = spec.options if spec.options else {}
        df = session_reader.load(
            path=spec.location,
            format=spec.data_format,
            schema=SchemaUtils.from_input_spec(spec),
            **options,
        )

        if spec.with_filepath:
            # The hidden _metadata struct exposes per-file columns; surface
            # the originating file path under a stable name.
            df = df.selectExpr(
                "*", "_metadata.file_path as lhe_extraction_filepath"
            )

        return df

Class to read from files.

FileReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
def __init__(self, input_spec: InputSpec):
    """Create a FileReader bound to the given input specification.

    Args:
        input_spec: input specification.
    """
    # All state handling is delegated to the Reader base class.
    super().__init__(input_spec)

Construct FileReader instances.

Arguments:
  • input_spec: input specification.
def read(self) -> DataFrame:
    """Read file data.

    Returns:
        A dataframe containing the data from the files.
    """

    def _with_filepath(frame: DataFrame) -> DataFrame:
        # Optionally surface the source file path via the hidden
        # _metadata struct.
        if self._input_spec.with_filepath:
            return frame.selectExpr(
                "*", "_metadata.file_path as lhe_extraction_filepath"
            )
        return frame

    supported_format = self._input_spec.data_format in FILE_INPUT_FORMATS
    load_kwargs = dict(
        path=self._input_spec.location,
        format=self._input_spec.data_format,
        schema=SchemaUtils.from_input_spec(self._input_spec),
        **(self._input_spec.options if self._input_spec.options else {}),
    )

    if supported_format and self._input_spec.read_type == ReadType.BATCH.value:
        return _with_filepath(ExecEnv.SESSION.read.load(**load_kwargs))

    if supported_format and self._input_spec.read_type == ReadType.STREAMING.value:
        return _with_filepath(ExecEnv.SESSION.readStream.load(**load_kwargs))

    raise NotImplementedError(
        "The requested read type and format combination is not supported."
    )

Read file data.

Returns: A dataframe containing the data from the files.