lakehouse_engine.utils.extraction.sftp_extraction_utils

Utilities module for SFTP extraction processes.

  1"""Utilities module for SFTP extraction processes."""
  2
  3import stat
  4from base64 import decodebytes
  5from datetime import datetime
  6from enum import Enum
  7from logging import Logger
  8from stat import S_ISREG
  9from typing import Any, List, Set, Tuple, Union
 10
 11import paramiko as p
 12from paramiko import Ed25519Key, PKey, RSAKey, Transport
 13from paramiko.sftp_client import SFTPAttributes, SFTPClient  # type: ignore
 14
 15from lakehouse_engine.transformers.exceptions import WrongArgumentsException
 16from lakehouse_engine.utils.logging_handler import LoggingHandler
 17
 18
 19class SFTPInputFormat(Enum):
 20    """Formats of algorithm input."""
 21
 22    CSV = "csv"
 23    FWF = "fwf"
 24    JSON = "json"
 25    XML = "xml"
 26
 27
 28class SFTPExtractionFilter(Enum):
 29    """Standardize the types of filters we can have from a SFTP source."""
 30
 31    file_name_contains = "file_name_contains"
 32    LATEST_FILE = "latest_file"
 33    EARLIEST_FILE = "earliest_file"
 34    GREATER_THAN = "date_time_gt"
 35    LOWER_THAN = "date_time_lt"
 36
 37
 38class SFTPExtractionUtils(object):
 39    """Utils for managing data extraction from particularly relevant SFTP sources."""
 40
 41    _logger: Logger = LoggingHandler(__name__).get_logger()
 42
 43    @classmethod
 44    def get_files_list(
 45        cls, sftp: SFTPClient, remote_path: str, options_args: dict
 46    ) -> Set[str]:
 47        """Get a list of files to be extracted from SFTP.
 48
 49        The arguments (options_args) to list files are:
 50        - date_time_gt(str):
 51            Filter the files greater than the string datetime
 52            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
 53        - date_time_lt(str):
 54            Filter the files lower than the string datetime
 55            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
 56        - earliest_file(bool):
 57            Filter the earliest dated file in the directory.
 58        - file_name_contains(str):
 59            Filter files when match the pattern.
 60        - latest_file(bool):
 61            Filter the most recent dated file in the directory.
 62        - sub_dir(bool):
 63            When true, the engine will search files into subdirectories
 64            of the remote_path.
 65            It will consider one level below the remote_path.
 66            When sub_dir is used with latest_file/earliest_file argument,
 67            the engine will retrieve the latest_file/earliest_file
 68            for each subdirectory.
 69
 70        Args:
 71            sftp: the SFTP client object.
 72            remote_path: path of files to be filtered.
 73            options_args: options from the acon.
 74
 75        Returns:
 76            A list containing the file names to be passed to Spark.
 77        """
 78        all_items, folder_path = cls._get_folder_items(remote_path, sftp, options_args)
 79
 80        filtered_files: Set[str] = set()
 81
 82        try:
 83            for item, folder in zip(all_items, folder_path):
 84                file_contains = cls._file_has_pattern(item, options_args)
 85                file_in_interval = cls._file_in_date_interval(item, options_args)
 86                if file_contains and file_in_interval:
 87                    filtered_files.add(folder + item.filename)
 88
 89            if (
 90                SFTPExtractionFilter.EARLIEST_FILE.value in options_args.keys()
 91                or SFTPExtractionFilter.LATEST_FILE.value in options_args.keys()
 92            ):
 93                filtered_files = cls._get_earliest_latest_file(
 94                    sftp, options_args, filtered_files, folder_path
 95                )
 96
 97        except Exception as e:
 98            cls._logger.error(f"SFTP list_files EXCEPTION: - {e}")
 99        return filtered_files
