lakehouse_engine.utils.extraction.sftp_extraction_utils
Utilities module for SFTP extraction processes.
"""Utilities module for SFTP extraction processes."""

import stat
from base64 import decodebytes
from datetime import datetime
from enum import Enum
from logging import Logger
from stat import S_ISREG
from typing import Any, List, Set, Tuple, Union

import paramiko as p
from paramiko import Ed25519Key, PKey, RSAKey, Transport
from paramiko.sftp_client import SFTPAttributes, SFTPClient  # type: ignore

from lakehouse_engine.transformers.exceptions import WrongArgumentsException
from lakehouse_engine.utils.logging_handler import LoggingHandler


class SFTPInputFormat(Enum):
    """Formats of algorithm input."""

    # The values are the format identifiers accepted by
    # SFTPExtractionUtils.validate_format.
    CSV = "csv"
    FWF = "fwf"
    JSON = "json"
    XML = "xml"


class SFTPExtractionFilter(Enum):
    """Standardize the types of filters we can have from an SFTP source.

    Each value is the key that is looked up in the acon options (options_args).
    """

    # Legacy lowercase name. It is declared first so that it stays the
    # canonical member and existing references keep working.
    file_name_contains = "file_name_contains"
    # Upper-case alias that follows the naming of the other members.
    FILE_NAME_CONTAINS = "file_name_contains"
    LATEST_FILE = "latest_file"
    EARLIEST_FILE = "earliest_file"
    GREATER_THAN = "date_time_gt"
    LOWER_THAN = "date_time_lt"


