lakehouse_engine.io.readers.sftp_reader
Module to define behaviour to read from SFTP.
"""Module to define behaviour to read from SFTP."""

import gzip
from datetime import datetime
from io import TextIOWrapper
from logging import Logger
from typing import List
from zipfile import ZipFile

import pandas as pd
from pandas import DataFrame as PandasDataFrame
from pandas.errors import EmptyDataError
from paramiko.sftp_file import SFTPFile
from pyspark.sql import DataFrame

from lakehouse_engine.core.definitions import InputSpec, ReadType
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.io.reader import Reader
from lakehouse_engine.utils.extraction.sftp_extraction_utils import SFTPExtractionUtils
from lakehouse_engine.utils.logging_handler import LoggingHandler


class SFTPReader(Reader):
    """Class to read from SFTP.

    Each remote file is parsed with pandas (``.gz`` and ``.zip`` files are
    decompressed on the fly), the non-empty results are concatenated and the
    outcome is converted into a single Spark DataFrame.
    """

    _logger: Logger = LoggingHandler(__name__).get_logger()

    def __init__(self, input_spec: InputSpec):
        """Construct SFTPReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read SFTP data.

        Returns:
            A dataframe containing the data from SFTP.

        Raises:
            NotImplementedError: if the input spec read type is not BATCH.
            ValueError: if none of the listed files produced any data.
        """
        if self._input_spec.read_type != ReadType.BATCH.value:
            raise NotImplementedError(
                "The requested read type supports only BATCH mode."
            )

        options_args = self._input_spec.options or {}

        sftp_files_format = SFTPExtractionUtils.validate_format(
            self._input_spec.sftp_files_format.lower()
        )
        location = SFTPExtractionUtils.validate_location(self._input_spec.location)

        sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)

        dfs: List[PandasDataFrame] = []
        try:
            # The listing runs inside the try block so that a failure while
            # listing still closes the already opened connection.
            files_list = SFTPExtractionUtils.get_files_list(
                sftp, location, options_args
            )
            for filename in files_list:
                with sftp.open(filename, "r") as sftp_file:
                    try:
                        pdf = self._read_files(
                            filename,
                            sftp_file,
                            options_args.get("args", {}),
                            sftp_files_format,
                        )
                        if options_args.get("file_metadata", None):
                            pdf["filename"] = filename
                            pdf["modification_time"] = datetime.fromtimestamp(
                                sftp.stat(filename).st_mtime
                            )
                        self._append_files(pdf, dfs)
                    except EmptyDataError:
                        # A file without parsable content is skipped, not fatal.
                        self._logger.info(f"{filename} - Empty or malformed file.")
            if not dfs:
                raise ValueError(
                    "No files were found with the specified parameters."
                )
            return ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
        finally:
            sftp.close()
            transport.close()

    @classmethod
    def _append_files(cls, pdf: PandasDataFrame, dfs: List) -> List:
        """Append to the list dataframes with data.

        The list is modified in place; empty dataframes are not added.

        Args:
            pdf: a Pandas dataframe containing data from files.
            dfs: a list of Pandas dataframes.

        Returns:
            The same list that was passed in as ``dfs``.
        """
        if not pdf.empty:
            dfs.append(pdf)
        return dfs

    @classmethod
    def _read_files(
        cls, filename: str, sftp_file: SFTPFile, option_args: dict, files_format: str
    ) -> PandasDataFrame:
        """Open and decompress files to be extracted from SFTP.

        For zip files, to avoid data type inference issues across the
        archive members, missing values are replaced by empty strings and
        all data is converted to string.
        Archives without any data result in an empty dataframe and their
        file name is logged.

        Args:
            filename: the filename to be read.
            sftp_file: SFTPFile object representing the open file.
            option_args: options from the acon, forwarded to the pandas reader.
            files_format: a string containing the file extension, used to
                pick the ``pandas.read_<files_format>`` function.

        Returns:
            A pandas dataframe with data from the file.
        """
        reader = getattr(pd, f"read_{files_format}")

        if filename.endswith(".gz"):
            with gzip.GzipFile(fileobj=sftp_file, mode="rb") as gz_file:
                return reader(
                    TextIOWrapper(gz_file),  # type: ignore
                    **option_args,
                )

        if filename.endswith(".zip"):
            member_dfs: List[PandasDataFrame] = []
            with ZipFile(sftp_file, "r") as zf:  # type: ignore
                for member in zf.infolist():
                    # Directory entries carry no data and must not be parsed.
                    if member.is_dir():
                        continue
                    with zf.open(member) as member_file:
                        member_pdf = reader(
                            TextIOWrapper(member_file), **option_args
                        )
                    member_dfs.append(member_pdf.fillna(""))

            # pd.concat raises ValueError on an empty list, so an archive
            # without file members is treated like an empty file instead.
            combined = (
                pd.concat(member_dfs, ignore_index=True)
                if member_dfs
                else pd.DataFrame()
            )
            if combined.empty:
                cls._logger.info(f"{filename} - Empty or malformed file.")
                return pd.DataFrame()
            return combined.astype(str)

        return reader(
            sftp_file,
            **option_args,
        )