100
101    @classmethod
102    def get_sftp_client(
103        cls,
104        options_args: dict,
105    ) -> Tuple[SFTPClient, Transport]:
106        """Get the SFTP client.
107
108        The SFTP client is used to open an SFTP session across an open
109        SSH Transport and perform remote file operations.
110
111        Args:
112            options_args: dictionary containing SFTP connection parameters.
113                The Paramiko arguments expected to connect are:
114                    - "hostname": the server to connect to.
115                    - "port": the server port to connect to.
116                    - "username": the username to authenticate as.
117                    - "password": used for password authentication.
118                    - "pkey": optional - an optional public key to use for
119                        authentication.
120                    - "passphrase" – optional - options used for decrypting private
121                        keys.
122                    - "key_filename" – optional - the filename, or list of filenames,
123                        of optional private key(s) and/or certs to try for
124                        authentication.
125                    - "timeout" – an optional timeout (in seconds) for the TCP connect.
126                    - "allow_agent" – optional - set to False to disable
127                        connecting to the SSH agent.
128                    - "look_for_keys" – optional - set to False to disable searching
129                        for discoverable private key files in ~/.ssh/.
130                    - "compress" – optional - set to True to turn on compression.
131                    - "sock" - optional - an open socket or socket-like object
132                        to use for communication to the target host.
133                    - "gss_auth" – optional - True if you want to use GSS-API
134                        authentication.
135                    - "gss_kex" – optional - Perform GSS-API Key Exchange and
136                        user authentication.
137                    - "gss_deleg_creds" – optional - Delegate GSS-API client
138                        credentials or not.
139                    - "gss_host" – optional - The targets name in the kerberos database.
140                    - "gss_trust_dns" – optional - Indicates whether or
141                        not the DNS is trusted to securely canonicalize the name of the
142                        host being connected to (default True).
143                    - "banner_timeout" – an optional timeout (in seconds)
144                        to wait for the SSH banner to be presented.
145                    - "auth_timeout" – an optional timeout (in seconds)
146                        to wait for an authentication response.
147                    - "disabled_algorithms" – an optional dict passed directly to
148                        Transport and its keyword argument of the same name.
149                    - "transport_factory" – an optional callable which is handed a
150                        subset of the constructor arguments (primarily those related
151                        to the socket, GSS functionality, and algorithm selection)
152                        and generates a Transport instance to be used by this client.
153                        Defaults to Transport.__init__.
154
155                The parameter to specify the private key is expected to be in
156                RSA format. Attempting a connection with a blank host key is
157                not allowed unless the argument "add_auto_policy" is explicitly
158                set to True.
159
160        Returns:
161            sftp -> a new SFTPClient session object.
162            transport -> the Transport for this connection.
163        """
164        ssh_client = p.SSHClient()
165        try:
166            if not options_args.get("pkey") and not options_args.get("add_auto_policy"):
167                raise WrongArgumentsException(
168                    "Get SFTP Client: No host key (pkey) was provided and the "
169                    + "add_auto_policy property is false."
170                )
171
172            if options_args.get("pkey") and not options_args.get("key_type"):
173                raise WrongArgumentsException(
174                    "Get SFTP Client: The key_type must be provided when "
175                    + "the host key (pkey) is provided."
176                )
177
178            if options_args.get("pkey", None) and options_args.get("key_type", None):
179                key = cls._get_host_keys(
180                    options_args.get("pkey", None), options_args.get("key_type", None)
181                )
182                ssh_client.get_host_keys().add(
183                    hostname=f"[{options_args.get('hostname')}]:"
184                    + f"{options_args.get('port')}",
185                    keytype="ssh-rsa",
186                    key=key,
187                )
188            elif options_args.get("add_auto_policy", None):
189                ssh_client.load_system_host_keys()
190                ssh_client.set_missing_host_key_policy(p.WarningPolicy())  # nosec: B507
191            else:
192                ssh_client.load_system_host_keys()
193                ssh_client.set_missing_host_key_policy(p.RejectPolicy())
194
195            ssh_client.connect(
196                hostname=options_args.get("hostname"),
197                port=options_args.get("port", 22),
198                username=options_args.get("username", None),
199                password=options_args.get("password", None),
200                key_filename=options_args.get("key_filename", None),
201                timeout=options_args.get("timeout", None),
202                allow_agent=options_args.get("allow_agent", True),
203                look_for_keys=options_args.get("look_for_keys", True),
204                compress=options_args.get("compress", False),
205                sock=options_args.get("sock", None),
206                gss_auth=options_args.get("gss_auth", False),
207                gss_kex=options_args.get("gss_kex", False),
208                gss_deleg_creds=options_args.get("gss_deleg_creds", False),
209                gss_host=options_args.get("gss_host", False),
210                banner_timeout=options_args.get("banner_timeout", None),
211                auth_timeout=options_args.get("auth_timeout", None),
212                gss_trust_dns=options_args.get("gss_trust_dns", None),
213                passphrase=options_args.get("passphrase", None),
214                disabled_algorithms=options_args.get("disabled_algorithms", None),
215                transport_factory=options_args.get("transport_factory", None),
216            )
217
218            sftp = ssh_client.open_sftp()
219            transport = ssh_client.get_transport()
220        except ConnectionError as e:
221            cls._logger.error(e)
222            raise
223        return sftp, transport
224
225    @classmethod
226    def validate_format(cls, files_format: str) -> str:
227        """Validate the file extension based on the format definitions.
228
229        Args:
230            files_format: a string containing the file extension.
231
232        Returns:
233            The string validated and formatted.
234        """
235        formats_allowed = [
236            SFTPInputFormat.CSV.value,
237            SFTPInputFormat.FWF.value,
238            SFTPInputFormat.JSON.value,
239            SFTPInputFormat.XML.value,
240        ]
241
242        if files_format not in formats_allowed:
243            raise WrongArgumentsException(
244                f"The formats allowed for SFTP are {formats_allowed}."
245            )
246
247        return files_format
248
249    @classmethod
250    def validate_location(cls, location: str) -> str:
251        """Validate the location. Add "/" in the case it does not exist.
252
253        Args:
254            location: file path.
255
256        Returns:
257            The location validated.
258        """
259        return location if location.rfind("/") == len(location) - 1 else location + "/"
260
261    @classmethod
262    def _file_has_pattern(cls, item: SFTPAttributes, options_args: dict) -> bool:
263        """Check if a file follows the pattern used for filtering.
264
265        Args:
266            item: item available in SFTP directory.
267            options_args: options from the acon.
268
269        Returns:
270            A boolean telling whether the file contains a pattern or not.
271        """
272        file_to_consider = True
273
274        if SFTPExtractionFilter.file_name_contains.value in options_args.keys():
275            if not (
276                options_args.get(SFTPExtractionFilter.file_name_contains.value)
277                in item.filename
278                and (S_ISREG(item.st_mode) or cls._is_compressed(item.filename))
279            ):
280                file_to_consider = False
281
282        return file_to_consider
283
284    @classmethod
285    def _file_in_date_interval(
286        cls,
287        item: SFTPAttributes,
288        options_args: dict,
289    ) -> bool:
290        """Check if the file is in the expected date interval.
291
292        The logic is applied based on the arguments greater_than and lower_than.
293        i.e:
294        - if greater_than and lower_than have values,
295        then it performs a between.
296        - if only lower_than has values,
297        then only values lower than the input value will be retrieved.
298        - if only greater_than has values,
299        then only values greater than the input value will be retrieved.
300
301        Args:
302            item: item available in SFTP directory.
303            options_args: options from the acon.
304
305        Returns:
306            A boolean telling whether the file is in the expected date interval or not.
307        """
308        file_to_consider = True
309
310        if (
311            SFTPExtractionFilter.LOWER_THAN.value in options_args.keys()
312            or SFTPExtractionFilter.GREATER_THAN.value in options_args.keys()
313            and (S_ISREG(item.st_mode) or cls._is_compressed(item.filename))
314        ):
315            lower_than = options_args.get(
316                SFTPExtractionFilter.LOWER_THAN.value, "9999-12-31"
317            )
318            greater_than = options_args.get(
319                SFTPExtractionFilter.GREATER_THAN.value, "1900-01-01"
320            )
321
322            file_date = datetime.fromtimestamp(item.st_mtime)
323
324            if not (
325                (
326                    lower_than == greater_than
327                    and cls._validate_date(greater_than)
328                    <= file_date
329                    <= cls._validate_date(lower_than)
330                )
331                or (
332                    cls._validate_date(greater_than)
333                    < file_date
334                    < cls._validate_date(lower_than)
335                )
336            ):
337                file_to_consider = False
338
339        return file_to_consider
340
341    @classmethod
342    def _get_earliest_latest_file(
343        cls,
344        sftp: SFTPClient,
345        options_args: dict,
346        list_filter_files: Set[str],
347        folder_path: List,
348    ) -> Set[str]:
349        """Get the earliest or latest file of a directory.
350
351        Args:
352            sftp: the SFTP client object.
353            options_args: options from the acon.
354            list_filter_files: set of file names to filter from.
355            folder_path: the location of files.
356
357        Returns:
358            A set containing the earliest/latest file name.
359        """
360        list_earl_lat_files: Set[str] = set()
361
362        for folder in folder_path:
363            file_date = 0
364            file_name = ""
365            all_items, file_path = cls._get_folder_items(
366                f"{folder}", sftp, options_args
367            )
368            for item in all_items:
369                if (
370                    folder + item.filename in list_filter_files
371                    and (S_ISREG(item.st_mode) or cls._is_compressed(item.filename))
372                    and (
373                        options_args.get("earliest_file")
374                        and (file_date == 0 or item.st_mtime < file_date)
375                    )
376                    or (
377                        options_args.get("latest_file")
378                        and (file_date == 0 or item.st_mtime > file_date)
379                    )
380                ):
381                    file_date = item.st_mtime
382                    file_name = folder + item.filename
383            list_earl_lat_files.add(file_name)
384
385        return list_earl_lat_files
386
387    @classmethod
388    def _get_folder_items(
389        cls, remote_path: str, sftp: SFTPClient, options_args: dict
390    ) -> Tuple:
391        """Get the files and the directory to be processed.
392
393        Args:
394            remote_path: root folder path.
395            sftp: a SFTPClient session object.
396            options_args: options from the acon.
397
398        Returns:
399            A tuple with a list of items (file object) and a list of directories.
400        """
401        sub_dir = options_args.get("sub_dir", False)
402        all_items: List[SFTPAttributes] = sftp.listdir_attr(remote_path)
403        items: List[SFTPAttributes] = []
404        folders: List = []
405
406        for item in all_items:
407            is_dir = stat.S_ISDIR(item.st_mode)
408            if is_dir and sub_dir and not item.filename.endswith((".gz", ".zip")):
409                dirs = sftp.listdir_attr(f"{remote_path}{item.filename}")
410                for file in dirs:
411                    items.append(file)
412                    folders.append(f"{remote_path}{item.filename}/")
413            else:
414                items.append(item)
415                folders.append(remote_path)
416
417        return items, folders
418
419    @classmethod
420    def _get_host_keys(cls, pkey: str, key_type: str) -> PKey:
421        """Get the pkey that will be added to the server.
422
423        Args:
424            pkey: a string with a host key value.
425            key_type: the type of key (rsa or ed25519).
426
427        Returns:
428            A PKey that will be used to authenticate the connection.
429        """
430        key: Union[RSAKey, Ed25519Key] = None
431        if pkey and key_type.lower() == "rsa":
432            b_pkey = bytes(pkey, "UTF-8")
433            key = p.RSAKey(data=decodebytes(b_pkey))
434        elif pkey and key_type.lower() == "ed25519":
435            b_pkey = bytes(pkey, "UTF-8")
436            key = p.Ed25519Key(data=decodebytes(b_pkey))
437
438        return key
439
440    @classmethod
441    def _is_compressed(cls, filename: str) -> Any:
442        """Validate if it is a compressed file.
443
444        Args:
445            filename: name of the file to be validated.
446
447        Returns:
448            A boolean with the result.
449        """
450        return filename.endswith((".gz", ".zip"))
451
452    @classmethod
453    def _validate_date(cls, date_text: str) -> datetime:
454        """Validate the input date format.
455
456        Args:
457            date_text: a string with the date or datetime value.
458            The expected formats are:
459                YYYY-MM-DD and YYYY-MM-DD HH:MM:SS
460
461        Returns:
462            The datetime validated and formatted.
463        """
464        for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"):
465            try:
466                if date_text is not None:
467                    return datetime.strptime(date_text, fmt)
468            except ValueError:
469                pass
470        raise ValueError(
471            "Incorrect data format, should be YYYY-MM-DD or YYYY-MM-DD HH:MM:SS."
472        )
class SFTPInputFormat(enum.Enum):
class SFTPInputFormat(Enum):
    """Input file formats supported by the SFTP extraction.

    Each member's value is the lower-case format string accepted by
    SFTPExtractionUtils.validate_format.
    """

    # Declaration order matters: it is the order used when the members are
    # listed (e.g. in the validate_format error message).
    CSV = "csv"
    FWF = "fwf"
    JSON = "json"
    XML = "xml"

