Skip to content

Sftp reader

Module to define behaviour to read from SFTP.

SFTPReader

Bases: Reader

Class to read from SFTP.

Source code in mkdocs/lakehouse_engine/packages/io/readers/sftp_reader.py
class SFTPReader(Reader):
    """Class to read from SFTP."""

    _logger: Logger = LoggingHandler(__name__).get_logger()

    def __init__(self, input_spec: InputSpec):
        """Construct SFTPReader instances.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)

    def read(self) -> DataFrame:
        """Read SFTP data.

        Returns:
            A dataframe containing the data from SFTP.
        """
        if self._input_spec.read_type == ReadType.BATCH.value:
            options_args = self._input_spec.options if self._input_spec.options else {}

            sftp_files_format = SFTPExtractionUtils.validate_format(
                self._input_spec.sftp_files_format.lower()
            )

            location = SFTPExtractionUtils.validate_location(self._input_spec.location)

            sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)

            files_list = SFTPExtractionUtils.get_files_list(
                sftp, location, options_args
            )

            dfs: List[PandasDataFrame] = []
            try:
                for filename in files_list:
                    with sftp.open(filename, "r") as sftp_file:
                        try:
                            pdf = self._read_files(
                                filename,
                                sftp_file,
                                options_args.get("args", {}),
                                sftp_files_format,
                            )
                            if options_args.get("file_metadata", None):
                                pdf["filename"] = filename
                                pdf["modification_time"] = datetime.fromtimestamp(
                                    sftp.stat(filename).st_mtime
                                )
                            self._append_files(pdf, dfs)
                        except EmptyDataError:
                            self._logger.info(f"{filename} - Empty or malformed file.")
                if dfs:
                    df = ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
                else:
                    raise ValueError(
                        "No files were found with the specified parameters."
                    )
            finally:
                sftp.close()
                transport.close()
        else:
            raise NotImplementedError(
                "The requested read type supports only BATCH mode."
            )
        return df

    @classmethod
    def _append_files(cls, pdf: PandasDataFrame, dfs: List) -> List:
        """Append to the list dataframes with data.

        Args:
            pdf: a Pandas dataframe containing data from files.
            dfs: a list of Pandas dataframes.

        Returns:
            A list of not empty Pandas dataframes.
        """
        if not pdf.empty:
            dfs.append(pdf)
        return dfs

    @classmethod
    def _read_files(
        cls, filename: str, sftp_file: SFTPFile, option_args: dict, files_format: str
    ) -> PandasDataFrame:
        """Open and decompress files to be extracted from SFTP.

        For zip files, to avoid data type inferred issues
        during the iteration, all data will be read as string.
        Also, empty dataframes will NOT be considered to be processed.
        For the not considered ones, the file names will be logged.

        Args:
            filename: the filename to be read.
            sftp_file: SFTPFile object representing the open file.
            option_args: options from the acon.
            files_format: a string containing the file extension.

        Returns:
            A pandas dataframe with data from the file.
        """
        reader = getattr(pd, f"read_{files_format}")

        if filename.endswith(".gz"):
            with gzip.GzipFile(fileobj=sftp_file, mode="rb") as gz_file:
                pdf = reader(
                    TextIOWrapper(gz_file),  # type: ignore
                    **option_args,
                )
        elif filename.endswith(".zip"):
            with ZipFile(sftp_file, "r") as zf:  # type: ignore
                dfs = [
                    reader(TextIOWrapper(zf.open(f)), **option_args).fillna("")
                    for f in zf.namelist()
                ]
                if not pd.concat(dfs, ignore_index=True).empty:
                    pdf = pd.concat(dfs, ignore_index=True).astype(str)
                else:
                    pdf = pd.DataFrame()
                    cls._logger.info(f"{filename} - Empty or malformed file.")
        else:
            pdf = reader(
                sftp_file,
                **option_args,
            )
        return pdf

__init__(input_spec)

Construct SFTPReader instances.

Parameters:

Name Type Description Default
input_spec InputSpec

input specification.

required
Source code in mkdocs/lakehouse_engine/packages/io/readers/sftp_reader.py
def __init__(self, input_spec: InputSpec):
    """Construct SFTPReader instances.

    Args:
        input_spec: input specification.
    """
    super().__init__(input_spec)

read()

Read SFTP data.

Returns:

Type Description
DataFrame

A dataframe containing the data from SFTP.

Source code in mkdocs/lakehouse_engine/packages/io/readers/sftp_reader.py
def read(self) -> DataFrame:
    """Read SFTP data.

    Returns:
        A dataframe containing the data from SFTP.
    """
    if self._input_spec.read_type == ReadType.BATCH.value:
        options_args = self._input_spec.options if self._input_spec.options else {}

        sftp_files_format = SFTPExtractionUtils.validate_format(
            self._input_spec.sftp_files_format.lower()
        )

        location = SFTPExtractionUtils.validate_location(self._input_spec.location)

        sftp, transport = SFTPExtractionUtils.get_sftp_client(options_args)

        files_list = SFTPExtractionUtils.get_files_list(
            sftp, location, options_args
        )

        dfs: List[PandasDataFrame] = []
        try:
            for filename in files_list:
                with sftp.open(filename, "r") as sftp_file:
                    try:
                        pdf = self._read_files(
                            filename,
                            sftp_file,
                            options_args.get("args", {}),
                            sftp_files_format,
                        )
                        if options_args.get("file_metadata", None):
                            pdf["filename"] = filename
                            pdf["modification_time"] = datetime.fromtimestamp(
                                sftp.stat(filename).st_mtime
                            )
                        self._append_files(pdf, dfs)
                    except EmptyDataError:
                        self._logger.info(f"{filename} - Empty or malformed file.")
            if dfs:
                df = ExecEnv.SESSION.createDataFrame(pd.concat(dfs))
            else:
                raise ValueError(
                    "No files were found with the specified parameters."
                )
        finally:
            sftp.close()
            transport.close()
    else:
        raise NotImplementedError(
            "The requested read type supports only BATCH mode."
        )
    return df