class SFTPReader(Reader):
    """Class to read from SFTP.

    Each remote file is parsed with pandas (``.gz`` and ``.zip`` files are
    decompressed on the fly), the non-empty results are concatenated and the
    outcome is converted into a single Spark DataFrame.
    """

    _logger: Logger = LoggingHandler(__name__).get_logger()

    def __init__(self, input_spec: InputSpec):
        """Construct SFTPReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read SFTP data.

        Returns:
            A dataframe containing the data from SFTP.

        Raises:
            NotImplementedError: if the input spec read type is not BATCH.
            ValueError: if none of the listed files produced any data.
        """
        if self._input_spec.read_type != ReadType.BATCH.value:
            raise NotImplementedError(
                "The requested read type supports only BATCH mode."
            )

        options_args = self._input_spec.options or {}

        sftp_files_format = SFTPExtractionUtils.validate_format(
            self._input_spec.sftp_files_format.lower()
        )
        location = SFTPExtractionUtils.validate_location(self._input_spec.location)

        sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)

        dfs: List[PandasDataFrame] = []
        try:
            # The listing runs inside the try block so that a failure while
            # listing still closes the already opened connection.
            files_list = SFTPExtractionUtils.get_files_list(
                sftp, location, options_args
            )
            for filename in files_list:
                with sftp.open(filename, "r") as sftp_file:
                    try:
                        pdf = self._read_files(
                            filename,
                            sftp_file,
                            options_args.get("args", {}),
                            sftp_files_format,
                        )
                        if options_args.get("file_metadata", None):
                            pdf["filename"] = filename
                            pdf["modification_time"] = datetime.fromtimestamp(
                                sftp.stat(filename).st_mtime
                            )
                        self._append_files(pdf, dfs)
                    except EmptyDataError:
                        # A file without parsable content is skipped, not fatal.
                        self._logger.info(f"{filename} - Empty or malformed file.")
            if not dfs:
                raise ValueError(
                    "No files were found with the specified parameters."
                )
            return ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
        finally:
            sftp.close()
            transport.close()

    @classmethod
    def _append_files(cls, pdf: PandasDataFrame, dfs: List) -> List:
        """Append to the list dataframes with data.

        The list is modified in place; empty dataframes are not added.

        Args:
            pdf: a Pandas dataframe containing data from files.
            dfs: a list of Pandas dataframes.

        Returns:
            The same list that was passed in as ``dfs``.
        """
        if not pdf.empty:
            dfs.append(pdf)
        return dfs

    @classmethod
    def _read_files(
        cls, filename: str, sftp_file: SFTPFile, option_args: dict, files_format: str
    ) -> PandasDataFrame:
        """Open and decompress files to be extracted from SFTP.

        For zip files, to avoid data type inference issues across the
        archive members, missing values are replaced by empty strings and
        all data is converted to string.
        Archives without any data result in an empty dataframe and their
        file name is logged.

        Args:
            filename: the filename to be read.
            sftp_file: SFTPFile object representing the open file.
            option_args: options from the acon, forwarded to the pandas reader.
            files_format: a string containing the file extension, used to
                pick the ``pandas.read_<files_format>`` function.

        Returns:
            A pandas dataframe with data from the file.
        """
        reader = getattr(pd, f"read_{files_format}")

        if filename.endswith(".gz"):
            with gzip.GzipFile(fileobj=sftp_file, mode="rb") as gz_file:
                return reader(
                    TextIOWrapper(gz_file),  # type: ignore
                    **option_args,
                )

        if filename.endswith(".zip"):
            member_dfs: List[PandasDataFrame] = []
            with ZipFile(sftp_file, "r") as zf:  # type: ignore
                for member in zf.infolist():
                    # Directory entries carry no data and must not be parsed.
                    if member.is_dir():
                        continue
                    with zf.open(member) as member_file:
                        member_pdf = reader(
                            TextIOWrapper(member_file), **option_args
                        )
                    member_dfs.append(member_pdf.fillna(""))

            # pd.concat raises ValueError on an empty list, so an archive
            # without file members is treated like an empty file instead.
            combined = (
                pd.concat(member_dfs, ignore_index=True)
                if member_dfs
                else pd.DataFrame()
            )
            if combined.empty:
                cls._logger.info(f"{filename} - Empty or malformed file.")
                return pd.DataFrame()
            return combined.astype(str)

        return reader(
            sftp_file,
            **option_args,
        )
