lakehouse_engine.core.s3_file_manager

File manager module using boto3.

  1"""File manager module using boto3."""
  2
  3import time
  4from typing import Any, Optional, Tuple
  5
  6import boto3
  7
  8from lakehouse_engine.algorithms.exceptions import RestoreTypeNotFoundException
  9from lakehouse_engine.core.definitions import (
 10    ARCHIVE_STORAGE_CLASS,
 11    FileManagerAPIKeys,
 12    RestoreStatus,
 13    RestoreType,
 14)
 15from lakehouse_engine.core.file_manager import FileManager
 16from lakehouse_engine.utils.file_utils import get_directory_path
 17from lakehouse_engine.utils.logging_handler import LoggingHandler
 18
 19
 20def _dry_run(bucket: str, object_paths: list) -> dict:
 21    """Build the dry run request return format.
 22
 23    Args:
 24        bucket: name of bucket to perform operation.
 25        object_paths: paths of the objects to list.
 26
 27    Returns:
 28        A dict with a list of objects that would be copied/deleted.
 29    """
 30    response = {}
 31
 32    for path in object_paths:
 33        if _check_directory(bucket, path):
 34            path = get_directory_path(path)
 35
 36        res = _list_objects_recursively(bucket=bucket, path=path)
 37
 38        if res:
 39            response[path] = res
 40        else:
 41            response[path] = ["No such key"]
 42
 43    return response
 44
 45
 46def _list_objects(
 47    s3_client: Any, bucket: str, path: str, paginator: str = ""
 48) -> Tuple[list, str]:
 49    """List 1000 objects in a bucket given a prefix and paginator in s3.
 50
 51    Args:
 52        bucket: name of bucket to perform the list.
 53        path: path to be used as a prefix.
 54        paginator: paginator token to be used.
 55
 56    Returns:
 57         A list of object names.
 58    """
 59    object_list = []
 60
 61    if not paginator:
 62        list_response = s3_client.list_objects_v2(Bucket=bucket, Prefix=path)
 63    else:
 64        list_response = s3_client.list_objects_v2(
 65            Bucket=bucket,
 66            Prefix=path,
 67            ContinuationToken=paginator,
 68        )
 69
 70    if FileManagerAPIKeys.CONTENTS.value in list_response:
 71        for obj in list_response[FileManagerAPIKeys.CONTENTS.value]:
 72            object_list.append(obj[FileManagerAPIKeys.KEY.value])
 73
 74    if FileManagerAPIKeys.CONTINUATION.value in list_response:
 75        pagination = list_response[FileManagerAPIKeys.CONTINUATION.value]
 76    else:
 77        pagination = ""
 78
 79    return object_list, pagination
 80
 81
 82def _list_objects_recursively(bucket: str, path: str) -> list:
 83    """Recursively list all objects given a prefix in s3.
 84
 85    Args:
 86        bucket: name of bucket to perform the list.
 87        path: path to be used as a prefix.
 88
 89    Returns:
 90        A list of object names fetched recursively.
 91    """
 92    object_list = []
 93    more_objects = True
 94    paginator = ""
 95
 96    s3 = boto3.client("s3")
 97
 98    while more_objects:
 99        temp_list, paginator = _list_objects(s3, bucket, path, paginator)
100
101        object_list.extend(temp_list)
102
103        if not paginator:
104            more_objects = False
105
106    return object_list
107
108
109def _check_directory(bucket: str, path: str) -> bool:
110    """Checks if the object is a 'directory' in s3.
111
112    Args:
113        bucket: name of bucket to perform the check.
114        path: path to be used as a prefix.
115
116    Returns:
117        True if the path represents a 'directory', False otherwise.
118    """
119    s3 = boto3.client("s3")
120    objects, _ = _list_objects(s3, bucket, path)
121    return len(objects) > 1
122
123
124class S3FileManager(FileManager):
125    """Set of actions to manipulate s3 files in several ways."""
126
127    _logger = LoggingHandler(__name__).get_logger()
128
129    def get_function(self) -> None:
130        """Get a specific function to execute."""
131        available_functions = {
132            "delete_objects": self.delete_objects,
133            "copy_objects": self.copy_objects,
134            "request_restore": self.request_restore,
135            "check_restore_status": self.check_restore_status,
136            "request_restore_to_destination_and_wait": (
137                self.request_restore_to_destination_and_wait
138            ),
139        }
140
141        self._logger.info("Function being executed: {}".format(self.function))
142        if self.function in available_functions.keys():
143            func = available_functions[self.function]
144            func()
145        else:
146            raise NotImplementedError(
147                f"The requested function {self.function} is not implemented."
148            )
149
150    def _delete_objects(self, bucket: str, objects_paths: list) -> None:
151        """Delete objects recursively in s3.
152
153        Args:
154            bucket: name of bucket to perform the delete operation.
155            objects_paths: objects to be deleted.
156        """
157        s3 = boto3.client("s3")
158
159        for path in objects_paths:
160            if _check_directory(bucket, path):
161                path = get_directory_path(path)
162            else:
163                path = path.strip()
164
165            more_objects = True
166            paginator = ""
167            objects_to_delete = []
168
169            while more_objects:
170                objects_found, paginator = _list_objects(
171                    s3_client=s3, bucket=bucket, path=path, paginator=paginator
172                )
173                for obj in objects_found:
174                    objects_to_delete.append({FileManagerAPIKeys.KEY.value: obj})
175
176                if not paginator:
177                    more_objects = False
178
179                response = s3.delete_objects(
180                    Bucket=bucket,
181                    Delete={FileManagerAPIKeys.OBJECTS.value: objects_to_delete},
182                )
183                self._logger.info(response)
184                objects_to_delete = []
185
186    def delete_objects(self) -> None:
187        """Delete objects and 'directories'.
188
189        If dry_run is set to True the function will print a dict with all the
190        paths that would be deleted based on the given keys.
191        """
192        bucket = self.configs["bucket"]
193        objects_paths = self.configs["object_paths"]
194        dry_run = self.configs["dry_run"]
195
196        if dry_run:
197            response = _dry_run(bucket=bucket, object_paths=objects_paths)
198
199            self._logger.info("Paths that would be deleted:")
200            self._logger.info(response)
201        else:
202            self._delete_objects(bucket, objects_paths)
203
204    def copy_objects(self) -> None:
205        """Copies objects and 'directories'.
206
207        If dry_run is set to True the function will print a dict with all the
208        paths that would be copied based on the given keys.
209        """
210        source_bucket = self.configs["bucket"]
211        source_object = self.configs["source_object"]
212        destination_bucket = self.configs["destination_bucket"]
213        destination_object = self.configs["destination_object"]
214        dry_run = self.configs["dry_run"]
215
216        S3FileManager._copy_objects(
217            source_bucket=source_bucket,
218            source_object=source_object,
219            destination_bucket=destination_bucket,
220            destination_object=destination_object,
221            dry_run=dry_run,
222        )
223
224    def move_objects(self) -> None:
225        """Moves objects and 'directories'.
226
227        If dry_run is set to True the function will print a dict with all the
228        paths that would be moved based on the given keys.
229        """
230        pass
231
232    def request_restore(self) -> None:
233        """Request the restore of archived data."""
234        source_bucket = self.configs["bucket"]
235        source_object = self.configs["source_object"]
236        restore_expiration = self.configs["restore_expiration"]
237        retrieval_tier = self.configs["retrieval_tier"]
238        dry_run = self.configs["dry_run"]
239
240        ArchiveFileManager.request_restore(
241            source_bucket,
242            source_object,
243            restore_expiration,
244            retrieval_tier,
245            dry_run,
246        )
247
248    def check_restore_status(self) -> None:
249        """Check the restore status of archived data."""
250        source_bucket = self.configs["bucket"]
251        source_object = self.configs["source_object"]
252
253        restore_status = ArchiveFileManager.check_restore_status(
254            source_bucket, source_object
255        )
256
257        self._logger.info(
258            f"""
259            Restore status:
260            - Not Started: {restore_status.get('not_started_objects')}
261            - Ongoing: {restore_status.get('ongoing_objects')}
262            - Restored: {restore_status.get('restored_objects')}
263            Total objects in this restore process: {restore_status.get('total_objects')}
264            """
265        )
266
267    def request_restore_to_destination_and_wait(self) -> None:
268        """Request and wait for the restore to complete, polling the restore status.
269
270        After the restore is done, copy the restored files to destination
271        """
272        source_bucket = self.configs["bucket"]
273        source_object = self.configs["source_object"]
274        destination_bucket = self.configs["destination_bucket"]
275        destination_object = self.configs["destination_object"]
276        restore_expiration = self.configs["restore_expiration"]
277        retrieval_tier = self.configs["retrieval_tier"]
278        dry_run = self.configs["dry_run"]
279
280        ArchiveFileManager.request_restore_and_wait(
281            source_bucket=source_bucket,
282            source_object=source_object,
283            restore_expiration=restore_expiration,
284            retrieval_tier=retrieval_tier,
285            dry_run=dry_run,
286        )
287
288        S3FileManager._logger.info(
289            f"Restoration complete for {source_bucket} and {source_object}"
290        )
291        S3FileManager._logger.info(
292            f"Starting to copy data from {source_bucket}/{source_object} to "
293            f"{destination_bucket}/{destination_object}"
294        )
295        S3FileManager._copy_objects(
296            source_bucket=source_bucket,
297            source_object=source_object,
298            destination_bucket=destination_bucket,
299            destination_object=destination_object,
300            dry_run=dry_run,
301        )
302        S3FileManager._logger.info(
303            f"Finished copying data, data should be available on {destination_bucket}/"
304            f"{destination_object}"
305        )
306
307    @staticmethod
308    def _copy_objects(
309        source_bucket: str,
310        source_object: str,
311        destination_bucket: str,
312        destination_object: str,
313        dry_run: bool,
314    ) -> None:
315        """Copies objects and 'directories' in s3.
316
317        Args:
318            source_bucket: name of bucket to perform the copy.
319            source_object: object/folder to be copied.
320            destination_bucket: name of the target bucket to copy.
321            destination_object: target object/folder to copy.
322            dry_run: if dry_run is set to True the function will print a dict with
323                all the paths that would be copied based on the given keys.
324        """
325        s3 = boto3.client("s3")
326
327        if dry_run:
328            response = _dry_run(bucket=source_bucket, object_paths=[source_object])
329
330            S3FileManager._logger.info("Paths that would be copied:")
331            S3FileManager._logger.info(response)
332        else:
333            original_object_name = source_object.split("/")[-1]
334
335            if _check_directory(source_bucket, source_object):
336                source_object = get_directory_path(source_object)
337
338                copy_object = _list_objects_recursively(
339                    bucket=source_bucket, path=source_object
340                )
341
342                for obj in copy_object:
343                    S3FileManager._logger.info(f"Copying obj: {obj}")
344
345                    final_path = obj.replace(source_object, "")
346
347                    response = s3.copy_object(
348                        Bucket=destination_bucket,
349                        CopySource={
350                            FileManagerAPIKeys.BUCKET.value: source_bucket,
351                            FileManagerAPIKeys.KEY.value: obj,
352                        },
353                        Key=f"{destination_object}/{original_object_name}/{final_path}",
354                    )
355                    S3FileManager._logger.info(response)
356            else:
357                S3FileManager._logger.info(f"Copying obj: {source_object}")
358
359                response = s3.copy_object(
360                    Bucket=destination_bucket,
361                    CopySource={
362                        FileManagerAPIKeys.BUCKET.value: source_bucket,
363                        FileManagerAPIKeys.KEY.value: source_object,
364                    },
365                    Key=f"""{destination_object}/{original_object_name}""",
366                )
367                S3FileManager._logger.info(response)
368
369
370class ArchiveFileManager(object):
371    """Set of actions to restore archives."""
372
373    _logger = LoggingHandler(__name__).get_logger()
374
375    @staticmethod
376    def _get_archived_object(bucket: str, object_key: str) -> Optional[Any]:
377        """Get the archived object if it's an object.
378
379        Args:
380            bucket: name of bucket from which to get the object.
381            object_key: object to get.
382
383        Returns:
384            S3 Object if it's an archived object, otherwise None.
385        """
386        s3 = boto3.resource("s3")
387        object_to_restore = s3.Object(bucket, object_key)
388
389        if (
390            object_to_restore.storage_class is not None
391            and object_to_restore.storage_class in ARCHIVE_STORAGE_CLASS
392        ):
393            return object_to_restore
394        else:
395            return None
396
397    @staticmethod
398    def _check_object_restore_status(
399        bucket: str, object_key: str
400    ) -> Optional[RestoreStatus]:
401        """Check the restore status of the archive.
402
403        Args:
404            bucket: name of bucket to check the restore status.
405            object_key: object to check the restore status.
406
407        Returns:
408            The restore status represented by an enum, possible values are:
409                NOT_STARTED, ONGOING or RESTORED
410        """
411        archived_object = ArchiveFileManager._get_archived_object(bucket, object_key)
412
413        if archived_object is None:
414            status = None
415        elif archived_object.restore is None:
416            status = RestoreStatus.NOT_STARTED
417        elif 'ongoing-request="true"' in archived_object.restore:
418            status = RestoreStatus.ONGOING
419        else:
420            status = RestoreStatus.RESTORED
421
422        return status
423
424    @staticmethod
425    def check_restore_status(source_bucket: str, source_object: str) -> dict:
426        """Check the restore status of archived data.
427
428        Args:
429            source_bucket: name of bucket to check the restore status.
430            source_object: object to check the restore status.
431
432        Returns:
433            A dict containing the number of objects in each status.
434        """
435        not_started_objects = 0
436        ongoing_objects = 0
437        restored_objects = 0
438        total_objects = 0
439
440        if _check_directory(source_bucket, source_object):
441            source_object = get_directory_path(source_object)
442
443        objects_to_restore = _list_objects_recursively(
444            bucket=source_bucket, path=source_object
445        )
446
447        for obj in objects_to_restore:
448            ArchiveFileManager._logger.info(f"Checking restore status for: {obj}")
449
450            restore_status = ArchiveFileManager._check_object_restore_status(
451                source_bucket, obj
452            )
453            if not restore_status:
454                ArchiveFileManager._logger.warning(
455                    f"Restore status not found for {source_bucket}/{obj}"
456                )
457            else:
458                total_objects += 1
459
460                if RestoreStatus.NOT_STARTED == restore_status:
461                    not_started_objects += 1
462                elif RestoreStatus.ONGOING == restore_status:
463                    ongoing_objects += 1
464                else:
465                    restored_objects += 1
466
467                ArchiveFileManager._logger.info(
468                    f"{obj} restore status is {restore_status.value}"
469                )
470
471        return {
472            "total_objects": total_objects,
473            "not_started_objects": not_started_objects,
474            "ongoing_objects": ongoing_objects,
475            "restored_objects": restored_objects,
476        }
477
478    @staticmethod
479    def _request_restore_object(
480        bucket: str, object_key: str, expiration: int, retrieval_tier: str
481    ) -> None:
482        """Request a restore of the archive.
483
484        Args:
485            bucket: name of bucket to perform the restore.
486            object_key: object to be restored.
487            expiration: restore expiration in days.
488            retrieval_tier: type of restore, possible values are:
489                Bulk, Standard or Expedited.
490        """
491        if not RestoreType.exists(retrieval_tier):
492            raise RestoreTypeNotFoundException(
493                f"Restore type {retrieval_tier} not supported."
494            )
495
496        if _check_directory(bucket, object_key):
497            object_key = get_directory_path(object_key)
498
499        archived_object = ArchiveFileManager._get_archived_object(bucket, object_key)
500
501        if archived_object and archived_object.restore is None:
502            ArchiveFileManager._logger.info(f"Restoring archive {bucket}/{object_key}.")
503            archived_object.restore_object(
504                RestoreRequest={
505                    "Days": expiration,
506                    "GlacierJobParameters": {"Tier": retrieval_tier},
507                }
508            )
509        else:
510            ArchiveFileManager._logger.info(
511                f"Restore request for {bucket}/{object_key} not performed."
512            )
513
514    @staticmethod
515    def request_restore(
516        source_bucket: str,
517        source_object: str,
518        restore_expiration: int,
519        retrieval_tier: str,
520        dry_run: bool,
521    ) -> None:
522        """Request the restore of archived data.
523
524        Args:
525            source_bucket: name of bucket to perform the restore.
526            source_object: object to be restored.
527            restore_expiration: restore expiration in days.
528            retrieval_tier: type of restore, possible values are:
529                Bulk, Standard or Expedited.
530            dry_run: if dry_run is set to True the function will print a dict with
531                all the paths that would be restored based on the given keys.
532        """
533        if _check_directory(source_bucket, source_object):
534            source_object = get_directory_path(source_object)
535
536        if dry_run:
537            response = _dry_run(bucket=source_bucket, object_paths=[source_object])
538
539            ArchiveFileManager._logger.info("Paths that would be restored:")
540            ArchiveFileManager._logger.info(response)
541        else:
542            objects_to_restore = _list_objects_recursively(
543                bucket=source_bucket, path=source_object
544            )
545
546            for obj in objects_to_restore:
547                ArchiveFileManager._request_restore_object(
548                    source_bucket,
549                    obj,
550                    restore_expiration,
551                    retrieval_tier,
552                )
553
554    @staticmethod
555    def request_restore_and_wait(
556        source_bucket: str,
557        source_object: str,
558        restore_expiration: int,
559        retrieval_tier: str,
560        dry_run: bool,
561    ) -> None:
562        """Request and wait for the restore to complete, polling the restore status.
563
564        Args:
565            source_bucket: name of bucket to perform the restore.
566            source_object: object to be restored.
567            restore_expiration: restore expiration in days.
568            retrieval_tier: type of restore, possible values are:
569                Bulk, Standard or Expedited.
570            dry_run: if dry_run is set to True the function will print a dict with
571                all the paths that would be restored based on the given keys.
572        """
573        if retrieval_tier != RestoreType.EXPEDITED.value:
574            ArchiveFileManager._logger.error(
575                f"Retrieval Tier {retrieval_tier} not allowed on this operation! This "
576                "kind of restore should be used just with `Expedited` retrieval tier "
577                "to save cluster costs."
578            )
579            raise ValueError(
580                f"Retrieval Tier {retrieval_tier} not allowed on this operation! This "
581                "kind of restore should be used just with `Expedited` retrieval tier "
582                "to save cluster costs."
583            )
584
585        ArchiveFileManager.request_restore(
586            source_bucket=source_bucket,
587            source_object=source_object,
588            restore_expiration=restore_expiration,
589            retrieval_tier=retrieval_tier,
590            dry_run=dry_run,
591        )
592        restore_status = ArchiveFileManager.check_restore_status(
593            source_bucket, source_object
594        )
595        ArchiveFileManager._logger.info(f"Restore status: {restore_status}")
596
597        if not dry_run:
598            ArchiveFileManager._logger.info("Checking the restore status in 5 minutes.")
599            wait_time = 300
600            while restore_status.get("total_objects") > restore_status.get(
601                "restored_objects"
602            ):
603                ArchiveFileManager._logger.info(
604                    "Not all objects have been restored yet, checking the status again "
605                    f"in {wait_time} seconds."
606                )
607                time.sleep(wait_time)
608                wait_time = 30
609                restore_status = ArchiveFileManager.check_restore_status(
610                    source_bucket, source_object
611                )
612                ArchiveFileManager._logger.info(f"Restore status: {restore_status}")
class S3FileManager(lakehouse_engine.core.file_manager.FileManager):

Set of actions to manipulate s3 files in several ways.
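
A minimal usage sketch, assuming the FileManager base class (defined in lakehouse_engine.core.file_manager and not shown here) is built from a configs dict and exposes the selected function name as self.function; the bucket and paths below are placeholders:

    from lakehouse_engine.core.s3_file_manager import S3FileManager

    # Hypothetical configs: these keys are the ones read by delete_objects(),
    # plus "function", which is assumed to drive get_function() dispatch.
    configs = {
        "function": "delete_objects",
        "bucket": "my-bucket",
        "object_paths": ["raw/sales/2023", "raw/returns/file.csv"],
        "dry_run": True,
    }

    file_manager = S3FileManager(configs)  # constructor shape assumed from the base class
    file_manager.get_function()            # dispatches to delete_objects()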

def get_function(self) -> None:

Get a specific function to execute.

def delete_objects(self) -> None:

Delete objects and 'directories'.

If dry_run is set to True the function will print a dict with all the paths that would be deleted based on the given keys.
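
In dry-run mode the method only logs the output of the module-level _dry_run helper, which maps each requested path to the object keys found under it, or to ["No such key"] when nothing matches. An illustrative value of that dict, with placeholder keys:

    {
        "raw/sales/2023/": [
            "raw/sales/2023/part-000.parquet",
            "raw/sales/2023/part-001.parquet",
        ],
        "raw/missing_path": ["No such key"],
    }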

def copy_objects(self) -> None:

Copies objects and 'directories'.

If dry_run is set to True the function will print a dict with all the paths that would be copied based on the given keys.
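
A sketch of the configs keys read by copy_objects(); bucket and object names are placeholders, "bucket" refers to the source bucket, and the "function" key is the same assumption as in the class example above:

    configs = {
        "function": "copy_objects",
        "bucket": "source-bucket",
        "source_object": "raw/sales/2023",
        "destination_bucket": "backup-bucket",
        "destination_object": "archive/sales",
        "dry_run": False,
    }

When source_object resolves to a 'directory', every object under it is copied to destination_object/<last segment of source_object>/<relative key>.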

def move_objects(self) -> None:

Moves objects and 'directories'.

If dry_run is set to True the function will print a dict with all the paths that would be moved based on the given keys.

def request_restore(self) -> None:

Request the restore of archived data.
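
A sketch of the configs keys read by request_restore(); values are placeholders:

    configs = {
        "function": "request_restore",
        "bucket": "archive-bucket",
        "source_object": "cold/sales/2019",
        "restore_expiration": 7,      # days the restored copies remain available
        "retrieval_tier": "Bulk",     # Bulk, Standard or Expedited
        "dry_run": False,
    }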

def check_restore_status(self) -> None:

Check the restore status of archived data.
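
This delegates to ArchiveFileManager.check_restore_status and logs a summary of the not started, ongoing and restored counts. Besides the assumed "function" key, only "bucket" and "source_object" are read; values are placeholders:

    configs = {
        "function": "check_restore_status",
        "bucket": "archive-bucket",
        "source_object": "cold/sales/2019",
    }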

def request_restore_to_destination_and_wait(self) -> None:

Request and wait for the restore to complete, polling the restore status.

After the restore is done, copy the restored files to the destination.
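
A sketch of the configs for this end-to-end flow; values are placeholders. Note that the underlying request_restore_and_wait only accepts the Expedited retrieval tier and raises a ValueError for any other value:

    configs = {
        "function": "request_restore_to_destination_and_wait",
        "bucket": "archive-bucket",
        "source_object": "cold/sales/2019",
        "destination_bucket": "hot-bucket",
        "destination_object": "restored/sales",
        "restore_expiration": 3,
        "retrieval_tier": "Expedited",
        "dry_run": False,
    }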

class ArchiveFileManager:

Set of actions to restore archives.
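
All methods below are static, so they can also be called directly instead of going through S3FileManager configs. A sketch with placeholder bucket and prefix:

    from lakehouse_engine.core.s3_file_manager import ArchiveFileManager

    ArchiveFileManager.request_restore(
        source_bucket="archive-bucket",
        source_object="cold/sales/2019",
        restore_expiration=7,
        retrieval_tier="Standard",
        dry_run=True,  # only logs the paths that would be restored
    )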

@staticmethod
def check_restore_status(source_bucket: str, source_object: str) -> dict:

Check the restore status of archived data.

Arguments:
  • source_bucket: name of bucket to check the restore status.
  • source_object: object to check the restore status.
Returns:

A dict containing the number of objects in each status.
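
An illustrative call and the shape of the returned counters (bucket, prefix and the counts are placeholders):

    status = ArchiveFileManager.check_restore_status(
        source_bucket="archive-bucket", source_object="cold/sales/2019"
    )
    # e.g. {"total_objects": 10, "not_started_objects": 2,
    #       "ongoing_objects": 5, "restored_objects": 3}
    fully_restored = status["restored_objects"] == status["total_objects"]

Objects whose restore status cannot be determined (for example, objects that are not in an archive storage class) are logged as warnings and excluded from total_objects.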

@staticmethod
def request_restore(source_bucket: str, source_object: str, restore_expiration: int, retrieval_tier: str, dry_run: bool) -> None:

Request the restore of archived data.

Arguments:
  • source_bucket: name of bucket to perform the restore.
  • source_object: object to be restored.
  • restore_expiration: restore expiration in days.
  • retrieval_tier: type of restore, possible values are: Bulk, Standard or Expedited.
  • dry_run: if dry_run is set to True the function will print a dict with all the paths that would be restored based on the given keys.
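
The retrieval tier is validated per object against the RestoreType enum; an unsupported value raises RestoreTypeNotFoundException from lakehouse_engine.algorithms.exceptions. A sketch with placeholder names and a deliberately invalid tier:

    from lakehouse_engine.algorithms.exceptions import RestoreTypeNotFoundException

    try:
        ArchiveFileManager.request_restore(
            source_bucket="archive-bucket",
            source_object="cold/sales/2019",
            restore_expiration=7,
            retrieval_tier="Instant",  # hypothetical value, not a supported tier here
            dry_run=False,
        )
    except RestoreTypeNotFoundException as err:
        print(f"Unsupported retrieval tier: {err}")
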
@staticmethod
def request_restore_and_wait(source_bucket: str, source_object: str, restore_expiration: int, retrieval_tier: str, dry_run: bool) -> None:

Request and wait for the restore to complete, polling the restore status.

Arguments:
  • source_bucket: name of bucket to perform the restore.
  • source_object: object to be restored.
  • restore_expiration: restore expiration in days.
  • retrieval_tier: type of restore, possible values are: Bulk, Standard or Expedited.
  • dry_run: if dry_run is set to True the function will print a dict with all the paths that would be restored based on the given keys.
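
A sketch of a direct call; only the Expedited tier is accepted, and the call blocks while polling (names are placeholders):

    ArchiveFileManager.request_restore_and_wait(
        source_bucket="archive-bucket",
        source_object="cold/sales/2019",
        restore_expiration=3,
        retrieval_tier="Expedited",
        dry_run=False,
    )
    # After requesting the restore, the method sleeps 300 seconds before the
    # second status check and then polls every 30 seconds until
    # restored_objects reaches total_objects.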