Input file formats supported by the SFTP extraction.

CSV = <SFTPInputFormat.CSV: 'csv'>
FWF = <SFTPInputFormat.FWF: 'fwf'>
JSON = <SFTPInputFormat.JSON: 'json'>
XML = <SFTPInputFormat.XML: 'xml'>
Inherited Members
enum.Enum
name
value
class SFTPExtractionFilter(enum.Enum):
class SFTPExtractionFilter(Enum):
    """Filter options that can be applied when listing files from an SFTP source.

    Each member's value is the key looked up in the extraction options dict.
    """

    # NOTE: this member name is lower-case unlike the others; renaming it would
    # break references to SFTPExtractionFilter.file_name_contains.
    file_name_contains = "file_name_contains"
    # Boolean flags selecting a single file per folder by modification time.
    LATEST_FILE = "latest_file"
    EARLIEST_FILE = "earliest_file"
    # Datetime bounds ("YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS").
    GREATER_THAN = "date_time_gt"
    LOWER_THAN = "date_time_lt"

Standardize the types of filters that can be applied to an SFTP source.

file_name_contains = <SFTPExtractionFilter.file_name_contains: 'file_name_contains'>
LATEST_FILE = <SFTPExtractionFilter.LATEST_FILE: 'latest_file'>
EARLIEST_FILE = <SFTPExtractionFilter.EARLIEST_FILE: 'earliest_file'>
GREATER_THAN = <SFTPExtractionFilter.GREATER_THAN: 'date_time_gt'>
LOWER_THAN = <SFTPExtractionFilter.LOWER_THAN: 'date_time_lt'>
Inherited Members
enum.Enum
name
value
class SFTPExtractionUtils:
 39class SFTPExtractionUtils(object):
 40    """Utils for managing data extraction from particularly relevant SFTP sources."""
 41
 42    _logger: Logger = LoggingHandler(__name__).get_logger()
 43
 44    @classmethod
 45    def get_files_list(
 46        cls, sftp: SFTPClient, remote_path: str, options_args: dict
 47    ) -> Set[str]:
 48        """Get a list of files to be extracted from SFTP.
 49
 50        The arguments (options_args) to list files are:
 51        - date_time_gt(str):
 52            Filter the files greater than the string datetime
 53            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
 54        - date_time_lt(str):
 55            Filter the files lower than the string datetime
 56            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
 57        - earliest_file(bool):
 58            Filter the earliest dated file in the directory.
 59        - file_name_contains(str):
 60            Filter files when match the pattern.
 61        - latest_file(bool):
 62            Filter the most recent dated file in the directory.
 63        - sub_dir(bool):
 64            When true, the engine will search files into subdirectories
 65            of the remote_path.
 66            It will consider one level below the remote_path.
 67            When sub_dir is used with latest_file/earliest_file argument,
 68            the engine will retrieve the latest_file/earliest_file
 69            for each subdirectory.
 70
 71        Args:
 72            sftp: the SFTP client object.
 73            remote_path: path of files to be filtered.
 74            options_args: options from the acon.
 75
 76        Returns:
 77            A list containing the file names to be passed to Spark.
 78        """
 79        all_items, folder_path = cls._get_folder_items(remote_path, sftp, options_args)
 80
 81        filtered_files: Set[str] = set()
 82
 83        try:
 84            for item, folder in zip(all_items, folder_path):
 85                file_contains = cls._file_has_pattern(item, options_args)
 86                file_in_interval = cls._file_in_date_interval(item, options_args)
 87                if file_contains and file_in_interval:
 88                    filtered_files.add(folder + item.filename)
 89
 90            if (
 91                SFTPExtractionFilter.EARLIEST_FILE.value in options_args.keys()
 92                or SFTPExtractionFilter.LATEST_FILE.value in options_args.keys()
 93            ):
 94                filtered_files = cls._get_earliest_latest_file(
 95                    sftp, options_args, filtered_files, folder_path
 96                )
 97
 98        except Exception as e:
 99            cls._logger.error(f"SFTP list_files EXCEPTION: - {e}")
