lakehouse_engine.io.readers.sftp_reader

Module to define behaviour to read from SFTP.

  1"""Module to define behaviour to read from SFTP."""
  2
  3import gzip
  4from datetime import datetime
  5from io import TextIOWrapper
  6from logging import Logger
  7from typing import List
  8from zipfile import ZipFile
  9
 10import pandas as pd
 11from pandas import DataFrame as PandasDataFrame
 12from pandas.errors import EmptyDataError
 13from paramiko.sftp_file import SFTPFile
 14from pyspark.sql import DataFrame
 15
 16from lakehouse_engine.core.definitions import InputSpec, ReadType
 17from lakehouse_engine.core.exec_env import ExecEnv
 18from lakehouse_engine.io.reader import Reader
 19from lakehouse_engine.utils.extraction.sftp_extraction_utils import SFTPExtractionUtils
 20from lakehouse_engine.utils.logging_handler import LoggingHandler
 21
 22
 23class SFTPReader(Reader):
 24    """Class to read from SFTP."""
 25
 26    _logger: Logger = LoggingHandler(__name__).get_logger()
 27
 28    def __init__(self, input_spec: InputSpec):
 29        """Construct SFTPReader instances.
 30
 31        Args:
 32            input_spec: input specification.
 33        """
 34        super().__init__(input_spec)
 35
 36    def read(self) -> DataFrame:
 37        """Read SFTP data.
 38
 39        Returns:
 40            A dataframe containing the data from SFTP.
 41        """
 42        if self._input_spec.read_type == ReadType.BATCH.value:
 43            options_args = self._input_spec.options if self._input_spec.options else {}
 44
 45            sftp_files_format = SFTPExtractionUtils.validate_format(
 46                self._input_spec.sftp_files_format.lower()
 47            )
 48
 49            location = SFTPExtractionUtils.validate_location(self._input_spec.location)
 50
 51            sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)
 52
 53            files_list = SFTPExtractionUtils.get_files_list(
 54                sftp, location, options_args
 55            )
 56
 57            dfs: List[PandasDataFrame] = []
 58            try:
 59                for filename in files_list:
 60                    with sftp.open(filename, "r") as sftp_file:
 61                        try:
 62                            pdf = self._read_files(
 63                                filename,
 64                                sftp_file,
 65                                options_args.get("args", {}),
 66                                sftp_files_format,
 67                            )
 68                            if options_args.get("file_metadata", None):
 69                                pdf["filename"] = filename
 70                                pdf["modification_time"] = datetime.fromtimestamp(
 71                                    sftp.stat(filename).st_mtime
 72                                )
 73                            self._append_files(pdf, dfs)
 74                        except EmptyDataError:
 75                            self._logger.info(f"{filename} - Empty or malformed file.")
 76                if dfs:
 77                    df = ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
 78                else:
 79                    raise ValueError(
 80                        "No files were found with the specified parameters."
 81                    )
 82            finally:
 83                sftp.close()
 84                transport.close()
 85        else:
 86            raise NotImplementedError(
 87                "The requested read type supports only BATCH mode."
 88            )
 89        return df
 90
 91    @classmethod
 92    def _append_files(cls, pdf: PandasDataFrame, dfs: List) -> List:
 93        """Append to the list dataframes with data.
 94
 95        Args:
 96            pdf: a Pandas dataframe containing data from files.
 97            dfs: a list of Pandas dataframes.
 98
 99        Returns:
100            A list of not empty Pandas dataframes.
101        """
102        if not pdf.empty:
103            dfs.append(pdf)
104        return dfs
105
106    @classmethod
107    def _read_files(
108        cls, filename: str, sftp_file: SFTPFile, option_args: dict, files_format: str
109    ) -> PandasDataFrame:
110        """Open and decompress files to be extracted from SFTP.
111
112        For zip files, to avoid data type inferred issues
113        during the iteration, all data will be read as string.
114        Also, empty dataframes will NOT be considered to be processed.
115        For the not considered ones, the file names will be logged.
116
117        Args:
118            filename: the filename to be read.
119            sftp_file: SFTPFile object representing the open file.
120            option_args: options from the acon.
121            files_format: a string containing the file extension.
122
123        Returns:
124            A pandas dataframe with data from the file.
125        """
126        reader = getattr(pd, f"read_{files_format}")
127
128        if filename.endswith(".gz"):
129            with gzip.GzipFile(fileobj=sftp_file, mode="rb") as gz_file:
130                pdf = reader(
131                    TextIOWrapper(gz_file),  # type: ignore
132                    **option_args,
133                )
134        elif filename.endswith(".zip"):
135            with ZipFile(sftp_file, "r") as zf:  # type: ignore
136                dfs = [
137                    reader(TextIOWrapper(zf.open(f)), **option_args).fillna("")
138                    for f in zf.namelist()
139                ]
140                if not pd.concat(dfs, ignore_index=True).empty:
141                    pdf = pd.concat(dfs, ignore_index=True).astype(str)
142                else:
143                    pdf = pd.DataFrame()
144                    cls._logger.info(f"{filename} - Empty or malformed file.")
145        else:
146            pdf = reader(
147                sftp_file,
148                **option_args,
149            )
150        return pdf
class SFTPReader(lakehouse_engine.io.reader.Reader):
 24class SFTPReader(Reader):
 25    """Class to read from SFTP."""
 26
 27    _logger: Logger = LoggingHandler(__name__).get_logger()
 28
 29    def __init__(self, input_spec: InputSpec):
 30        """Construct SFTPReader instances.
 31
 32        Args:
 33            input_spec: input specification.
 34        """
 35        super().__init__(input_spec)
 36
 37    def read(self) -> DataFrame:
 38        """Read SFTP data.
 39
 40        Returns:
 41            A dataframe containing the data from SFTP.
 42        """
 43        if self._input_spec.read_type == ReadType.BATCH.value:
 44            options_args = self._input_spec.options if self._input_spec.options else {}
 45
 46            sftp_files_format = SFTPExtractionUtils.validate_format(
 47                self._input_spec.sftp_files_format.lower()
 48            )
 49
 50            location = SFTPExtractionUtils.validate_location(self._input_spec.location)
 51
 52            sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)
 53
 54            files_list = SFTPExtractionUtils.get_files_list(
 55                sftp, location, options_args
 56            )
 57
 58            dfs: List[PandasDataFrame] = []
 59            try:
 60                for filename in files_list:
 61                    with sftp.open(filename, "r") as sftp_file:
 62                        try:
 63                            pdf = self._read_files(
 64                                filename,
 65                                sftp_file,
 66                                options_args.get("args", {}),
 67                                sftp_files_format,
 68                            )
 69                            if options_args.get("file_metadata", None):
 70                                pdf["filename"] = filename
 71                                pdf["modification_time"] = datetime.fromtimestamp(
 72                                    sftp.stat(filename).st_mtime
 73                                )
 74                            self._append_files(pdf, dfs)
 75                        except EmptyDataError:
 76                            self._logger.info(f"{filename} - Empty or malformed file.")
 77                if dfs:
 78                    df = ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
 79                else:
 80                    raise ValueError(
 81                        "No files were found with the specified parameters."
 82                    )
 83            finally:
 84                sftp.close()
 85                transport.close()
 86        else:
 87            raise NotImplementedError(
 88                "The requested read type supports only BATCH mode."
 89            )
 90        return df
 91
 92    @classmethod
 93    def _append_files(cls, pdf: PandasDataFrame, dfs: List) -> List:
 94        """Append to the list dataframes with data.
 95
 96        Args:
 97            pdf: a Pandas dataframe containing data from files.
 98            dfs: a list of Pandas dataframes.
 99
100        Returns:
101            A list of not empty Pandas dataframes.
102        """
103        if not pdf.empty:
104            dfs.append(pdf)
105        return dfs
106
107    @classmethod
108    def _read_files(
109        cls, filename: str, sftp_file: SFTPFile, option_args: dict, files_format: str
110    ) -> PandasDataFrame:
111        """Open and decompress files to be extracted from SFTP.
112
113        For zip files, to avoid data type inferred issues
114        during the iteration, all data will be read as string.
115        Also, empty dataframes will NOT be considered to be processed.
116        For the not considered ones, the file names will be logged.
117
118        Args:
119            filename: the filename to be read.
120            sftp_file: SFTPFile object representing the open file.
121            option_args: options from the acon.
122            files_format: a string containing the file extension.
123
124        Returns:
125            A pandas dataframe with data from the file.
126        """
127        reader = getattr(pd, f"read_{files_format}")
128
129        if filename.endswith(".gz"):
130            with gzip.GzipFile(fileobj=sftp_file, mode="rb") as gz_file:
131                pdf = reader(
132                    TextIOWrapper(gz_file),  # type: ignore
133                    **option_args,
134                )
135        elif filename.endswith(".zip"):
136            with ZipFile(sftp_file, "r") as zf:  # type: ignore
137                dfs = [
138                    reader(TextIOWrapper(zf.open(f)), **option_args).fillna("")
139                    for f in zf.namelist()
140                ]
141                if not pd.concat(dfs, ignore_index=True).empty:
142                    pdf = pd.concat(dfs, ignore_index=True).astype(str)
143                else:
144                    pdf = pd.DataFrame()
145                    cls._logger.info(f"{filename} - Empty or malformed file.")
146        else:
147            pdf = reader(
148                sftp_file,
149                **option_args,
150            )
151        return pdf