class SFTPExtractionUtils(object):
    """Utils for managing data extraction from particularly relevant SFTP sources."""

    _logger: Logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def get_files_list(
        cls, sftp: SFTPClient, remote_path: str, options_args: dict
    ) -> Set[str]:
        """Get the set of files to be extracted from SFTP.

        The arguments (options_args) to list files are:
        - date_time_gt(str):
            Keep the files modified after the string datetime
            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
        - date_time_lt(str):
            Keep the files modified before the string datetime
            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
        - earliest_file(bool):
            Keep the earliest dated file in the directory.
        - file_name_contains(str):
            Keep the files whose name contains the pattern.
        - latest_file(bool):
            Keep the most recent dated file in the directory.
        - sub_dir(bool):
            When true, the engine will search files into subdirectories
            of the remote_path.
            It will consider one level below the remote_path.
            When sub_dir is used with latest_file/earliest_file argument,
            the engine will retrieve the latest_file/earliest_file
            for each subdirectory.

        Listing the remote path happens outside the error handling, so listing
        failures propagate to the caller. Errors raised while filtering are
        logged and the files collected up to that point are returned.

        Args:
            sftp: the SFTP client object.
            remote_path: path of files to be filtered.
            options_args: options from the acon.

        Returns:
            A set containing the file names to be passed to Spark.
        """
        all_items, folder_path = cls._get_folder_items(remote_path, sftp, options_args)

        filtered_files: Set[str] = set()

        try:
            for item, folder in zip(all_items, folder_path):
                file_contains = cls._file_has_pattern(item, options_args)
                file_in_interval = cls._file_in_date_interval(item, options_args)
                if file_contains and file_in_interval:
                    filtered_files.add(folder + item.filename)

            # Look at the flag values rather than the presence of the keys: an
            # explicit `latest_file: False` must not trigger the narrowing.
            if options_args.get(
                SFTPExtractionFilter.EARLIEST_FILE.value
            ) or options_args.get(SFTPExtractionFilter.LATEST_FILE.value):
                filtered_files = cls._get_earliest_latest_file(
                    sftp, options_args, filtered_files, folder_path
                )

        except Exception as e:
            # NOTE(review): the error is logged and swallowed, so callers may
            # receive a partial set - confirm this best-effort is intended.
            cls._logger.error(f"SFTP list_files EXCEPTION: - {e}")
        return filtered_files

    @classmethod
    def get_sftp_client(
        cls,
        options_args: dict,
    ) -> Tuple[SFTPClient, Transport]:
        """Get the SFTP client.

        The SFTP client is used to open an SFTP session across an open
        SSH Transport and perform remote file operations.

        Args:
            options_args: dictionary containing SFTP connection parameters.
                Options handled by this method:
                - "pkey": optional - base64 encoded host key of the server.
                    It is registered as a known host key for the connection.
                - "key_type": type of the host key ("rsa" or "ed25519").
                    Mandatory when "pkey" is provided.
                - "add_auto_policy": optional - when true and no "pkey" is
                    provided, the system host keys are loaded and unknown
                    hosts only produce a warning.
                The Paramiko arguments expected to connect are:
                - "hostname": the server to connect to.
                - "port": the server port to connect to (defaults to 22).
                - "username": the username to authenticate as.
                - "password": used for password authentication.
                - "passphrase": optional - used for decrypting private keys.
                - "key_filename": optional - the filename, or list of
                    filenames, of optional private key(s) and/or certs to try
                    for authentication.
                - "timeout": an optional timeout (in seconds) for the TCP
                    connect.
                - "allow_agent": optional - set to False to disable
                    connecting to the SSH agent.
                - "look_for_keys": optional - set to False to disable
                    searching for discoverable private key files in ~/.ssh/.
                - "compress": optional - set to True to turn on compression.
                - "sock": optional - an open socket or socket-like object
                    to use for communication to the target host.
                - "gss_auth": optional - True if you want to use GSS-API
                    authentication.
                - "gss_kex": optional - Perform GSS-API Key Exchange and
                    user authentication.
                - "gss_deleg_creds": optional - Delegate GSS-API client
                    credentials or not.
                - "gss_host": optional - The targets name in the kerberos
                    database.
                - "gss_trust_dns": optional - Indicates whether or not the
                    DNS is trusted to securely canonicalize the name of the
                    host being connected to.
                - "banner_timeout": an optional timeout (in seconds)
                    to wait for the SSH banner to be presented.
                - "auth_timeout": an optional timeout (in seconds)
                    to wait for an authentication response.
                - "disabled_algorithms": an optional dict passed directly to
                    Transport and its keyword argument of the same name.
                - "transport_factory": an optional callable which is handed a
                    subset of the constructor arguments (primarily those
                    related to the socket, GSS functionality, and algorithm
                    selection) and generates a Transport instance to be used
                    by this client.

        Attempting a connection without a host key is not allowed unless the
        argument "add_auto_policy" is explicitly set to True.

        Returns:
            sftp -> a new SFTPClient session object.
            transport -> the Transport for this connection.

        Raises:
            WrongArgumentsException: when neither "pkey" nor "add_auto_policy"
                is provided, when "pkey" comes without "key_type", or when the
                "key_type" is not supported.
        """
        pkey = options_args.get("pkey")
        key_type = options_args.get("key_type")
        hostname = options_args.get("hostname")
        port = options_args.get("port", 22)

        # Validate the arguments before any client is created.
        if not pkey and not options_args.get("add_auto_policy"):
            raise WrongArgumentsException(
                "Get SFTP Client: No host key (pkey) was provided and the "
                + "add_auto_policy property is false."
            )

        if pkey and not key_type:
            raise WrongArgumentsException(
                "Get SFTP Client: The key_type must be provided when "
                + "the host key (pkey) is provided."
            )

        ssh_client = p.SSHClient()
        try:
            if pkey:
                key = cls._get_host_keys(pkey, key_type)
                # Paramiko looks a host up by its bare name on the default
                # port (22) and by "[host]:port" otherwise, so the key has to
                # be registered under the very same name.
                host_entry = hostname if port == 22 else f"[{hostname}]:{port}"
                ssh_client.get_host_keys().add(
                    hostname=host_entry,
                    keytype=key.get_name(),
                    key=key,
                )
            else:
                # Only reachable with add_auto_policy enabled (validated above).
                ssh_client.load_system_host_keys()
                ssh_client.set_missing_host_key_policy(p.WarningPolicy())  # nosec: B507

            ssh_client.connect(
                hostname=hostname,
                port=port,
                username=options_args.get("username", None),
                password=options_args.get("password", None),
                key_filename=options_args.get("key_filename", None),
                timeout=options_args.get("timeout", None),
                allow_agent=options_args.get("allow_agent", True),
                look_for_keys=options_args.get("look_for_keys", True),
                compress=options_args.get("compress", False),
                sock=options_args.get("sock", None),
                gss_auth=options_args.get("gss_auth", False),
                gss_kex=options_args.get("gss_kex", False),
                gss_deleg_creds=options_args.get("gss_deleg_creds", False),
                gss_host=options_args.get("gss_host", False),
                banner_timeout=options_args.get("banner_timeout", None),
                auth_timeout=options_args.get("auth_timeout", None),
                gss_trust_dns=options_args.get("gss_trust_dns", None),
                passphrase=options_args.get("passphrase", None),
                disabled_algorithms=options_args.get("disabled_algorithms", None),
                transport_factory=options_args.get("transport_factory", None),
            )

            sftp = ssh_client.open_sftp()
            transport = ssh_client.get_transport()
        except Exception as e:
            # Log every failure (not only ConnectionError) and release the
            # half-open client before handing the error to the caller.
            cls._logger.error(e)
            ssh_client.close()
            raise
        return sftp, transport

    @classmethod
    def validate_format(cls, files_format: str) -> str:
        """Validate the file extension based on the format definitions.

        Args:
            files_format: a string containing the file extension.

        Returns:
            The string validated and formatted.

        Raises:
            WrongArgumentsException: when the format is not one of the
                SFTPInputFormat values.
        """
        formats_allowed = [input_format.value for input_format in SFTPInputFormat]

        if files_format not in formats_allowed:
            raise WrongArgumentsException(
                f"The formats allowed for SFTP are {formats_allowed}."
            )

        return files_format

    @classmethod
    def validate_location(cls, location: str) -> str:
        """Validate the location. Add "/" in the case it does not exist.

        An empty location is returned unchanged.

        Args:
            location: file path.

        Returns:
            The location validated.
        """
        if not location or location.endswith("/"):
            return location
        return location + "/"

    @classmethod
    def _file_has_pattern(cls, item: SFTPAttributes, options_args: dict) -> bool:
        """Check if a file follows the pattern used for filtering.

        Without the file_name_contains option every item is accepted. With it,
        the item must be a regular (or compressed) file whose name contains the
        pattern.

        Args:
            item: item available in SFTP directory.
            options_args: options from the acon.

        Returns:
            A boolean telling whether the file contains a pattern or not.
        """
        pattern_key = SFTPExtractionFilter.file_name_contains.value
        if pattern_key not in options_args:
            return True

        return options_args[pattern_key] in item.filename and (
            S_ISREG(item.st_mode) or cls._is_compressed(item.filename)
        )

    @classmethod
    def _file_in_date_interval(
        cls,
        item: SFTPAttributes,
        options_args: dict,
    ) -> bool:
        """Check if the file is in the expected date interval.

        The logic is applied based on the arguments greater_than and lower_than.
        i.e:
            - if greater_than and lower_than have values,
            then it performs a between.
            - if only lower_than has values,
            then only values lower than the input value will be retrieved.
            - if only greater_than has values,
            then only values greater than the input value will be retrieved.

        The bounds are exclusive, unless both are given as the very same
        string, in which case the modification time has to equal that value.
        Items that are neither regular nor compressed files are not date
        filtered and are always accepted.

        Args:
            item: item available in SFTP directory.
            options_args: options from the acon.

        Returns:
            A boolean telling whether the file is in the expected date interval or not.

        Raises:
            ValueError: when a provided bound does not follow the expected format.
        """
        has_date_filter = (
            SFTPExtractionFilter.LOWER_THAN.value in options_args
            or SFTPExtractionFilter.GREATER_THAN.value in options_args
        )
        if not has_date_filter:
            return True

        # The file check applies to both bounds; it used to be bound to
        # date_time_gt only because `and` binds tighter than `or`.
        if not (S_ISREG(item.st_mode) or cls._is_compressed(item.filename)):
            return True

        lower_than = options_args.get(
            SFTPExtractionFilter.LOWER_THAN.value, "9999-12-31"
        )
        greater_than = options_args.get(
            SFTPExtractionFilter.GREATER_THAN.value, "1900-01-01"
        )

        # Parse each bound once, which also validates both of them up front.
        lower_bound = cls._validate_date(greater_than)
        upper_bound = cls._validate_date(lower_than)
        file_date = datetime.fromtimestamp(item.st_mtime)

        same_bounds_match = (
            lower_than == greater_than and lower_bound <= file_date <= upper_bound
        )
        return same_bounds_match or lower_bound < file_date < upper_bound

    @classmethod
    def _get_earliest_latest_file(
        cls,
        sftp: SFTPClient,
        options_args: dict,
        list_filter_files: Set[str],
        folder_path: List,
    ) -> Set[str]:
        """Get the earliest or latest file of each directory.

        Only regular (or compressed) files that are part of list_filter_files
        are candidates. Folders without any candidate contribute nothing. When
        both earliest_file and latest_file are enabled, both files of each
        folder are returned. When none is enabled, the input set is returned
        as is.

        Args:
            sftp: the SFTP client object.
            options_args: options from the acon.
            list_filter_files: set of file names to filter from.
            folder_path: the location of files.

        Returns:
            A set containing the earliest/latest file name.
        """
        want_earliest = options_args.get("earliest_file")
        want_latest = options_args.get("latest_file")
        if not (want_earliest or want_latest):
            return set(list_filter_files)

        list_earl_lat_files: Set[str] = set()

        # folder_path holds one entry per item, so de-duplicate it (keeping
        # the order) to list every folder just once.
        for folder in dict.fromkeys(folder_path):
            candidates = [
                item
                for item in sftp.listdir_attr(folder)
                if folder + item.filename in list_filter_files
                and (S_ISREG(item.st_mode) or cls._is_compressed(item.filename))
            ]
            if not candidates:
                continue

            # min/max return the first item on ties, like a strict comparison.
            if want_earliest:
                earliest = min(candidates, key=lambda entry: entry.st_mtime)
                list_earl_lat_files.add(folder + earliest.filename)
            if want_latest:
                latest = max(candidates, key=lambda entry: entry.st_mtime)
                list_earl_lat_files.add(folder + latest.filename)

        return list_earl_lat_files

    @classmethod
    def _get_folder_items(
        cls, remote_path: str, sftp: SFTPClient, options_args: dict
    ) -> Tuple[List[SFTPAttributes], List[str]]:
        """Get the files and the directory to be processed.

        With the sub_dir option, directories directly below remote_path are
        replaced by their content (one level only). Directories named like a
        compressed file (.gz/.zip) are kept as items.

        Args:
            remote_path: root folder path.
            sftp: a SFTPClient session object.
            options_args: options from the acon.

        Returns:
            A tuple with a list of items (file object) and a list with the
            directory of each of those items (same length and order).
        """
        sub_dir = options_args.get("sub_dir", False)
        # File names are appended to the folder, so make sure a non-empty
        # folder ends with a separator.
        if remote_path and not remote_path.endswith("/"):
            base_path = remote_path + "/"
        else:
            base_path = remote_path

        items: List[SFTPAttributes] = []
        folders: List[str] = []

        for item in sftp.listdir_attr(remote_path):
            descend = (
                sub_dir
                and stat.S_ISDIR(item.st_mode)
                and not cls._is_compressed(item.filename)
            )
            if descend:
                sub_folder = f"{base_path}{item.filename}/"
                for file in sftp.listdir_attr(f"{base_path}{item.filename}"):
                    items.append(file)
                    folders.append(sub_folder)
            else:
                items.append(item)
                folders.append(base_path)

        return items, folders

    @classmethod
    def _get_host_keys(cls, pkey: str, key_type: str) -> PKey:
        """Get the pkey that will be added to the server.

        Args:
            pkey: a string with a host key value (base64 encoded).
            key_type: the type of key (rsa or ed25519).

        Returns:
            A PKey that will be registered as a known host key.

        Raises:
            WrongArgumentsException: when the pkey is empty or the key_type
                is not supported.
        """
        if not pkey:
            raise WrongArgumentsException(
                "Get SFTP Client: The host key (pkey) must not be empty."
            )

        key_classes = {"rsa": RSAKey, "ed25519": Ed25519Key}
        key_class = key_classes.get(key_type.lower())
        if key_class is None:
            # Fail here instead of handing a None key over to paramiko.
            raise WrongArgumentsException(
                f"Get SFTP Client: The key_type '{key_type}' is not supported. "
                + f"The key types allowed are {sorted(key_classes)}."
            )

        key: Union[RSAKey, Ed25519Key] = key_class(
            data=decodebytes(bytes(pkey, "UTF-8"))
        )
        return key

    @classmethod
    def _is_compressed(cls, filename: str) -> Any:
        """Validate if it is a compressed file.

        Args:
            filename: name of the file to be validated.

        Returns:
            A boolean, True when the name ends with ".gz" or ".zip".
        """
        return filename.endswith((".gz", ".zip"))

    @classmethod
    def _validate_date(cls, date_text: str) -> datetime:
        """Validate the input date format.

        Args:
            date_text: a string with the date or datetime value.
                The expected formats are:
                    YYYY-MM-DD and YYYY-MM-DD HH:MM:SS

        Returns:
            The datetime validated and formatted.

        Raises:
            ValueError: when the value is None or matches none of the formats.
        """
        if date_text is not None:
            for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"):
                try:
                    return datetime.strptime(date_text, fmt)
                except ValueError:
                    continue
        raise ValueError(
            "Incorrect data format, should be YYYY-MM-DD or YYYY-MM-DD HH:MM:SS."
        )
