class SFTPReader(Reader):
    """Class to read from SFTP."""

    _logger: Logger = LoggingHandler(__name__).get_logger()

    def __init__(self, input_spec: InputSpec):
        """Construct SFTPReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read SFTP data.

        Every file returned by SFTPExtractionUtils.get_files_list is loaded into
        a Pandas dataframe; the non-empty ones are concatenated and handed to
        ExecEnv.SESSION.createDataFrame.

        Returns:
            A dataframe containing the data from SFTP.

        Raises:
            NotImplementedError: if the read type is not BATCH.
            ValueError: if no file produced a non-empty dataframe.
        """
        if self._input_spec.read_type != ReadType.BATCH.value:
            raise NotImplementedError(
                "The requested read type supports only BATCH mode."
            )

        options_args = self._input_spec.options if self._input_spec.options else {}
        sftp_files_format = SFTPExtractionUtils.validate_format(
            self._input_spec.sftp_files_format.lower()
        )
        location = SFTPExtractionUtils.validate_location(self._input_spec.location)
        sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)

        dfs: List[PandasDataFrame] = []
        try:
            # Listing happens inside the try so that a failure here still
            # releases the already opened connection.
            files_list = SFTPExtractionUtils.get_files_list(
                sftp, location, options_args
            )
            for filename in files_list:
                with sftp.open(filename, "r") as sftp_file:
                    try:
                        pdf = self._read_files(
                            filename,
                            sftp_file,
                            options_args.get("args", {}),
                            sftp_files_format,
                        )
                        if options_args.get("file_metadata", None):
                            pdf["filename"] = filename
                            pdf["modification_time"] = datetime.fromtimestamp(
                                sftp.stat(filename).st_mtime
                            )
                        self._append_files(pdf, dfs)
                    except EmptyDataError:
                        # A file without parsable content is skipped, not fatal.
                        self._logger.info(f"{filename} - Empty or malformed file.")

            if not dfs:
                raise ValueError("No files were found with the specified parameters.")
            df = ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
        finally:
            # Nested so the transport is closed even if closing the client fails.
            try:
                sftp.close()
            finally:
                transport.close()

        return df

    @classmethod
    def _append_files(cls, pdf: PandasDataFrame, dfs: List) -> List:
        """Append to the list dataframes with data.

        The list is modified in place; empty dataframes are not appended.

        Args:
            pdf: a Pandas dataframe containing data from files.
            dfs: a list of Pandas dataframes.

        Returns:
            A list of not empty Pandas dataframes.
        """
        if not pdf.empty:
            dfs.append(pdf)
        return dfs

    @classmethod
    def _read_files(
        cls, filename: str, sftp_file: SFTPFile, option_args: dict, files_format: str
    ) -> PandasDataFrame:
        """Open and decompress files to be extracted from SFTP.

        For zip files, to avoid data type inferred issues during the iteration,
        all data will be read as string. Also, empty dataframes will NOT be
        considered to be processed. For the not considered ones, the file names
        will be logged.

        Args:
            filename: the filename to be read.
            sftp_file: SFTPFile object representing the open file.
            option_args: options from the acon.
            files_format: a string containing the file extension.

        Returns:
            A pandas dataframe with data from the file.
        """
        # The pandas reader is resolved from the format name, e.g. "csv" -> pd.read_csv.
        reader = getattr(pd, f"read_{files_format}")

        if filename.endswith(".gz"):
            with gzip.GzipFile(fileobj=sftp_file, mode="rb") as gz_file:
                pdf = reader(
                    TextIOWrapper(gz_file),  # type: ignore
                    **option_args,
                )
        elif filename.endswith(".zip"):
            member_dfs = []
            with ZipFile(sftp_file, "r") as zf:  # type: ignore
                for member in zf.namelist():
                    # Close each archive member as soon as it has been parsed.
                    with TextIOWrapper(zf.open(member)) as member_text:
                        member_dfs.append(
                            reader(member_text, **option_args).fillna("")
                        )

            # Concatenate once; pd.concat raises on an empty list, so an archive
            # without members is treated like an empty file.
            if member_dfs:
                combined = pd.concat(member_dfs, ignore_index=True)
            else:
                combined = pd.DataFrame()

            if not combined.empty:
                pdf = combined.astype(str)
            else:
                pdf = pd.DataFrame()
                cls._logger.info(f"{filename} - Empty or malformed file.")
        else:
            pdf = reader(
                sftp_file,
                **option_args,
            )

        return pdf
def read(self) -> DataFrame:
    """Read SFTP data.

    Every file returned by SFTPExtractionUtils.get_files_list is loaded into a
    Pandas dataframe; the non-empty ones are concatenated and handed to
    ExecEnv.SESSION.createDataFrame.

    Returns:
        A dataframe containing the data from SFTP.

    Raises:
        NotImplementedError: if the read type is not BATCH.
        ValueError: if no file produced a non-empty dataframe.
    """
    if self._input_spec.read_type != ReadType.BATCH.value:
        raise NotImplementedError(
            "The requested read type supports only BATCH mode."
        )

    options_args = self._input_spec.options if self._input_spec.options else {}
    sftp_files_format = SFTPExtractionUtils.validate_format(
        self._input_spec.sftp_files_format.lower()
    )
    location = SFTPExtractionUtils.validate_location(self._input_spec.location)
    sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)

    dfs: List[PandasDataFrame] = []
    try:
        # Listing happens inside the try so that a failure here still
        # releases the already opened connection.
        files_list = SFTPExtractionUtils.get_files_list(sftp, location, options_args)
        for filename in files_list:
            with sftp.open(filename, "r") as sftp_file:
                try:
                    pdf = self._read_files(
                        filename,
                        sftp_file,
                        options_args.get("args", {}),
                        sftp_files_format,
                    )
                    if options_args.get("file_metadata", None):
                        pdf["filename"] = filename
                        pdf["modification_time"] = datetime.fromtimestamp(
                            sftp.stat(filename).st_mtime
                        )
                    self._append_files(pdf, dfs)
                except EmptyDataError:
                    # A file without parsable content is skipped, not fatal.
                    self._logger.info(f"{filename} - Empty or malformed file.")

        if not dfs:
            raise ValueError("No files were found with the specified parameters.")
        df = ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
    finally:
        # Nested so the transport is closed even if closing the client fails.
        try:
            sftp.close()
        finally:
            transport.close()

    return df