100        return filtered_files
101
102    @classmethod
103    def get_sftp_client(
104        cls,
105        options_args: dict,
106    ) -> Tuple[SFTPClient, Transport]:
107        """Get the SFTP client.
108
109        The SFTP client is used to open an SFTP session across an open
110        SSH Transport and perform remote file operations.
111
112        Args:
113            options_args: dictionary containing SFTP connection parameters.
114                The Paramiko arguments expected to connect are:
115                    - "hostname": the server to connect to.
116                    - "port": the server port to connect to.
117                    - "username": the username to authenticate as.
118                    - "password": used for password authentication.
119                    - "pkey": optional - an optional public key to use for
120                        authentication.
121                    - "passphrase" – optional - options used for decrypting private
122                        keys.
123                    - "key_filename" – optional - the filename, or list of filenames,
124                        of optional private key(s) and/or certs to try for
125                        authentication.
126                    - "timeout" – an optional timeout (in seconds) for the TCP connect.
127                    - "allow_agent" – optional - set to False to disable
128                        connecting to the SSH agent.
129                    - "look_for_keys" – optional - set to False to disable searching
130                        for discoverable private key files in ~/.ssh/.
131                    - "compress" – optional - set to True to turn on compression.
132                    - "sock" - optional - an open socket or socket-like object
133                        to use for communication to the target host.
134                    - "gss_auth" – optional - True if you want to use GSS-API
135                        authentication.
136                    - "gss_kex" – optional - Perform GSS-API Key Exchange and
137                        user authentication.
138                    - "gss_deleg_creds" – optional - Delegate GSS-API client
139                        credentials or not.
140                    - "gss_host" – optional - The targets name in the kerberos database.
141                    - "gss_trust_dns" – optional - Indicates whether or
142                        not the DNS is trusted to securely canonicalize the name of the
143                        host being connected to (default True).
144                    - "banner_timeout" – an optional timeout (in seconds)
145                        to wait for the SSH banner to be presented.
146                    - "auth_timeout" – an optional timeout (in seconds)
147                        to wait for an authentication response.
148                    - "disabled_algorithms" – an optional dict passed directly to
149                        Transport and its keyword argument of the same name.
150                    - "transport_factory" – an optional callable which is handed a
151                        subset of the constructor arguments (primarily those related
152                        to the socket, GSS functionality, and algorithm selection)
153                        and generates a Transport instance to be used by this client.
154                        Defaults to Transport.__init__.
155
156                The parameter to specify the private key is expected to be in
157                RSA format. Attempting a connection with a blank host key is
158                not allowed unless the argument "add_auto_policy" is explicitly
159                set to True.
160
161        Returns:
162            sftp -> a new SFTPClient session object.
163            transport -> the Transport for this connection.
164        """
165        ssh_client = p.SSHClient()
166        try:
167            if not options_args.get("pkey") and not options_args.get("add_auto_policy"):
168                raise WrongArgumentsException(
169                    "Get SFTP Client: No host key (pkey) was provided and the "
170                    + "add_auto_policy property is false."
171                )
172
173            if options_args.get("pkey") and not options_args.get("key_type"):
174                raise WrongArgumentsException(
175                    "Get SFTP Client: The key_type must be provided when "
176                    + "the host key (pkey) is provided."
177                )
178
179            if options_args.get("pkey", None) and options_args.get("key_type", None):
180                key = cls._get_host_keys(
181                    options_args.get("pkey", None), options_args.get("key_type", None)
182                )
183                ssh_client.get_host_keys().add(
184                    hostname=f"[{options_args.get('hostname')}]:"
185                    + f"{options_args.get('port')}",
186                    keytype="ssh-rsa",
187                    key=key,
188                )
189            elif options_args.get("add_auto_policy", None):
190                ssh_client.load_system_host_keys()
191                ssh_client.set_missing_host_key_policy(p.WarningPolicy())  # nosec: B507
192            else:
193                ssh_client.load_system_host_keys()
194                ssh_client.set_missing_host_key_policy(p.RejectPolicy())
195
196            ssh_client.connect(
197                hostname=options_args.get("hostname"),
198                port=options_args.get("port", 22),
199                username=options_args.get("username", None),
200                password=options_args.get("password", None),
201                key_filename=options_args.get("key_filename", None),
202                timeout=options_args.get("timeout", None),
203                allow_agent=options_args.get("allow_agent", True),
204                look_for_keys=options_args.get("look_for_keys", True),
205                compress=options_args.get("compress", False),
206                sock=options_args.get("sock", None),
207                gss_auth=options_args.get("gss_auth", False),
208                gss_kex=options_args.get("gss_kex", False),
209                gss_deleg_creds=options_args.get("gss_deleg_creds", False),
210                gss_host=options_args.get("gss_host", False),
211                banner_timeout=options_args.get("banner_timeout", None),
212                auth_timeout=options_args.get("auth_timeout", None),
213                gss_trust_dns=options_args.get("gss_trust_dns", None),
214                passphrase=options_args.get("passphrase", None),
215                disabled_algorithms=options_args.get("disabled_algorithms", None),
216                transport_factory=options_args.get("transport_factory", None),
217            )
218
219            sftp = ssh_client.open_sftp()
220            transport = ssh_client.get_transport()
221        except ConnectionError as e:
222            cls._logger.error(e)
223            raise
224        return sftp, transport
225
226    @classmethod
227    def validate_format(cls, files_format: str) -> str:
228        """Validate the file extension based on the format definitions.
229
230        Args:
231            files_format: a string containing the file extension.
232
233        Returns:
234            The string validated and formatted.
235        """
236        formats_allowed = [
237            SFTPInputFormat.CSV.value,
238            SFTPInputFormat.FWF.value,
239            SFTPInputFormat.JSON.value,
240            SFTPInputFormat.XML.value,
241        ]
242
243        if files_format not in formats_allowed:
244            raise WrongArgumentsException(
245                f"The formats allowed for SFTP are {formats_allowed}."
246            )
247
248        return files_format
249
250    @classmethod
251    def validate_location(cls, location: str) -> str:
252        """Validate the location. Add "/" in the case it does not exist.
253
254        Args:
255            location: file path.
256
257        Returns:
258            The location validated.
259        """
260        return location if location.rfind("/") == len(location) - 1 else location + "/"
261
262    @classmethod
263    def _file_has_pattern(cls, item: SFTPAttributes, options_args: dict) -> bool:
264        """Check if a file follows the pattern used for filtering.
265
266        Args:
267            item: item available in SFTP directory.
268            options_args: options from the acon.
269
270        Returns:
271            A boolean telling whether the file contains a pattern or not.
272        """
273        file_to_consider = True
274
275        if SFTPExtractionFilter.file_name_contains.value in options_args.keys():
276            if not (
277                options_args.get(SFTPExtractionFilter.file_name_contains.value)
278                in item.filename
279                and (S_ISREG(item.st_mode) or cls._is_compressed(item.filename))
280            ):
281                file_to_consider = False
282
283        return file_to_consider
284
285    @classmethod
286    def _file_in_date_interval(
287        cls,
288        item: SFTPAttributes,
289        options_args: dict,
290    ) -> bool:
291        """Check if the file is in the expected date interval.
292
293        The logic is applied based on the arguments greater_than and lower_than.
294        i.e:
295        - if greater_than and lower_than have values,
296        then it performs a between.
297        - if only lower_than has values,
298        then only values lower than the input value will be retrieved.
299        - if only greater_than has values,
300        then only values greater than the input value will be retrieved.
301
302        Args:
303            item: item available in SFTP directory.
304            options_args: options from the acon.
305
306        Returns:
307            A boolean telling whether the file is in the expected date interval or not.
308        """
309        file_to_consider = True
310
311        if (
312            SFTPExtractionFilter.LOWER_THAN.value in options_args.keys()
313            or SFTPExtractionFilter.GREATER_THAN.value in options_args.keys()
314            and (S_ISREG(item.st_mode) or cls._is_compressed(item.filename))
315        ):
316            lower_than = options_args.get(
317                SFTPExtractionFilter.LOWER_THAN.value, "9999-12-31"
318            )
319            greater_than = options_args.get(
320                SFTPExtractionFilter.GREATER_THAN.value, "1900-01-01"
321            )
322
323            file_date = datetime.fromtimestamp(item.st_mtime)
324
325            if not (
326                (
327                    lower_than == greater_than
328                    and cls._validate_date(greater_than)
329                    <= file_date
330                    <= cls._validate_date(lower_than)
331                )
332                or (
333                    cls._validate_date(greater_than)
334                    < file_date
335                    < cls._validate_date(lower_than)
336                )
337            ):
338                file_to_consider = False
339
340        return file_to_consider
341
342    @classmethod
343    def _get_earliest_latest_file(
344        cls,
345        sftp: SFTPClient,
346        options_args: dict,
347        list_filter_files: Set[str],
348        folder_path: List,
349    ) -> Set[str]:
350        """Get the earliest or latest file of a directory.
351
352        Args:
353            sftp: the SFTP client object.
354            options_args: options from the acon.
355            list_filter_files: set of file names to filter from.
356            folder_path: the location of files.
357
358        Returns:
359            A set containing the earliest/latest file name.
360        """
361        list_earl_lat_files: Set[str] = set()
362
363        for folder in folder_path:
364            file_date = 0
365            file_name = ""
366            all_items, file_path = cls._get_folder_items(
367                f"{folder}", sftp, options_args
368            )
369            for item in all_items:
370                if (
371                    folder + item.filename in list_filter_files
372                    and (S_ISREG(item.st_mode) or cls._is_compressed(item.filename))
373                    and (
374                        options_args.get("earliest_file")
375                        and (file_date == 0 or item.st_mtime < file_date)
376                    )
377                    or (
378                        options_args.get("latest_file")
379                        and (file_date == 0 or item.st_mtime > file_date)
380                    )
381                ):
382                    file_date = item.st_mtime
383                    file_name = folder + item.filename
384            list_earl_lat_files.add(file_name)
385
386        return list_earl_lat_files
387
388    @classmethod
389    def _get_folder_items(
390        cls, remote_path: str, sftp: SFTPClient, options_args: dict
391    ) -> Tuple:
392        """Get the files and the directory to be processed.
393
394        Args:
395            remote_path: root folder path.
396            sftp: a SFTPClient session object.
397            options_args: options from the acon.
398
399        Returns:
400            A tuple with a list of items (file object) and a list of directories.
401        """
402        sub_dir = options_args.get("sub_dir", False)
403        all_items: List[SFTPAttributes] = sftp.listdir_attr(remote_path)
404        items: List[SFTPAttributes] = []
405        folders: List = []
406
407        for item in all_items:
408            is_dir = stat.S_ISDIR(item.st_mode)
409            if is_dir and sub_dir and not item.filename.endswith((".gz", ".zip")):
410                dirs = sftp.listdir_attr(f"{remote_path}{item.filename}")
411                for file in dirs:
412                    items.append(file)
413                    folders.append(f"{remote_path}{item.filename}/")
414            else:
415                items.append(item)
416                folders.append(remote_path)
417
418        return items, folders
419
420    @classmethod
421    def _get_host_keys(cls, pkey: str, key_type: str) -> PKey:
422        """Get the pkey that will be added to the server.
423
424        Args:
425            pkey: a string with a host key value.
426            key_type: the type of key (rsa or ed25519).
427
428        Returns:
429            A PKey that will be used to authenticate the connection.
430        """
431        key: Union[RSAKey, Ed25519Key] = None
432        if pkey and key_type.lower() == "rsa":
433            b_pkey = bytes(pkey, "UTF-8")
434            key = p.RSAKey(data=decodebytes(b_pkey))
435        elif pkey and key_type.lower() == "ed25519":
436            b_pkey = bytes(pkey, "UTF-8")
437            key = p.Ed25519Key(data=decodebytes(b_pkey))
438
439        return key
440
441    @classmethod
442    def _is_compressed(cls, filename: str) -> Any:
443        """Validate if it is a compressed file.
444
445        Args:
446            filename: name of the file to be validated.
447
448        Returns:
449            A boolean with the result.
450        """
451        return filename.endswith((".gz", ".zip"))
452
453    @classmethod
454    def _validate_date(cls, date_text: str) -> datetime:
455        """Validate the input date format.
456
457        Args:
458            date_text: a string with the date or datetime value.
459            The expected formats are:
460                YYYY-MM-DD and YYYY-MM-DD HH:MM:SS
461
462        Returns:
463            The datetime validated and formatted.
464        """
465        for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"):
466            try:
467                if date_text is not None:
468                    return datetime.strptime(date_text, fmt)
469            except ValueError:
470                pass
471        raise ValueError(
472            "Incorrect data format, should be YYYY-MM-DD or YYYY-MM-DD HH:MM:SS."
473        )