class SFTPInputFormat(Enum):
    """Formats of algorithm input."""

    # The values are the format identifiers accepted by
    # SFTPExtractionUtils.validate_format.
    CSV = "csv"
    FWF = "fwf"
    JSON = "json"
    XML = "xml"
Formats of algorithm input.
Inherited Members
- enum.Enum
- name
- value
class SFTPExtractionFilter(Enum):
    """Standardize the types of filters we can have from an SFTP source.

    Each value is the key that is looked up in the acon options (options_args).
    """

    # Legacy lowercase name. It is declared first so that it stays the
    # canonical member and existing references keep working.
    file_name_contains = "file_name_contains"
    # Upper-case alias that follows the naming of the other members.
    FILE_NAME_CONTAINS = "file_name_contains"
    LATEST_FILE = "latest_file"
    EARLIEST_FILE = "earliest_file"
    GREATER_THAN = "date_time_gt"
    LOWER_THAN = "date_time_lt"
Standardize the types of filters we can have from an SFTP source.
Inherited Members
- enum.Enum
- name
- value
class SFTPExtractionUtils(object):
    """Utils for managing data extraction from particularly relevant SFTP sources."""

    _logger: Logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def get_files_list(
        cls, sftp: SFTPClient, remote_path: str, options_args: dict
    ) -> Set[str]:
        """Get the set of files to be extracted from SFTP.

        The arguments (options_args) to list files are:
        - date_time_gt(str):
            Keep the files modified after the string datetime
            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
        - date_time_lt(str):
            Keep the files modified before the string datetime
            formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
        - earliest_file(bool):
            Keep the earliest dated file in the directory.
        - file_name_contains(str):
            Keep the files whose name contains the pattern.
        - latest_file(bool):
            Keep the most recent dated file in the directory.
        - sub_dir(bool):
            When true, the engine will search files into subdirectories
            of the remote_path.
            It will consider one level below the remote_path.
            When sub_dir is used with latest_file/earliest_file argument,
            the engine will retrieve the latest_file/earliest_file
            for each subdirectory.

        Listing the remote path happens outside the error handling, so listing
        failures propagate to the caller. Errors raised while filtering are
        logged and the files collected up to that point are returned.

        Args:
            sftp: the SFTP client object.
            remote_path: path of files to be filtered.
            options_args: options from the acon.

        Returns:
            A set containing the file names to be passed to Spark.
        """
        all_items, folder_path = cls._get_folder_items(remote_path, sftp, options_args)

        filtered_files: Set[str] = set()

        try:
            for item, folder in zip(all_items, folder_path):
                file_contains = cls._file_has_pattern(item, options_args)
                file_in_interval = cls._file_in_date_interval(item, options_args)
                if file_contains and file_in_interval:
                    filtered_files.add(folder + item.filename)

            # Look at the flag values rather than the presence of the keys: an
            # explicit `latest_file: False` must not trigger the narrowing.
            if options_args.get(
                SFTPExtractionFilter.EARLIEST_FILE.value
            ) or options_args.get(SFTPExtractionFilter.LATEST_FILE.value):
                filtered_files = cls._get_earliest_latest_file(
                    sftp, options_args, filtered_files, folder_path
                )

        except Exception as e:
            # NOTE(review): the error is logged and swallowed, so callers may
            # receive a partial set - confirm this best-effort is intended.
            cls._logger.error(f"SFTP list_files EXCEPTION: - {e}")
        return filtered_files

    @classmethod
    def get_sftp_client(
        cls,
        options_args: dict,
    ) -> Tuple[SFTPClient, Transport]:
        """Get the SFTP client.

        The SFTP client is used to open an SFTP session across an open
        SSH Transport and perform remote file operations.

        Args:
            options_args: dictionary containing SFTP connection parameters.
                Options handled by this method:
                - "pkey": optional - base64 encoded host key of the server.
                    It is registered as a known host key for the connection.
                - "key_type": type of the host key ("rsa" or "ed25519").
                    Mandatory when "pkey" is provided.
                - "add_auto_policy": optional - when true and no "pkey" is
                    provided, the system host keys are loaded and unknown
                    hosts only produce a warning.
                The Paramiko arguments expected to connect are:
                - "hostname": the server to connect to.
                - "port": the server port to connect to (defaults to 22).
                - "username": the username to authenticate as.
                - "password": used for password authentication.
                - "passphrase": optional - used for decrypting private keys.
                - "key_filename": optional - the filename, or list of
                    filenames, of optional private key(s) and/or certs to try
                    for authentication.
                - "timeout": an optional timeout (in seconds) for the TCP
                    connect.
                - "allow_agent": optional - set to False to disable
                    connecting to the SSH agent.
                - "look_for_keys": optional - set to False to disable
                    searching for discoverable private key files in ~/.ssh/.
                - "compress": optional - set to True to turn on compression.
                - "sock": optional - an open socket or socket-like object
                    to use for communication to the target host.
                - "gss_auth": optional - True if you want to use GSS-API
                    authentication.
                - "gss_kex": optional - Perform GSS-API Key Exchange and
                    user authentication.
                - "gss_deleg_creds": optional - Delegate GSS-API client
                    credentials or not.
                - "gss_host": optional - The targets name in the kerberos
                    database.
                - "gss_trust_dns": optional - Indicates whether or not the
                    DNS is trusted to securely canonicalize the name of the
                    host being connected to.
                - "banner_timeout": an optional timeout (in seconds)
                    to wait for the SSH banner to be presented.
                - "auth_timeout": an optional timeout (in seconds)
                    to wait for an authentication response.
                - "disabled_algorithms": an optional dict passed directly to
                    Transport and its keyword argument of the same name.
                - "transport_factory": an optional callable which is handed a
                    subset of the constructor arguments (primarily those
                    related to the socket, GSS functionality, and algorithm
                    selection) and generates a Transport instance to be used
                    by this client.

        Attempting a connection without a host key is not allowed unless the
        argument "add_auto_policy" is explicitly set to True.

        Returns:
            sftp -> a new SFTPClient session object.
            transport -> the Transport for this connection.

        Raises:
            WrongArgumentsException: when neither "pkey" nor "add_auto_policy"
                is provided, when "pkey" comes without "key_type", or when the
                "key_type" is not supported.
        """
        pkey = options_args.get("pkey")
        key_type = options_args.get("key_type")
        hostname = options_args.get("hostname")
        port = options_args.get("port", 22)

        # Validate the arguments before any client is created.
        if not pkey and not options_args.get("add_auto_policy"):
            raise WrongArgumentsException(
                "Get SFTP Client: No host key (pkey) was provided and the "
                + "add_auto_policy property is false."
            )

        if pkey and not key_type:
            raise WrongArgumentsException(
                "Get SFTP Client: The key_type must be provided when "
                + "the host key (pkey) is provided."
            )

        ssh_client = p.SSHClient()
        try:
            if pkey:
                key = cls._get_host_keys(pkey, key_type)
                # Paramiko looks a host up by its bare name on the default
                # port (22) and by "[host]:port" otherwise, so the key has to
                # be registered under the very same name.
                host_entry = hostname if port == 22 else f"[{hostname}]:{port}"
                ssh_client.get_host_keys().add(
                    hostname=host_entry,
                    keytype=key.get_name(),
                    key=key,
                )
            else:
                # Only reachable with add_auto_policy enabled (validated above).
                ssh_client.load_system_host_keys()
                ssh_client.set_missing_host_key_policy(p.WarningPolicy())  # nosec: B507

            ssh_client.connect(
                hostname=hostname,
                port=port,
                username=options_args.get("username", None),
                password=options_args.get("password", None),
                key_filename=options_args.get("key_filename", None),
                timeout=options_args.get("timeout", None),
                allow_agent=options_args.get("allow_agent", True),
                look_for_keys=options_args.get("look_for_keys", True),
                compress=options_args.get("compress", False),
                sock=options_args.get("sock", None),
                gss_auth=options_args.get("gss_auth", False),
                gss_kex=options_args.get("gss_kex", False),
                gss_deleg_creds=options_args.get("gss_deleg_creds", False),
                gss_host=options_args.get("gss_host", False),
                banner_timeout=options_args.get("banner_timeout", None),
                auth_timeout=options_args.get("auth_timeout", None),
                gss_trust_dns=options_args.get("gss_trust_dns", None),
                passphrase=options_args.get("passphrase", None),
                disabled_algorithms=options_args.get("disabled_algorithms", None),
                transport_factory=options_args.get("transport_factory", None),
            )

            sftp = ssh_client.open_sftp()
            transport = ssh_client.get_transport()
        except Exception as e:
            # Log every failure (not only ConnectionError) and release the
            # half-open client before handing the error to the caller.
            cls._logger.error(e)
            ssh_client.close()
            raise
        return sftp, transport

    @classmethod
    def validate_format(cls, files_format: str) -> str:
        """Validate the file extension based on the format definitions.

        Args:
            files_format: a string containing the file extension.

        Returns:
            The string validated and formatted.

        Raises:
            WrongArgumentsException: when the format is not one of the
                SFTPInputFormat values.
        """
        formats_allowed = [input_format.value for input_format in SFTPInputFormat]

        if files_format not in formats_allowed:
            raise WrongArgumentsException(
                f"The formats allowed for SFTP are {formats_allowed}."
            )

        return files_format

    @classmethod
    def validate_location(cls, location: str) -> str:
        """Validate the location. Add "/" in the case it does not exist.

        An empty location is returned unchanged.

        Args:
            location: file path.

        Returns:
            The location validated.
        """
        if not location or location.endswith("/"):
            return location
        return location + "/"

    @classmethod
    def _file_has_pattern(cls, item: SFTPAttributes, options_args: dict) -> bool:
        """Check if a file follows the pattern used for filtering.

        Without the file_name_contains option every item is accepted. With it,
        the item must be a regular (or compressed) file whose name contains the
        pattern.

        Args:
            item: item available in SFTP directory.
            options_args: options from the acon.

        Returns:
            A boolean telling whether the file contains a pattern or not.
        """
        pattern_key = SFTPExtractionFilter.file_name_contains.value
        if pattern_key not in options_args:
            return True

        return options_args[pattern_key] in item.filename and (
            S_ISREG(item.st_mode) or cls._is_compressed(item.filename)
        )

    @classmethod
    def _file_in_date_interval(
        cls,
        item: SFTPAttributes,
        options_args: dict,
    ) -> bool:
        """Check if the file is in the expected date interval.

        The logic is applied based on the arguments greater_than and lower_than.
        i.e:
            - if greater_than and lower_than have values,
            then it performs a between.
            - if only lower_than has values,
            then only values lower than the input value will be retrieved.
            - if only greater_than has values,
            then only values greater than the input value will be retrieved.

        The bounds are exclusive, unless both are given as the very same
        string, in which case the modification time has to equal that value.
        Items that are neither regular nor compressed files are not date
        filtered and are always accepted.

        Args:
            item: item available in SFTP directory.
            options_args: options from the acon.

        Returns:
            A boolean telling whether the file is in the expected date interval or not.

        Raises:
            ValueError: when a provided bound does not follow the expected format.
        """
        has_date_filter = (
            SFTPExtractionFilter.LOWER_THAN.value in options_args
            or SFTPExtractionFilter.GREATER_THAN.value in options_args
        )
        if not has_date_filter:
            return True

        # The file check applies to both bounds; it used to be bound to
        # date_time_gt only because `and` binds tighter than `or`.
        if not (S_ISREG(item.st_mode) or cls._is_compressed(item.filename)):
            return True

        lower_than = options_args.get(
            SFTPExtractionFilter.LOWER_THAN.value, "9999-12-31"
        )
        greater_than = options_args.get(
            SFTPExtractionFilter.GREATER_THAN.value, "1900-01-01"
        )

        # Parse each bound once, which also validates both of them up front.
        lower_bound = cls._validate_date(greater_than)
        upper_bound = cls._validate_date(lower_than)
        file_date = datetime.fromtimestamp(item.st_mtime)

        same_bounds_match = (
            lower_than == greater_than and lower_bound <= file_date <= upper_bound
        )
        return same_bounds_match or lower_bound < file_date < upper_bound

    @classmethod
    def _get_earliest_latest_file(
        cls,
        sftp: SFTPClient,
        options_args: dict,
        list_filter_files: Set[str],
        folder_path: List,
    ) -> Set[str]:
        """Get the earliest or latest file of each directory.

        Only regular (or compressed) files that are part of list_filter_files
        are candidates. Folders without any candidate contribute nothing. When
        both earliest_file and latest_file are enabled, both files of each
        folder are returned. When none is enabled, the input set is returned
        as is.

        Args:
            sftp: the SFTP client object.
            options_args: options from the acon.
            list_filter_files: set of file names to filter from.
            folder_path: the location of files.

        Returns:
            A set containing the earliest/latest file name.
        """
        want_earliest = options_args.get("earliest_file")
        want_latest = options_args.get("latest_file")
        if not (want_earliest or want_latest):
            return set(list_filter_files)

        list_earl_lat_files: Set[str] = set()

        # folder_path holds one entry per item, so de-duplicate it (keeping
        # the order) to list every folder just once.
        for folder in dict.fromkeys(folder_path):
            candidates = [
                item
                for item in sftp.listdir_attr(folder)
                if folder + item.filename in list_filter_files
                and (S_ISREG(item.st_mode) or cls._is_compressed(item.filename))
            ]
            if not candidates:
                continue

            # min/max return the first item on ties, like a strict comparison.
            if want_earliest:
                earliest = min(candidates, key=lambda entry: entry.st_mtime)
                list_earl_lat_files.add(folder + earliest.filename)
            if want_latest:
                latest = max(candidates, key=lambda entry: entry.st_mtime)
                list_earl_lat_files.add(folder + latest.filename)

        return list_earl_lat_files

    @classmethod
    def _get_folder_items(
        cls, remote_path: str, sftp: SFTPClient, options_args: dict
    ) -> Tuple[List[SFTPAttributes], List[str]]:
        """Get the files and the directory to be processed.

        With the sub_dir option, directories directly below remote_path are
        replaced by their content (one level only). Directories named like a
        compressed file (.gz/.zip) are kept as items.

        Args:
            remote_path: root folder path.
            sftp: a SFTPClient session object.
            options_args: options from the acon.

        Returns:
            A tuple with a list of items (file object) and a list with the
            directory of each of those items (same length and order).
        """
        sub_dir = options_args.get("sub_dir", False)
        # File names are appended to the folder, so make sure a non-empty
        # folder ends with a separator.
        if remote_path and not remote_path.endswith("/"):
            base_path = remote_path + "/"
        else:
            base_path = remote_path

        items: List[SFTPAttributes] = []
        folders: List[str] = []

        for item in sftp.listdir_attr(remote_path):
            descend = (
                sub_dir
                and stat.S_ISDIR(item.st_mode)
                and not cls._is_compressed(item.filename)
            )
            if descend:
                sub_folder = f"{base_path}{item.filename}/"
                for file in sftp.listdir_attr(f"{base_path}{item.filename}"):
                    items.append(file)
                    folders.append(sub_folder)
            else:
                items.append(item)
                folders.append(base_path)

        return items, folders

    @classmethod
    def _get_host_keys(cls, pkey: str, key_type: str) -> PKey:
        """Get the pkey that will be added to the server.

        Args:
            pkey: a string with a host key value (base64 encoded).
            key_type: the type of key (rsa or ed25519).

        Returns:
            A PKey that will be registered as a known host key.

        Raises:
            WrongArgumentsException: when the pkey is empty or the key_type
                is not supported.
        """
        if not pkey:
            raise WrongArgumentsException(
                "Get SFTP Client: The host key (pkey) must not be empty."
            )

        key_classes = {"rsa": RSAKey, "ed25519": Ed25519Key}
        key_class = key_classes.get(key_type.lower())
        if key_class is None:
            # Fail here instead of handing a None key over to paramiko.
            raise WrongArgumentsException(
                f"Get SFTP Client: The key_type '{key_type}' is not supported. "
                + f"The key types allowed are {sorted(key_classes)}."
            )

        key: Union[RSAKey, Ed25519Key] = key_class(
            data=decodebytes(bytes(pkey, "UTF-8"))
        )
        return key

    @classmethod
    def _is_compressed(cls, filename: str) -> Any:
        """Validate if it is a compressed file.

        Args:
            filename: name of the file to be validated.

        Returns:
            A boolean, True when the name ends with ".gz" or ".zip".
        """
        return filename.endswith((".gz", ".zip"))

    @classmethod
    def _validate_date(cls, date_text: str) -> datetime:
        """Validate the input date format.

        Args:
            date_text: a string with the date or datetime value.
                The expected formats are:
                    YYYY-MM-DD and YYYY-MM-DD HH:MM:SS

        Returns:
            The datetime validated and formatted.

        Raises:
            ValueError: when the value is None or matches none of the formats.
        """
        if date_text is not None:
            for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"):
                try:
                    return datetime.strptime(date_text, fmt)
                except ValueError:
                    continue
        raise ValueError(
            "Incorrect data format, should be YYYY-MM-DD or YYYY-MM-DD HH:MM:SS."
        )