Class to read from SFTP.
SFTPReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
def __init__(self, input_spec: InputSpec) -> None:
    """Construct SFTPReader instances.

    The input specification is handed over unchanged to the parent
    class constructor; no SFTP connection is opened here.

    Args:
        input_spec: input specification.
    """
    super().__init__(input_spec)
Construct SFTPReader instances.
Arguments:
- input_spec: input specification.
def
read(self) -> pyspark.sql.dataframe.DataFrame:
def read(self) -> DataFrame:
    """Read SFTP data.

    Every listed file is parsed into a pandas dataframe, the non-empty
    ones are concatenated and the result is converted into a Spark
    dataframe. Files raising ``EmptyDataError`` are logged and skipped.

    Returns:
        A dataframe containing the data from SFTP.

    Raises:
        NotImplementedError: if the input spec read type is not BATCH.
        ValueError: if none of the listed files produced any data.
    """
    if self._input_spec.read_type != ReadType.BATCH.value:
        raise NotImplementedError(
            "The requested read type supports only BATCH mode."
        )

    options_args = self._input_spec.options or {}

    sftp_files_format = SFTPExtractionUtils.validate_format(
        self._input_spec.sftp_files_format.lower()
    )
    location = SFTPExtractionUtils.validate_location(self._input_spec.location)

    sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)

    dfs: List[PandasDataFrame] = []
    try:
        # The listing runs inside the try block so that a failure while
        # listing still closes the already opened connection.
        files_list = SFTPExtractionUtils.get_files_list(
            sftp, location, options_args
        )
        for filename in files_list:
            with sftp.open(filename, "r") as sftp_file:
                try:
                    pdf = self._read_files(
                        filename,
                        sftp_file,
                        options_args.get("args", {}),
                        sftp_files_format,
                    )
                    if options_args.get("file_metadata", None):
                        pdf["filename"] = filename
                        pdf["modification_time"] = datetime.fromtimestamp(
                            sftp.stat(filename).st_mtime
                        )
                    self._append_files(pdf, dfs)
                except EmptyDataError:
                    # A file without parsable content is skipped, not fatal.
                    self._logger.info(f"{filename} - Empty or malformed file.")
        if not dfs:
            raise ValueError(
                "No files were found with the specified parameters."
            )
        return ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
    finally:
        sftp.close()
        transport.close()
Read SFTP data.
Returns:
A dataframe containing the data from SFTP.