lakehouse_engine.io.readers.file_reader
Module to define behaviour to read from files.
"""Module defining how data is read from files."""

from pyspark.sql import DataFrame

from lakehouse_engine.core.definitions import FILE_INPUT_FORMATS, InputSpec, ReadType
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.io.reader import Reader
from lakehouse_engine.utils.schema_utils import SchemaUtils


class FileReader(Reader):
    """Reader that loads file-based sources into dataframes (batch or streaming)."""

    def __init__(self, input_spec: InputSpec):
        """Create a FileReader for the given input specification.

        Args:
            input_spec: input specification, forwarded to the Reader constructor.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Load the configured file location into a dataframe.

        Batch and streaming reads share the same load call; only the reader
        taken from the Spark session differs.

        Returns:
            A dataframe containing the data from the files.

        Raises:
            NotImplementedError: if the read type is neither batch nor
                streaming, or the data format is not in FILE_INPUT_FORMATS.
        """
        spec = self._input_spec

        # Resolve the read mode first; None marks an unsupported read type.
        if spec.read_type == ReadType.BATCH.value:
            use_stream = False
        elif spec.read_type == ReadType.STREAMING.value:
            use_stream = True
        else:
            use_stream = None

        if use_stream is None or spec.data_format not in FILE_INPUT_FORMATS:
            raise NotImplementedError(
                "The requested read type and format combination is not supported."
            )

        session = ExecEnv.SESSION
        reader = session.readStream if use_stream else session.read
        df = reader.load(
            path=spec.location,
            format=spec.data_format,
            schema=SchemaUtils.from_input_spec(spec),
            **(spec.options or {}),
        )

        if not spec.with_filepath:
            return df

        # The source file path is taken from the hidden _metadata column.
        return df.selectExpr("*", "_metadata.file_path as lhe_extraction_filepath")
class FileReader(Reader):
    """Reader that loads file-based sources into dataframes (batch or streaming)."""

    def __init__(self, input_spec: InputSpec):
        """Create a FileReader for the given input specification.

        Args:
            input_spec: input specification, forwarded to the Reader constructor.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Load the configured file location into a dataframe.

        Batch and streaming reads share the same load call; only the reader
        taken from the Spark session differs.

        Returns:
            A dataframe containing the data from the files.

        Raises:
            NotImplementedError: if the read type is neither batch nor
                streaming, or the data format is not in FILE_INPUT_FORMATS.
        """
        spec = self._input_spec

        # Resolve the read mode first; None marks an unsupported read type.
        if spec.read_type == ReadType.BATCH.value:
            use_stream = False
        elif spec.read_type == ReadType.STREAMING.value:
            use_stream = True
        else:
            use_stream = None

        if use_stream is None or spec.data_format not in FILE_INPUT_FORMATS:
            raise NotImplementedError(
                "The requested read type and format combination is not supported."
            )

        session = ExecEnv.SESSION
        reader = session.readStream if use_stream else session.read
        df = reader.load(
            path=spec.location,
            format=spec.data_format,
            schema=SchemaUtils.from_input_spec(spec),
            **(spec.options or {}),
        )

        if not spec.with_filepath:
            return df

        # The source file path is taken from the hidden _metadata column.
        return df.selectExpr("*", "_metadata.file_path as lhe_extraction_filepath")
Class to read from files.
FileReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
def __init__(self, input_spec: InputSpec):
    """Create a FileReader for the given input specification.

    Args:
        input_spec: input specification, forwarded to the Reader constructor.
    """
    super().__init__(input_spec)
Construct FileReader instances.
Arguments:
- input_spec: input specification.
def read(self) -> pyspark.sql.dataframe.DataFrame:
def read(self) -> DataFrame:
    """Load the configured file location into a dataframe.

    Batch and streaming reads share the same load call; only the reader
    taken from the Spark session differs.

    Returns:
        A dataframe containing the data from the files.

    Raises:
        NotImplementedError: if the read type is neither batch nor
            streaming, or the data format is not in FILE_INPUT_FORMATS.
    """
    spec = self._input_spec

    # Resolve the read mode first; None marks an unsupported read type.
    if spec.read_type == ReadType.BATCH.value:
        use_stream = False
    elif spec.read_type == ReadType.STREAMING.value:
        use_stream = True
    else:
        use_stream = None

    if use_stream is None or spec.data_format not in FILE_INPUT_FORMATS:
        raise NotImplementedError(
            "The requested read type and format combination is not supported."
        )

    session = ExecEnv.SESSION
    reader = session.readStream if use_stream else session.read
    df = reader.load(
        path=spec.location,
        format=spec.data_format,
        schema=SchemaUtils.from_input_spec(spec),
        **(spec.options or {}),
    )

    if not spec.with_filepath:
        return df

    # The source file path is taken from the hidden _metadata column.
    return df.selectExpr("*", "_metadata.file_path as lhe_extraction_filepath")
Read file data.
Returns:
A dataframe containing the data from the files.