Utils for managing data extraction from particularly relevant SFTP sources.
@classmethod
def get_files_list(
    cls, sftp: SFTPClient, remote_path: str, options_args: dict
) -> Set[str]:
    """Get the set of files to be extracted from SFTP.

    The arguments (options_args) to list files are:
    - date_time_gt(str):
        Keep the files modified after the string datetime
        formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
    - date_time_lt(str):
        Keep the files modified before the string datetime
        formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
    - earliest_file(bool):
        Keep the earliest dated file in the directory.
    - file_name_contains(str):
        Keep the files whose name contains the pattern.
    - latest_file(bool):
        Keep the most recent dated file in the directory.
    - sub_dir(bool):
        When true, the engine will search files into subdirectories
        of the remote_path.
        It will consider one level below the remote_path.
        When sub_dir is used with latest_file/earliest_file argument,
        the engine will retrieve the latest_file/earliest_file
        for each subdirectory.

    Listing the remote path happens outside the error handling, so listing
    failures propagate to the caller. Errors raised while filtering are
    logged and the files collected up to that point are returned.

    Args:
        sftp: the SFTP client object.
        remote_path: path of files to be filtered.
        options_args: options from the acon.

    Returns:
        A set containing the file names to be passed to Spark.
    """
    all_items, folder_path = cls._get_folder_items(remote_path, sftp, options_args)

    filtered_files: Set[str] = set()

    try:
        for item, folder in zip(all_items, folder_path):
            file_contains = cls._file_has_pattern(item, options_args)
            file_in_interval = cls._file_in_date_interval(item, options_args)
            if file_contains and file_in_interval:
                filtered_files.add(folder + item.filename)

        # Look at the flag values rather than the presence of the keys: an
        # explicit `latest_file: False` must not trigger the narrowing.
        if options_args.get(
            SFTPExtractionFilter.EARLIEST_FILE.value
        ) or options_args.get(SFTPExtractionFilter.LATEST_FILE.value):
            filtered_files = cls._get_earliest_latest_file(
                sftp, options_args, filtered_files, folder_path
            )
            # An empty name is never a valid file path; make sure such a
            # placeholder is not handed over to the caller.
            filtered_files.discard("")

    except Exception as e:
        # NOTE(review): the error is logged and swallowed, so callers may
        # receive a partial set - confirm this best-effort is intended.
        cls._logger.error(f"SFTP list_files EXCEPTION: - {e}")
    return filtered_files