Utils for managing data extraction from particularly relevant SFTP sources.

@classmethod
def get_files_list( cls, sftp: paramiko.sftp_client.SFTPClient, remote_path: str, options_args: dict) -> Set[str]:
 44    @classmethod
 45    def get_files_list(
 46        cls, sftp: SFTPClient, remote_path: str, options_args: dict
 47    ) -> Set[str]:
 48        """Get a list of files to be extracted from SFTP.
 49
 50        The arguments (options_args) to list files are:
 51        - date_time_gt(str):
 52            Filter the files greater than the string datetime
 53            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
 54        - date_time_lt(str):
 55            Filter the files lower than the string datetime
 56            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
 57        - earliest_file(bool):
 58            Filter the earliest dated file in the directory.
 59        - file_name_contains(str):
 60            Filter files when match the pattern.
 61        - latest_file(bool):
 62            Filter the most recent dated file in the directory.
 63        - sub_dir(bool):
 64            When true, the engine will search files into subdirectories
 65            of the remote_path.
 66            It will consider one level below the remote_path.
 67            When sub_dir is used with latest_file/earliest_file argument,
 68            the engine will retrieve the latest_file/earliest_file
 69            for each subdirectory.
 70
 71        Args:
 72            sftp: the SFTP client object.
 73            remote_path: path of files to be filtered.
 74            options_args: options from the acon.
 75
 76        Returns:
 77            A list containing the file names to be passed to Spark.
 78        """
 79        all_items, folder_path = cls._get_folder_items(remote_path, sftp, options_args)
 80
 81        filtered_files: Set[str] = set()
 82
 83        try:
 84            for item, folder in zip(all_items, folder_path):
 85                file_contains = cls._file_has_pattern(item, options_args)
 86                file_in_interval = cls._file_in_date_interval(item, options_args)
 87                if file_contains and file_in_interval:
 88                    filtered_files.add(folder + item.filename)
 89
 90            if (
 91                SFTPExtractionFilter.EARLIEST_FILE.value in options_args.keys()
 92                or SFTPExtractionFilter.LATEST_FILE.value in options_args.keys()
 93            ):
 94                filtered_files = cls._get_earliest_latest_file(
 95                    sftp, options_args, filtered_files, folder_path
 96                )
 97
 98        except Exception as e:
 99            cls._logger.error(f"SFTP list_files EXCEPTION: - {e}")
