lakehouse_engine.core.s3_file_manager
File manager module using boto3.
1"""File manager module using boto3.""" 2 3import time 4from typing import Any, Optional, Tuple 5 6import boto3 7 8from lakehouse_engine.algorithms.exceptions import RestoreTypeNotFoundException 9from lakehouse_engine.core.definitions import ( 10 ARCHIVE_STORAGE_CLASS, 11 FileManagerAPIKeys, 12 RestoreStatus, 13 RestoreType, 14) 15from lakehouse_engine.core.file_manager import FileManager 16from lakehouse_engine.utils.file_utils import get_directory_path 17from lakehouse_engine.utils.logging_handler import LoggingHandler 18 19 20def _dry_run(bucket: str, object_paths: list) -> dict: 21 """Build the dry run request return format. 22 23 Args: 24 bucket: name of bucket to perform operation. 25 object_paths: paths of object to list. 26 27 Returns: 28 A dict with a list of objects that would be copied/deleted. 29 """ 30 response = {} 31 32 for path in object_paths: 33 if _check_directory(bucket, path): 34 path = get_directory_path(path) 35 36 res = _list_objects_recursively(bucket=bucket, path=path) 37 38 if res: 39 response[path] = res 40 else: 41 response[path] = ["No such key"] 42 43 return response 44 45 46def _list_objects( 47 s3_client: Any, bucket: str, path: str, paginator: str = "" 48) -> Tuple[list, str]: 49 """List 1000 objects in a bucket given a prefix and paginator in s3. 50 51 Args: 52 bucket: name of bucket to perform the list. 53 path: path to be used as a prefix. 54 paginator: paginator token to be used. 55 56 Returns: 57 A list of object names. 58 """ 59 object_list = [] 60 61 if not paginator: 62 list_response = s3_client.list_objects_v2(Bucket=bucket, Prefix=path) 63 else: 64 list_response = s3_client.list_objects_v2( 65 Bucket=bucket, 66 Prefix=path, 67 ContinuationToken=paginator, 68 ) 69 70 if FileManagerAPIKeys.CONTENTS.value in list_response: 71 for obj in list_response[FileManagerAPIKeys.CONTENTS.value]: 72 object_list.append(obj[FileManagerAPIKeys.KEY.value]) 73 74 if FileManagerAPIKeys.CONTINUATION.value in list_response: 75 pagination = list_response[FileManagerAPIKeys.CONTINUATION.value] 76 else: 77 pagination = "" 78 79 return object_list, pagination 80 81 82def _list_objects_recursively(bucket: str, path: str) -> list: 83 """Recursively list all objects given a prefix in s3. 84 85 Args: 86 bucket: name of bucket to perform the list. 87 path: path to be used as a prefix. 88 89 Returns: 90 A list of object names fetched recursively. 91 """ 92 object_list = [] 93 more_objects = True 94 paginator = "" 95 96 s3 = boto3.client("s3") 97 98 while more_objects: 99 temp_list, paginator = _list_objects(s3, bucket, path, paginator) 100 101 object_list.extend(temp_list) 102 103 if not paginator: 104 more_objects = False 105 106 return object_list 107 108 109def _check_directory(bucket: str, path: str) -> bool: 110 """Checks if the object is a 'directory' in s3. 111 112 Args: 113 bucket: name of bucket to perform the check. 114 path: path to be used as a prefix. 115 116 Returns: 117 If path represents a 'directory'. 
118 """ 119 s3 = boto3.client("s3") 120 objects, _ = _list_objects(s3, bucket, path) 121 return len(objects) > 1 122 123 124class S3FileManager(FileManager): 125 """Set of actions to manipulate s3 files in several ways.""" 126 127 _logger = LoggingHandler(__name__).get_logger() 128 129 def get_function(self) -> None: 130 """Get a specific function to execute.""" 131 available_functions = { 132 "delete_objects": self.delete_objects, 133 "copy_objects": self.copy_objects, 134 "request_restore": self.request_restore, 135 "check_restore_status": self.check_restore_status, 136 "request_restore_to_destination_and_wait": ( 137 self.request_restore_to_destination_and_wait 138 ), 139 } 140 141 self._logger.info("Function being executed: {}".format(self.function)) 142 if self.function in available_functions.keys(): 143 func = available_functions[self.function] 144 func() 145 else: 146 raise NotImplementedError( 147 f"The requested function {self.function} is not implemented." 148 ) 149 150 def _delete_objects(self, bucket: str, objects_paths: list) -> None: 151 """Delete objects recursively in s3. 152 153 Params: 154 bucket: name of bucket to perform the delete operation. 155 objects_paths: objects to be deleted. 156 """ 157 s3 = boto3.client("s3") 158 159 for path in objects_paths: 160 if _check_directory(bucket, path): 161 path = get_directory_path(path) 162 else: 163 path = path.strip() 164 165 more_objects = True 166 paginator = "" 167 objects_to_delete = [] 168 169 while more_objects: 170 objects_found, paginator = _list_objects( 171 s3_client=s3, bucket=bucket, path=path, paginator=paginator 172 ) 173 for obj in objects_found: 174 objects_to_delete.append({FileManagerAPIKeys.KEY.value: obj}) 175 176 if not paginator: 177 more_objects = False 178 179 response = s3.delete_objects( 180 Bucket=bucket, 181 Delete={FileManagerAPIKeys.OBJECTS.value: objects_to_delete}, 182 ) 183 self._logger.info(response) 184 objects_to_delete = [] 185 186 def delete_objects(self) -> None: 187 """Delete objects and 'directories'. 188 189 If dry_run is set to True the function will print a dict with all the 190 paths that would be deleted based on the given keys. 191 """ 192 bucket = self.configs["bucket"] 193 objects_paths = self.configs["object_paths"] 194 dry_run = self.configs["dry_run"] 195 196 if dry_run: 197 response = _dry_run(bucket=bucket, object_paths=objects_paths) 198 199 self._logger.info("Paths that would be deleted:") 200 self._logger.info(response) 201 else: 202 self._delete_objects(bucket, objects_paths) 203 204 def copy_objects(self) -> None: 205 """Copies objects and 'directories'. 206 207 If dry_run is set to True the function will print a dict with all the 208 paths that would be copied based on the given keys. 209 """ 210 source_bucket = self.configs["bucket"] 211 source_object = self.configs["source_object"] 212 destination_bucket = self.configs["destination_bucket"] 213 destination_object = self.configs["destination_object"] 214 dry_run = self.configs["dry_run"] 215 216 S3FileManager._copy_objects( 217 source_bucket=source_bucket, 218 source_object=source_object, 219 destination_bucket=destination_bucket, 220 destination_object=destination_object, 221 dry_run=dry_run, 222 ) 223 224 def move_objects(self) -> None: 225 """Moves objects and 'directories'. 226 227 If dry_run is set to True the function will print a dict with all the 228 paths that would be moved based on the given keys. 
229 """ 230 pass 231 232 def request_restore(self) -> None: 233 """Request the restore of archived data.""" 234 source_bucket = self.configs["bucket"] 235 source_object = self.configs["source_object"] 236 restore_expiration = self.configs["restore_expiration"] 237 retrieval_tier = self.configs["retrieval_tier"] 238 dry_run = self.configs["dry_run"] 239 240 ArchiveFileManager.request_restore( 241 source_bucket, 242 source_object, 243 restore_expiration, 244 retrieval_tier, 245 dry_run, 246 ) 247 248 def check_restore_status(self) -> None: 249 """Check the restore status of archived data.""" 250 source_bucket = self.configs["bucket"] 251 source_object = self.configs["source_object"] 252 253 restore_status = ArchiveFileManager.check_restore_status( 254 source_bucket, source_object 255 ) 256 257 self._logger.info( 258 f""" 259 Restore status: 260 - Not Started: {restore_status.get('not_started_objects')} 261 - Ongoing: {restore_status.get('ongoing_objects')} 262 - Restored: {restore_status.get('restored_objects')} 263 Total objects in this restore process: {restore_status.get('total_objects')} 264 """ 265 ) 266 267 def request_restore_to_destination_and_wait(self) -> None: 268 """Request and wait for the restore to complete, polling the restore status. 269 270 After the restore is done, copy the restored files to destination 271 """ 272 source_bucket = self.configs["bucket"] 273 source_object = self.configs["source_object"] 274 destination_bucket = self.configs["destination_bucket"] 275 destination_object = self.configs["destination_object"] 276 restore_expiration = self.configs["restore_expiration"] 277 retrieval_tier = self.configs["retrieval_tier"] 278 dry_run = self.configs["dry_run"] 279 280 ArchiveFileManager.request_restore_and_wait( 281 source_bucket=source_bucket, 282 source_object=source_object, 283 restore_expiration=restore_expiration, 284 retrieval_tier=retrieval_tier, 285 dry_run=dry_run, 286 ) 287 288 S3FileManager._logger.info( 289 f"Restoration complete for {source_bucket} and {source_object}" 290 ) 291 S3FileManager._logger.info( 292 f"Starting to copy data from {source_bucket}/{source_object} to " 293 f"{destination_bucket}/{destination_object}" 294 ) 295 S3FileManager._copy_objects( 296 source_bucket=source_bucket, 297 source_object=source_object, 298 destination_bucket=destination_bucket, 299 destination_object=destination_object, 300 dry_run=dry_run, 301 ) 302 S3FileManager._logger.info( 303 f"Finished copying data, data should be available on {destination_bucket}/" 304 f"{destination_object}" 305 ) 306 307 @staticmethod 308 def _copy_objects( 309 source_bucket: str, 310 source_object: str, 311 destination_bucket: str, 312 destination_object: str, 313 dry_run: bool, 314 ) -> None: 315 """Copies objects and 'directories' in s3. 316 317 Args: 318 source_bucket: name of bucket to perform the copy. 319 source_object: object/folder to be copied. 320 destination_bucket: name of the target bucket to copy. 321 destination_object: target object/folder to copy. 322 dry_run: if dry_run is set to True the function will print a dict with 323 all the paths that would be deleted based on the given keys. 
324 """ 325 s3 = boto3.client("s3") 326 327 if dry_run: 328 response = _dry_run(bucket=source_bucket, object_paths=[source_object]) 329 330 S3FileManager._logger.info("Paths that would be copied:") 331 S3FileManager._logger.info(response) 332 else: 333 original_object_name = source_object.split("/")[-1] 334 335 if _check_directory(source_bucket, source_object): 336 source_object = get_directory_path(source_object) 337 338 copy_object = _list_objects_recursively( 339 bucket=source_bucket, path=source_object 340 ) 341 342 for obj in copy_object: 343 S3FileManager._logger.info(f"Copying obj: {obj}") 344 345 final_path = obj.replace(source_object, "") 346 347 response = s3.copy_object( 348 Bucket=destination_bucket, 349 CopySource={ 350 FileManagerAPIKeys.BUCKET.value: source_bucket, 351 FileManagerAPIKeys.KEY.value: obj, 352 }, 353 Key=f"{destination_object}/{original_object_name}/{final_path}", 354 ) 355 S3FileManager._logger.info(response) 356 else: 357 S3FileManager._logger.info(f"Copying obj: {source_object}") 358 359 response = s3.copy_object( 360 Bucket=destination_bucket, 361 CopySource={ 362 FileManagerAPIKeys.BUCKET.value: source_bucket, 363 FileManagerAPIKeys.KEY.value: source_object, 364 }, 365 Key=f"""{destination_object}/{original_object_name}""", 366 ) 367 S3FileManager._logger.info(response) 368 369 370class ArchiveFileManager(object): 371 """Set of actions to restore archives.""" 372 373 _logger = LoggingHandler(__name__).get_logger() 374 375 @staticmethod 376 def _get_archived_object(bucket: str, object_key: str) -> Optional[Any]: 377 """Get the archived object if it's an object. 378 379 Args: 380 bucket: name of bucket to check get the object. 381 object_key: object to get. 382 383 Returns: 384 S3 Object if it's an archived object, otherwise None. 385 """ 386 s3 = boto3.resource("s3") 387 object_to_restore = s3.Object(bucket, object_key) 388 389 if ( 390 object_to_restore.storage_class is not None 391 and object_to_restore.storage_class in ARCHIVE_STORAGE_CLASS 392 ): 393 return object_to_restore 394 else: 395 return None 396 397 @staticmethod 398 def _check_object_restore_status( 399 bucket: str, object_key: str 400 ) -> Optional[RestoreStatus]: 401 """Check the restore status of the archive. 402 403 Args: 404 bucket: name of bucket to check the restore status. 405 object_key: object to check the restore status. 406 407 Returns: 408 The restore status represented by an enum, possible values are: 409 NOT_STARTED, ONGOING or RESTORED 410 """ 411 archived_object = ArchiveFileManager._get_archived_object(bucket, object_key) 412 413 if archived_object is None: 414 status = None 415 elif archived_object.restore is None: 416 status = RestoreStatus.NOT_STARTED 417 elif 'ongoing-request="true"' in archived_object.restore: 418 status = RestoreStatus.ONGOING 419 else: 420 status = RestoreStatus.RESTORED 421 422 return status 423 424 @staticmethod 425 def check_restore_status(source_bucket: str, source_object: str) -> dict: 426 """Check the restore status of archived data. 427 428 Args: 429 source_bucket: name of bucket to check the restore status. 430 source_object: object to check the restore status. 431 432 Returns: 433 A dict containing the amount of objects in each status. 
434 """ 435 not_started_objects = 0 436 ongoing_objects = 0 437 restored_objects = 0 438 total_objects = 0 439 440 if _check_directory(source_bucket, source_object): 441 source_object = get_directory_path(source_object) 442 443 objects_to_restore = _list_objects_recursively( 444 bucket=source_bucket, path=source_object 445 ) 446 447 for obj in objects_to_restore: 448 ArchiveFileManager._logger.info(f"Checking restore status for: {obj}") 449 450 restore_status = ArchiveFileManager._check_object_restore_status( 451 source_bucket, obj 452 ) 453 if not restore_status: 454 ArchiveFileManager._logger.warning( 455 f"Restore status not found for {source_bucket}/{obj}" 456 ) 457 else: 458 total_objects += 1 459 460 if RestoreStatus.NOT_STARTED == restore_status: 461 not_started_objects += 1 462 elif RestoreStatus.ONGOING == restore_status: 463 ongoing_objects += 1 464 else: 465 restored_objects += 1 466 467 ArchiveFileManager._logger.info( 468 f"{obj} restore status is {restore_status.value}" 469 ) 470 471 return { 472 "total_objects": total_objects, 473 "not_started_objects": not_started_objects, 474 "ongoing_objects": ongoing_objects, 475 "restored_objects": restored_objects, 476 } 477 478 @staticmethod 479 def _request_restore_object( 480 bucket: str, object_key: str, expiration: int, retrieval_tier: str 481 ) -> None: 482 """Request a restore of the archive. 483 484 Args: 485 bucket: name of bucket to perform the restore. 486 object_key: object to be restored. 487 expiration: restore expiration. 488 retrieval_tier: type of restore, possible values are: 489 Bulk, Standard or Expedited. 490 """ 491 if not RestoreType.exists(retrieval_tier): 492 raise RestoreTypeNotFoundException( 493 f"Restore type {retrieval_tier} not supported." 494 ) 495 496 if _check_directory(bucket, object_key): 497 object_key = get_directory_path(object_key) 498 499 archived_object = ArchiveFileManager._get_archived_object(bucket, object_key) 500 501 if archived_object and archived_object.restore is None: 502 ArchiveFileManager._logger.info(f"Restoring archive {bucket}/{object_key}.") 503 archived_object.restore_object( 504 RestoreRequest={ 505 "Days": expiration, 506 "GlacierJobParameters": {"Tier": retrieval_tier}, 507 } 508 ) 509 else: 510 ArchiveFileManager._logger.info( 511 f"Restore request for {bucket}/{object_key} not performed." 512 ) 513 514 @staticmethod 515 def request_restore( 516 source_bucket: str, 517 source_object: str, 518 restore_expiration: int, 519 retrieval_tier: str, 520 dry_run: bool, 521 ) -> None: 522 """Request the restore of archived data. 523 524 Args: 525 source_bucket: name of bucket to perform the restore. 526 source_object: object to be restored. 527 restore_expiration: restore expiration in days. 528 retrieval_tier: type of restore, possible values are: 529 Bulk, Standard or Expedited. 530 dry_run: if dry_run is set to True the function will print a dict with 531 all the paths that would be deleted based on the given keys. 
532 """ 533 if _check_directory(source_bucket, source_object): 534 source_object = get_directory_path(source_object) 535 536 if dry_run: 537 response = _dry_run(bucket=source_bucket, object_paths=[source_object]) 538 539 ArchiveFileManager._logger.info("Paths that would be restored:") 540 ArchiveFileManager._logger.info(response) 541 else: 542 objects_to_restore = _list_objects_recursively( 543 bucket=source_bucket, path=source_object 544 ) 545 546 for obj in objects_to_restore: 547 ArchiveFileManager._request_restore_object( 548 source_bucket, 549 obj, 550 restore_expiration, 551 retrieval_tier, 552 ) 553 554 @staticmethod 555 def request_restore_and_wait( 556 source_bucket: str, 557 source_object: str, 558 restore_expiration: int, 559 retrieval_tier: str, 560 dry_run: bool, 561 ) -> None: 562 """Request and wait for the restore to complete, polling the restore status. 563 564 Args: 565 source_bucket: name of bucket to perform the restore. 566 source_object: object to be restored. 567 restore_expiration: restore expiration in days. 568 retrieval_tier: type of restore, possible values are: 569 Bulk, Standard or Expedited. 570 dry_run: if dry_run is set to True the function will print a dict with 571 all the paths that would be deleted based on the given keys. 572 """ 573 if retrieval_tier != RestoreType.EXPEDITED.value: 574 ArchiveFileManager._logger.error( 575 f"Retrieval Tier {retrieval_tier} not allowed on this operation! This " 576 "kind of restore should be used just with `Expedited` retrieval tier " 577 "to save cluster costs." 578 ) 579 raise ValueError( 580 f"Retrieval Tier {retrieval_tier} not allowed on this operation! This " 581 "kind of restore should be used just with `Expedited` retrieval tier " 582 "to save cluster costs." 583 ) 584 585 ArchiveFileManager.request_restore( 586 source_bucket=source_bucket, 587 source_object=source_object, 588 restore_expiration=restore_expiration, 589 retrieval_tier=retrieval_tier, 590 dry_run=dry_run, 591 ) 592 restore_status = ArchiveFileManager.check_restore_status( 593 source_bucket, source_object 594 ) 595 ArchiveFileManager._logger.info(f"Restore status: {restore_status}") 596 597 if not dry_run: 598 ArchiveFileManager._logger.info("Checking the restore status in 5 minutes.") 599 wait_time = 300 600 while restore_status.get("total_objects") > restore_status.get( 601 "restored_objects" 602 ): 603 ArchiveFileManager._logger.info( 604 "Not all objects have been restored yet, checking the status again " 605 f"in {wait_time} seconds." 606 ) 607 time.sleep(wait_time) 608 wait_time = 30 609 restore_status = ArchiveFileManager.check_restore_status( 610 source_bucket, source_object 611 ) 612 ArchiveFileManager._logger.info(f"Restore status: {restore_status}")
class S3FileManager(FileManager)

Set of actions to manipulate s3 files in several ways.
get_function() -> None

Look up the function configured for this file manager and execute it; unknown function names raise NotImplementedError.
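A hypothetical invocation sketch, assuming the `FileManager` base class (defined in `lakehouse_engine.core.file_manager`, not shown here) exposes the configured function name as `self.function` and the remaining options as `self.configs`:

```python
# Hypothetical usage; the FileManager constructor behavior is assumed.
configs = {
    "function": "delete_objects",      # selects the method to dispatch to
    "bucket": "my-bucket",             # example values
    "object_paths": ["standard/sales/"],
    "dry_run": True,
}
S3FileManager(configs).get_function()  # logs the paths that would be deleted
```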
delete_objects() -> None

Delete objects and 'directories'. If dry_run is set to True, the function instead logs a dict with all the paths that would be deleted based on the given keys.
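For illustration, under a dry run the logged dict maps each input path to the keys that would be removed, as built by the module-level `_dry_run` helper (the bucket contents below are made up):

```python
# Illustrative dry-run output for object_paths=["standard/sales/"]:
{
    "standard/sales/": [
        "standard/sales/part-0000.parquet",
        "standard/sales/part-0001.parquet",
    ]
}
# A path that matches nothing maps to ["No such key"].
```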
copy_objects() -> None

Copy objects and 'directories'. If dry_run is set to True, the function instead logs a dict with all the paths that would be copied based on the given keys.
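A sketch of a copy configuration under the same constructor assumption as above (all bucket and path names are placeholders). Note the destination key layout produced by `_copy_objects`: for a 'directory' copy, each source key is written to `<destination_object>/<original_object_name>/<relative_path>`:

```python
configs = {
    "function": "copy_objects",
    "bucket": "source-bucket",
    "source_object": "standard/sales",   # a 'directory' prefix
    "destination_bucket": "backup-bucket",
    "destination_object": "archive",
    "dry_run": False,
}
# standard/sales/part-0000.parquet is copied to
# backup-bucket/archive/sales/part-0000.parquet
S3FileManager(configs).get_function()
```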
move_objects() -> None

Move objects and 'directories'. If dry_run is set to True, the function instead logs a dict with all the paths that would be moved based on the given keys. Note: the method body is currently a placeholder (`pass`).
request_restore() -> None

Request the restore of archived data.
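A configuration sketch under the same constructor assumption (placeholder names; `restore_expiration` is in days):

```python
configs = {
    "function": "request_restore",
    "bucket": "my-bucket",
    "source_object": "archive/2020/sales",  # hypothetical archived prefix
    "restore_expiration": 7,                # days the restored copy stays readable
    "retrieval_tier": "Bulk",               # Bulk, Standard or Expedited
    "dry_run": False,
}
S3FileManager(configs).get_function()
```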
check_restore_status() -> None

Check the restore status of archived data and log a summary of the counts per status.
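A configuration sketch under the same constructor assumption (placeholder names):

```python
configs = {
    "function": "check_restore_status",
    "bucket": "my-bucket",
    "source_object": "archive/2020/sales",
}
# Logs the Not Started / Ongoing / Restored counts and the total.
S3FileManager(configs).get_function()
```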
request_restore_to_destination_and_wait() -> None

Request a restore and wait for it to complete, polling the restore status. After the restore is done, copy the restored files to the destination.
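A configuration sketch under the same constructor assumption; note that this operation only accepts the Expedited retrieval tier (placeholder names):

```python
configs = {
    "function": "request_restore_to_destination_and_wait",
    "bucket": "my-bucket",
    "source_object": "archive/2020/sales",
    "destination_bucket": "my-bucket",
    "destination_object": "standard/2020",
    "restore_expiration": 7,
    "retrieval_tier": "Expedited",  # the only tier this operation accepts
    "dry_run": False,
}
S3FileManager(configs).get_function()
```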
class ArchiveFileManager

Set of actions to restore archives.
check_restore_status(source_bucket: str, source_object: str) -> dict (static)

Check the restore status of archived data.

Arguments:
- source_bucket: name of the bucket to check the restore status in.
- source_object: object to check the restore status of.

Returns:
A dict containing the number of objects in each status (total_objects, not_started_objects, ongoing_objects, restored_objects).
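Since this is a static method, it can be called directly; a usage sketch with placeholder names, showing the shape of the returned dict:

```python
status = ArchiveFileManager.check_restore_status(
    "my-bucket", "archive/2020/sales"  # placeholder bucket and prefix
)
# e.g. {"total_objects": 42, "not_started_objects": 0,
#       "ongoing_objects": 10, "restored_objects": 32}
if status["restored_objects"] == status["total_objects"]:
    print("Restore complete")
```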
request_restore(source_bucket: str, source_object: str, restore_expiration: int, retrieval_tier: str, dry_run: bool) -> None (static)

Request the restore of archived data.

Arguments:
- source_bucket: name of the bucket to perform the restore in.
- source_object: object to be restored.
- restore_expiration: restore expiration in days.
- retrieval_tier: type of restore, possible values are: Bulk, Standard or Expedited.
- dry_run: if dry_run is set to True the function will log a dict with all the paths that would be restored based on the given keys.
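A direct-call sketch with placeholder names:

```python
ArchiveFileManager.request_restore(
    source_bucket="my-bucket",           # placeholder names
    source_object="archive/2020/sales",
    restore_expiration=7,                # days before the restored copy expires
    retrieval_tier="Standard",           # Bulk, Standard or Expedited
    dry_run=False,
)
```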
request_restore_and_wait(source_bucket: str, source_object: str, restore_expiration: int, retrieval_tier: str, dry_run: bool) -> None (static)

Request a restore and wait for it to complete, polling the restore status.

Arguments:
- source_bucket: name of the bucket to perform the restore in.
- source_object: object to be restored.
- restore_expiration: restore expiration in days.
- retrieval_tier: type of restore; must be Expedited, any other value raises ValueError.
- dry_run: if dry_run is set to True the function will log a dict with all the paths that would be restored based on the given keys.
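A direct-call sketch with placeholder names, highlighting the blocking poll loop:

```python
ArchiveFileManager.request_restore_and_wait(
    source_bucket="my-bucket",          # placeholder names
    source_object="archive/2020/sales",
    restore_expiration=7,
    retrieval_tier="Expedited",         # any other tier raises ValueError
    dry_run=False,
)
# Blocks the caller: first poll after 5 minutes, then every 30 seconds,
# until every object reports RESTORED.
```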