Get a list of files to be extracted from SFTP.
The arguments (options_args) to list files are:
- date_time_gt(str): Filter the files greater than the string datetime formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
- date_time_lt(str): Filter the files lower than the string datetime formatted as "YYYY-MM-DD" or "YYYY-MM-DD HH:MM:SS".
- earliest_file(bool): Filter the earliest dated file in the directory.
- file_name_contains(str): Filter files whose name matches the pattern.
- latest_file(bool): Filter the most recent dated file in the directory.
- sub_dir(bool): When true, the engine will search for files in subdirectories of the remote_path. It will consider only one level below the remote_path. When sub_dir is used with the latest_file/earliest_file argument, the engine will retrieve the latest_file/earliest_file for each subdirectory.
Arguments:
- sftp: the SFTP client object.
- remote_path: path of files to be filtered.
- options_args: options from the acon.
Returns:
A set containing the file paths (folder plus file name) to be passed to Spark.
@classmethod
def get_sftp_client(
    cls,
    options_args: dict,
) -> Tuple[SFTPClient, Transport]:
    """Get the SFTP client.

    The SFTP client is used to open an SFTP session across an open
    SSH Transport and perform remote file operations.

    Host key verification is configured in one of two ways:
    - "pkey" + "key_type": the provided host key is registered as the
      known key for the target host before connecting.
    - "add_auto_policy": system host keys are loaded and unknown hosts
      only produce a warning instead of being rejected.

    Args:
        options_args: dictionary containing SFTP connection parameters.
            Options read by this method:
            - "hostname": the server to connect to.
            - "port": the server port to connect to (default 22).
            - "username": the username to authenticate as.
            - "password": used for password authentication.
            - "pkey": optional - the server host key to trust.
            - "key_type": type of the host key; required with "pkey".
            - "add_auto_policy": optional - set to True to allow
                connecting without providing a host key.
            - "passphrase": optional - used for decrypting private keys.
            - "key_filename": optional - the filename, or list of
                filenames, of optional private key(s) and/or certs to
                try for authentication.
            - "timeout": optional timeout (in seconds) for the TCP connect.
            - "allow_agent": optional - set to False to disable
                connecting to the SSH agent (default True).
            - "look_for_keys": optional - set to False to disable
                searching for discoverable private key files in ~/.ssh/
                (default True).
            - "compress": optional - set to True to turn on compression.
            - "sock": optional - an open socket or socket-like object
                to use for communication to the target host.
            - "gss_auth": optional - True to use GSS-API authentication.
            - "gss_kex": optional - perform GSS-API Key Exchange and
                user authentication.
            - "gss_deleg_creds": optional - delegate GSS-API client
                credentials or not.
            - "gss_host": optional - the target's name in the Kerberos
                database.
            - "gss_trust_dns": optional - whether the DNS is trusted to
                securely canonicalize the name of the host being
                connected to.
            - "banner_timeout": optional timeout (in seconds) to wait
                for the SSH banner to be presented.
            - "auth_timeout": optional timeout (in seconds) to wait for
                an authentication response.
            - "disabled_algorithms": optional dict passed directly to
                Transport and its keyword argument of the same name.
            - "transport_factory": optional callable that generates the
                Transport instance to be used by this client.

    Returns:
        sftp -> a new SFTPClient session object.
        transport -> the Transport for this connection.

    Raises:
        WrongArgumentsException: if neither "pkey" nor "add_auto_policy"
            is provided, or if "pkey" is provided without "key_type".
        ConnectionError: logged and re-raised when the connection fails.
    """
    pkey = options_args.get("pkey")
    key_type = options_args.get("key_type")
    add_auto_policy = options_args.get("add_auto_policy")

    if not pkey and not add_auto_policy:
        raise WrongArgumentsException(
            "Get SFTP Client: No host key (pkey) was provided and the "
            + "add_auto_policy property is false."
        )

    if pkey and not key_type:
        raise WrongArgumentsException(
            "Get SFTP Client: The key_type must be provided when "
            + "the host key (pkey) is provided."
        )

    hostname = options_args.get("hostname")
    port = options_args.get("port", 22)

    ssh_client = p.SSHClient()
    try:
        if pkey:
            # NOTE(review): assumes _get_host_keys returns a paramiko PKey.
            key = cls._get_host_keys(pkey, key_type)
            # paramiko looks the host key up under the bare hostname for the
            # default port (22) and under "[hostname]:port" otherwise, so the
            # key must be registered under that same name to be found.
            known_host = hostname if port == 22 else f"[{hostname}]:{port}"
            ssh_client.get_host_keys().add(
                hostname=known_host,
                # Use the key's own algorithm name so that non-RSA host keys
                # (e.g. ssh-ed25519) are matched against the server key.
                keytype=key.get_name(),
                key=key,
            )
        else:
            # Reached only when add_auto_policy is set (validated above).
            ssh_client.load_system_host_keys()
            ssh_client.set_missing_host_key_policy(p.WarningPolicy())  # nosec: B507

        ssh_client.connect(
            hostname=hostname,
            port=port,
            username=options_args.get("username", None),
            password=options_args.get("password", None),
            key_filename=options_args.get("key_filename", None),
            timeout=options_args.get("timeout", None),
            allow_agent=options_args.get("allow_agent", True),
            look_for_keys=options_args.get("look_for_keys", True),
            compress=options_args.get("compress", False),
            sock=options_args.get("sock", None),
            gss_auth=options_args.get("gss_auth", False),
            gss_kex=options_args.get("gss_kex", False),
            gss_deleg_creds=options_args.get("gss_deleg_creds", False),
            gss_host=options_args.get("gss_host", False),
            banner_timeout=options_args.get("banner_timeout", None),
            auth_timeout=options_args.get("auth_timeout", None),
            gss_trust_dns=options_args.get("gss_trust_dns", None),
            passphrase=options_args.get("passphrase", None),
            disabled_algorithms=options_args.get("disabled_algorithms", None),
            transport_factory=options_args.get("transport_factory", None),
        )

        sftp = ssh_client.open_sftp()
        transport = ssh_client.get_transport()
    except ConnectionError as e:
        cls._logger.error(e)
        # Do not leave a half-open client behind on failure.
        ssh_client.close()
        raise
    except Exception:
        # Release the client on any other failure and propagate unchanged.
        ssh_client.close()
        raise
    return sftp, transport