100        return filtered_files

Get a list of files to be extracted from SFTP.

The arguments (options_args) to list files are:

  • date_time_gt(str): Keep only the files modified after the given datetime, formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
  • date_time_lt(str): Keep only the files modified before the given datetime, formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
  • earliest_file(bool): Filter the earliest dated file in the directory.
  • file_name_contains(str): Keep only the files whose name contains the given substring.
  • latest_file(bool): Filter the most recent dated file in the directory.
  • sub_dir(bool): When true, the engine will search files into subdirectories of the remote_path. It will consider one level below the remote_path. When sub_dir is used with latest_file/earliest_file argument, the engine will retrieve the latest_file/earliest_file for each subdirectory.
Arguments:
  • sftp: the SFTP client object.
  • remote_path: path of files to be filtered.
  • options_args: options from the acon.
Returns:

A set containing the file paths (folder + file name) to be passed to Spark.

@classmethod
def get_sftp_client( cls, options_args: dict) -> Tuple[paramiko.sftp_client.SFTPClient, paramiko.transport.Transport]:
102    @classmethod
103    def get_sftp_client(
104        cls,
105        options_args: dict,
106    ) -> Tuple[SFTPClient, Transport]:
107        """Get the SFTP client.
108
109        The SFTP client is used to open an SFTP session across an open
110        SSH Transport and perform remote file operations.
111
112        Args:
113            options_args: dictionary containing SFTP connection parameters.
114                The Paramiko arguments expected to connect are:
115                    - "hostname": the server to connect to.
116                    - "port": the server port to connect to.
117                    - "username": the username to authenticate as.
118                    - "password": used for password authentication.
119                    - "pkey": optional - an optional public key to use for
120                        authentication.
121                    - "passphrase" – optional - options used for decrypting private
122                        keys.
123                    - "key_filename" – optional - the filename, or list of filenames,
124                        of optional private key(s) and/or certs to try for
125                        authentication.
126                    - "timeout" – an optional timeout (in seconds) for the TCP connect.
127                    - "allow_agent" – optional - set to False to disable
128                        connecting to the SSH agent.
129                    - "look_for_keys" – optional - set to False to disable searching
130                        for discoverable private key files in ~/.ssh/.
131                    - "compress" – optional - set to True to turn on compression.
132                    - "sock" - optional - an open socket or socket-like object
133                        to use for communication to the target host.
134                    - "gss_auth" – optional - True if you want to use GSS-API
135                        authentication.
136                    - "gss_kex" – optional - Perform GSS-API Key Exchange and
137                        user authentication.
138                    - "gss_deleg_creds" – optional - Delegate GSS-API client
139                        credentials or not.
140                    - "gss_host" – optional - The targets name in the kerberos database.
141                    - "gss_trust_dns" – optional - Indicates whether or
142                        not the DNS is trusted to securely canonicalize the name of the
143                        host being connected to (default True).
144                    - "banner_timeout" – an optional timeout (in seconds)
145                        to wait for the SSH banner to be presented.
146                    - "auth_timeout" – an optional timeout (in seconds)
147                        to wait for an authentication response.
148                    - "disabled_algorithms" – an optional dict passed directly to
149                        Transport and its keyword argument of the same name.
150                    - "transport_factory" – an optional callable which is handed a
151                        subset of the constructor arguments (primarily those related
152                        to the socket, GSS functionality, and algorithm selection)
153                        and generates a Transport instance to be used by this client.
154                        Defaults to Transport.__init__.
155
156                The parameter to specify the private key is expected to be in
157                RSA format. Attempting a connection with a blank host key is
158                not allowed unless the argument "add_auto_policy" is explicitly
159                set to True.
160
161        Returns:
162            sftp -> a new SFTPClient session object.
163            transport -> the Transport for this connection.
164        """
165        ssh_client = p.SSHClient()
166        try:
167            if not options_args.get("pkey") and not options_args.get("add_auto_policy"):
168                raise WrongArgumentsException(
169                    "Get SFTP Client: No host key (pkey) was provided and the "
170                    + "add_auto_policy property is false."
171                )
172
173            if options_args.get("pkey") and not options_args.get("key_type"):
174                raise WrongArgumentsException(
175                    "Get SFTP Client: The key_type must be provided when "
176                    + "the host key (pkey) is provided."
177                )
178
179            if options_args.get("pkey", None) and options_args.get("key_type", None):
180                key = cls._get_host_keys(
181                    options_args.get("pkey", None), options_args.get("key_type", None)
182                )
183                ssh_client.get_host_keys().add(
184                    hostname=f"[{options_args.get('hostname')}]:"
185                    + f"{options_args.get('port')}",
186                    keytype="ssh-rsa",
187                    key=key,
188                )
189            elif options_args.get("add_auto_policy", None):
190                ssh_client.load_system_host_keys()
191                ssh_client.set_missing_host_key_policy(p.WarningPolicy())  # nosec: B507
192            else:
193                ssh_client.load_system_host_keys()
194                ssh_client.set_missing_host_key_policy(p.RejectPolicy())
195
196            ssh_client.connect(
197                hostname=options_args.get("hostname"),
198                port=options_args.get("port", 22),
199                username=options_args.get("username", None),
200                password=options_args.get("password", None),
201                key_filename=options_args.get("key_filename", None),
202                timeout=options_args.get("timeout", None),
203                allow_agent=options_args.get("allow_agent", True),
204                look_for_keys=options_args.get("look_for_keys", True),
205                compress=options_args.get("compress", False),
206                sock=options_args.get("sock", None),
207                gss_auth=options_args.get("gss_auth", False),
208                gss_kex=options_args.get("gss_kex", False),
209                gss_deleg_creds=options_args.get("gss_deleg_creds", False),
210                gss_host=options_args.get("gss_host", False),
211                banner_timeout=options_args.get("banner_timeout", None),
212                auth_timeout=options_args.get("auth_timeout", None),
213                gss_trust_dns=options_args.get("gss_trust_dns", None),
214                passphrase=options_args.get("passphrase", None),
215                disabled_algorithms=options_args.get("disabled_algorithms", None),
216                transport_factory=options_args.get("transport_factory", None),
217            )
218
219            sftp = ssh_client.open_sftp()
220            transport = ssh_client.get_transport()
221        except ConnectionError as e:
222            cls._logger.error(e)
223            raise
224        return sftp, transport

