lakehouse_engine.core.file_manager
Module for abstract representation of a file manager system.
1"""Module for abstract representation of a file manager system.""" 2 3from abc import ABC, abstractmethod 4from typing import Any 5 6from lakehouse_engine.algorithms.exceptions import RestoreTypeNotFoundException 7from lakehouse_engine.utils.storage.file_storage_functions import FileStorageFunctions 8 9 10class FileManager(ABC): # noqa: B024 11 """Abstract file manager class.""" 12 13 def __init__(self, configs: dict): 14 """Construct FileManager algorithm instances. 15 16 Args: 17 configs: configurations for the FileManager algorithm. 18 """ 19 self.configs = configs 20 self.function = self.configs["function"] 21 22 @abstractmethod 23 def delete_objects(self) -> None: 24 """Delete objects and 'directories'. 25 26 If dry_run is set to True the function will print a dict with all the 27 paths that would be deleted based on the given keys. 28 """ 29 pass 30 31 @abstractmethod 32 def copy_objects(self) -> None: 33 """Copies objects and 'directories'. 34 35 If dry_run is set to True the function will print a dict with all the 36 paths that would be copied based on the given keys. 37 """ 38 pass 39 40 @abstractmethod 41 def move_objects(self) -> None: 42 """Moves objects and 'directories'. 43 44 If dry_run is set to True the function will print a dict with all the 45 paths that would be moved based on the given keys. 46 """ 47 pass 48 49 50class FileManagerFactory(ABC): # noqa: B024 51 """Class for file manager factory.""" 52 53 @staticmethod 54 def execute_function(configs: dict) -> Any: 55 """Get a specific File Manager and function to execute.""" 56 from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager 57 from lakehouse_engine.core.s3_file_manager import S3FileManager 58 59 disable_dbfs_retry = ( 60 configs["disable_dbfs_retry"] 61 if "disable_dbfs_retry" in configs.keys() 62 else False 63 ) 64 65 if disable_dbfs_retry: 66 S3FileManager(configs).get_function() 67 elif FileStorageFunctions.is_boto3_configured(): 68 try: 69 S3FileManager(configs).get_function() 70 except (ValueError, NotImplementedError, RestoreTypeNotFoundException): 71 raise 72 except Exception: 73 DBFSFileManager(configs).get_function() 74 else: 75 DBFSFileManager(configs).get_function()
class
FileManager(abc.ABC):
11class FileManager(ABC): # noqa: B024 12 """Abstract file manager class.""" 13 14 def __init__(self, configs: dict): 15 """Construct FileManager algorithm instances. 16 17 Args: 18 configs: configurations for the FileManager algorithm. 19 """ 20 self.configs = configs 21 self.function = self.configs["function"] 22 23 @abstractmethod 24 def delete_objects(self) -> None: 25 """Delete objects and 'directories'. 26 27 If dry_run is set to True the function will print a dict with all the 28 paths that would be deleted based on the given keys. 29 """ 30 pass 31 32 @abstractmethod 33 def copy_objects(self) -> None: 34 """Copies objects and 'directories'. 35 36 If dry_run is set to True the function will print a dict with all the 37 paths that would be copied based on the given keys. 38 """ 39 pass 40 41 @abstractmethod 42 def move_objects(self) -> None: 43 """Moves objects and 'directories'. 44 45 If dry_run is set to True the function will print a dict with all the 46 paths that would be moved based on the given keys. 47 """ 48 pass
Abstract file manager class.
FileManager(configs: dict)
14 def __init__(self, configs: dict): 15 """Construct FileManager algorithm instances. 16 17 Args: 18 configs: configurations for the FileManager algorithm. 19 """ 20 self.configs = configs 21 self.function = self.configs["function"]
Construct FileManager algorithm instances.
Arguments:
- configs: configurations for the FileManager algorithm.
@abstractmethod
def
delete_objects(self) -> None:
23 @abstractmethod 24 def delete_objects(self) -> None: 25 """Delete objects and 'directories'. 26 27 If dry_run is set to True the function will print a dict with all the 28 paths that would be deleted based on the given keys. 29 """ 30 pass
Delete objects and 'directories'.
If dry_run is set to True the function will print a dict with all the paths that would be deleted based on the given keys.
@abstractmethod
def
copy_objects(self) -> None:
32 @abstractmethod 33 def copy_objects(self) -> None: 34 """Copies objects and 'directories'. 35 36 If dry_run is set to True the function will print a dict with all the 37 paths that would be copied based on the given keys. 38 """ 39 pass
Copies objects and 'directories'.
If dry_run is set to True the function will print a dict with all the paths that would be copied based on the given keys.
@abstractmethod
def
move_objects(self) -> None:
41 @abstractmethod 42 def move_objects(self) -> None: 43 """Moves objects and 'directories'. 44 45 If dry_run is set to True the function will print a dict with all the 46 paths that would be moved based on the given keys. 47 """ 48 pass
Moves objects and 'directories'.
If dry_run is set to True the function will print a dict with all the paths that would be moved based on the given keys.
class
FileManagerFactory(abc.ABC):
51class FileManagerFactory(ABC): # noqa: B024 52 """Class for file manager factory.""" 53 54 @staticmethod 55 def execute_function(configs: dict) -> Any: 56 """Get a specific File Manager and function to execute.""" 57 from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager 58 from lakehouse_engine.core.s3_file_manager import S3FileManager 59 60 disable_dbfs_retry = ( 61 configs["disable_dbfs_retry"] 62 if "disable_dbfs_retry" in configs.keys() 63 else False 64 ) 65 66 if disable_dbfs_retry: 67 S3FileManager(configs).get_function() 68 elif FileStorageFunctions.is_boto3_configured(): 69 try: 70 S3FileManager(configs).get_function() 71 except (ValueError, NotImplementedError, RestoreTypeNotFoundException): 72 raise 73 except Exception: 74 DBFSFileManager(configs).get_function() 75 else: 76 DBFSFileManager(configs).get_function()
Class for file manager factory.
@staticmethod
def
execute_function(configs: dict) -> Any:
54 @staticmethod 55 def execute_function(configs: dict) -> Any: 56 """Get a specific File Manager and function to execute.""" 57 from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager 58 from lakehouse_engine.core.s3_file_manager import S3FileManager 59 60 disable_dbfs_retry = ( 61 configs["disable_dbfs_retry"] 62 if "disable_dbfs_retry" in configs.keys() 63 else False 64 ) 65 66 if disable_dbfs_retry: 67 S3FileManager(configs).get_function() 68 elif FileStorageFunctions.is_boto3_configured(): 69 try: 70 S3FileManager(configs).get_function() 71 except (ValueError, NotImplementedError, RestoreTypeNotFoundException): 72 raise 73 except Exception: 74 DBFSFileManager(configs).get_function() 75 else: 76 DBFSFileManager(configs).get_function()
Get a specific File Manager and function to execute.