Get the SFTP client.
The SFTP client is used to open an SFTP session across an open SSH Transport and perform remote file operations.
Arguments:
options_args: dictionary containing SFTP connection parameters. The Paramiko arguments expected to connect are:
- "hostname": the server to connect to.
- "port": the server port to connect to.
- "username": the username to authenticate as.
- "password": used for password authentication.
- "pkey": optional - the server's public host key, registered as a known host key before connecting (requires "key_type").
- "passphrase" – optional - passphrase used for decrypting private keys.
- "key_filename" – optional - the filename, or list of filenames, of optional private key(s) and/or certs to try for authentication.
- "timeout" – an optional timeout (in seconds) for the TCP connect.
- "allow_agent" – optional - set to False to disable connecting to the SSH agent.
- "look_for_keys" – optional - set to False to disable searching for discoverable private key files in ~/.ssh/.
- "compress" – optional - set to True to turn on compression.
- "sock" - optional - an open socket or socket-like object to use for communication to the target host.
- "gss_auth" – optional - True if you want to use GSS-API authentication.
- "gss_kex" – optional - Perform GSS-API Key Exchange and user authentication.
- "gss_deleg_creds" – optional - Delegate GSS-API client credentials or not.
- "gss_host" – optional - The target's name in the Kerberos database.
- "gss_trust_dns" – optional - Indicates whether or not the DNS is trusted to securely canonicalize the name of the host being connected to (default True).
- "banner_timeout" – an optional timeout (in seconds) to wait for the SSH banner to be presented.
- "auth_timeout" – an optional timeout (in seconds) to wait for an authentication response.
- "disabled_algorithms" – an optional dict passed directly to Transport and its keyword argument of the same name.
- "transport_factory" – an optional callable which is handed a subset of the constructor arguments (primarily those related to the socket, GSS functionality, and algorithm selection) and generates a Transport instance to be used by this client. Defaults to Transport.__init__.
The parameter to specify the private key is expected to be in RSA format. Attempting a connection with a blank host key is not allowed unless the argument "add_auto_policy" is explicitly set to True.
Returns:
sftp -> a new SFTPClient session object. transport -> the Transport for this connection.
@classmethod
def validate_format(cls, files_format: str) -> str:
    """Check that the given file format is one of the supported SFTP formats.

    Args:
        files_format: a string containing the file extension.

    Returns:
        The same string, once it is confirmed to be an allowed format.

    Raises:
        WrongArgumentsException: if the format is not one of the allowed ones.
    """
    supported_members = (
        SFTPInputFormat.CSV,
        SFTPInputFormat.FWF,
        SFTPInputFormat.JSON,
        SFTPInputFormat.XML,
    )
    formats_allowed = [member.value for member in supported_members]

    if files_format in formats_allowed:
        return files_format

    raise WrongArgumentsException(
        f"The formats allowed for SFTP are {formats_allowed}."
    )
Validate the file extension based on the format definitions.
Arguments:
- files_format: a string containing the file extension.
Returns:
The string validated and formatted.
@classmethod
def validate_location(cls, location: str) -> str:
    """Make sure the location ends with a trailing "/".

    An empty location is returned unchanged.

    Args:
        location: file path.

    Returns:
        The location, with "/" appended when it was missing.
    """
    ends_with_slash = location.endswith("/")
    if ends_with_slash or len(location) == 0:
        return location
    return location + "/"
Validate the location. Add "/" in the case it does not exist.
Arguments:
- location: file path.
Returns:
The location validated.