Get the SFTP client.

The SFTP client is used to open an SFTP session across an open SSH Transport and perform remote file operations.

Arguments:
  • options_args: dictionary containing SFTP connection parameters. The Paramiko arguments expected to connect are:

    • "hostname": the server to connect to.
    • "port": the server port to connect to.
    • "username": the username to authenticate as.
    • "password": used for password authentication.
    • "pkey": optional - the server's public host key (base64-encoded), used to verify the server's identity; requires "key_type".
    • "passphrase" – optional - used for decrypting private keys.
    • "key_filename" – optional - the filename, or list of filenames, of optional private key(s) and/or certs to try for authentication.
    • "timeout" – an optional timeout (in seconds) for the TCP connect.
    • "allow_agent" – optional - set to False to disable connecting to the SSH agent.
    • "look_for_keys" – optional - set to False to disable searching for discoverable private key files in ~/.ssh/.
    • "compress" – optional - set to True to turn on compression.
    • "sock" - optional - an open socket or socket-like object to use for communication to the target host.
    • "gss_auth" – optional - True if you want to use GSS-API authentication.
    • "gss_kex" – optional - Perform GSS-API Key Exchange and user authentication.
    • "gss_deleg_creds" – optional - Delegate GSS-API client credentials or not.
    • "gss_host" – optional - The target's name in the kerberos database.
    • "gss_trust_dns" – optional - Indicates whether or not the DNS is trusted to securely canonicalize the name of the host being connected to (default True).
    • "banner_timeout" – an optional timeout (in seconds) to wait for the SSH banner to be presented.
    • "auth_timeout" – an optional timeout (in seconds) to wait for an authentication response.
    • "disabled_algorithms" – an optional dict passed directly to Transport and its keyword argument of the same name.
    • "transport_factory" – an optional callable which is handed a subset of the constructor arguments (primarily those related to the socket, GSS functionality, and algorithm selection) and generates a Transport instance to be used by this client. Defaults to Transport.__init__.

    The host key ("pkey") is expected in RSA or Ed25519 format, as indicated by "key_type" ("rsa" or "ed25519"). Attempting a connection with a blank host key is not allowed unless the argument "add_auto_policy" is explicitly set to True.