Class to read from SFTP.

SFTPReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
29    def __init__(self, input_spec: InputSpec):
30        """Construct SFTPReader instances.
31
32        Args:
33            input_spec: input specification.
34        """
35        super().__init__(input_spec)

Construct SFTPReader instances.

Arguments:
  • input_spec: input specification.
def read(self) -> pyspark.sql.dataframe.DataFrame:
37    def read(self) -> DataFrame:
38        """Read SFTP data.
39
40        Returns:
41            A dataframe containing the data from SFTP.
42        """
43        if self._input_spec.read_type == ReadType.BATCH.value:
44            options_args = self._input_spec.options if self._input_spec.options else {}
45
46            sftp_files_format = SFTPExtractionUtils.validate_format(
47                self._input_spec.sftp_files_format.lower()
48            )
49
50            location = SFTPExtractionUtils.validate_location(self._input_spec.location)
51
52            sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)
53
54            files_list = SFTPExtractionUtils.get_files_list(
55                sftp, location, options_args
56            )
57
58            dfs: List[PandasDataFrame] = []
59            try:
60                for filename in files_list:
61                    with sftp.open(filename, "r") as sftp_file:
62                        try:
63                            pdf = self._read_files(
64                                filename,
65                                sftp_file,
66                                options_args.get("args", {}),
67                                sftp_files_format,
68                            )
69                            if options_args.get("file_metadata", None):
70                                pdf["filename"] = filename
71                                pdf["modification_time"] = datetime.fromtimestamp(
72                                    sftp.stat(filename).st_mtime
73                                )
74                            self._append_files(pdf, dfs)
75                        except EmptyDataError:
76                            self._logger.info(f"{filename} - Empty or malformed file.")
77                if dfs:
78                    df = ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
79                else:
80                    raise ValueError(
81                        "No files were found with the specified parameters."
82                    )
83            finally:
84                sftp.close()
85                transport.close()
86        else:
87            raise NotImplementedError(
88                "The requested read type supports only BATCH mode."
89            )
90        return df

Read SFTP data.

Returns:

A dataframe containing the data from SFTP.