Returns:

sftp -> a new SFTPClient session object. transport -> the Transport for this connection.

@classmethod
def validate_format(cls, files_format: str) -> str:
226    @classmethod
227    def validate_format(cls, files_format: str) -> str:
228        """Validate the file extension based on the format definitions.
229
230        Args:
231            files_format: a string containing the file extension.
232
233        Returns:
234            The string validated and formatted.
235        """
236        formats_allowed = [
237            SFTPInputFormat.CSV.value,
238            SFTPInputFormat.FWF.value,
239            SFTPInputFormat.JSON.value,
240            SFTPInputFormat.XML.value,
241        ]
242
243        if files_format not in formats_allowed:
244            raise WrongArgumentsException(
245                f"The formats allowed for SFTP are {formats_allowed}."
246            )
247
248        return files_format

Validate the file extension based on the format definitions.

Arguments:
  • files_format: a string containing the file extension.
Returns:

The validated format string, returned unchanged.

@classmethod
def validate_location(cls, location: str) -> str:
250    @classmethod
251    def validate_location(cls, location: str) -> str:
252        """Validate the location. Add "/" in the case it does not exist.
253
254        Args:
255            location: file path.
256
257        Returns:
258            The location validated.
259        """
260        return location if location.rfind("/") == len(location) - 1 else location + "/"

Validate the location. Add "/" in the case it does not exist.

Arguments:
  • location: file path.
Returns:

The location validated.