lakehouse_engine.core.file_manager

Module for abstract representation of a file manager system.

 1"""Module for abstract representation of a file manager system."""
 2
 3from abc import ABC, abstractmethod
 4from typing import Any
 5
 6from lakehouse_engine.algorithms.exceptions import RestoreTypeNotFoundException
 7from lakehouse_engine.utils.storage.file_storage_functions import FileStorageFunctions
 8
 9
class FileManager(ABC):  # noqa: B024
    """Base contract that every concrete file manager has to implement."""

    def __init__(self, configs: dict):
        """Keep the configurations and the name of the function to run.

        Args:
            configs: configurations for the FileManager algorithm. It must
                contain a "function" entry, otherwise a KeyError is raised.
        """
        self.configs = configs
        self.function = configs["function"]

    @abstractmethod
    def delete_objects(self) -> None:
        """Delete objects and 'directories'.

        When dry_run is set to True, the function is expected to print a dict
        with all the paths that would be deleted based on the given keys.
        """

    @abstractmethod
    def copy_objects(self) -> None:
        """Copy objects and 'directories'.

        When dry_run is set to True, the function is expected to print a dict
        with all the paths that would be copied based on the given keys.
        """

    @abstractmethod
    def move_objects(self) -> None:
        """Move objects and 'directories'.

        When dry_run is set to True, the function is expected to print a dict
        with all the paths that would be moved based on the given keys.
        """
48
49
class FileManagerFactory(ABC):  # noqa: B024
    """Factory that picks a file manager and runs the configured function."""

    @staticmethod
    def execute_function(configs: dict) -> Any:
        """Pick the file manager to use and execute the configured function.

        Selection rules:
            - If "disable_dbfs_retry" is truthy in configs, only the
              S3FileManager is used and any error it raises propagates.
            - Otherwise, if FileStorageFunctions.is_boto3_configured() is
              truthy, the S3FileManager is tried first. ValueError,
              NotImplementedError and RestoreTypeNotFoundException are
              re-raised as they are; any other exception is logged and the
              function is retried with the DBFSFileManager.
            - Otherwise the DBFSFileManager is used.

        Args:
            configs: configurations handed to the selected file manager. The
                optional "disable_dbfs_retry" entry defaults to False.

        Returns:
            None; the result of get_function() is not propagated.
        """
        import logging

        # NOTE(review): these imports are function-scoped, presumably to avoid
        # a circular import with this module - confirm before moving them.
        from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager
        from lakehouse_engine.core.s3_file_manager import S3FileManager

        if configs.get("disable_dbfs_retry", False):
            S3FileManager(configs).get_function()
            return

        if not FileStorageFunctions.is_boto3_configured():
            DBFSFileManager(configs).get_function()
            return

        try:
            S3FileManager(configs).get_function()
        except (ValueError, NotImplementedError, RestoreTypeNotFoundException):
            # These exception types are never retried: surface them unchanged.
            raise
        except Exception:
            # Best-effort fallback: keep a trace of the S3 failure so the
            # switch to DBFS can be diagnosed afterwards.
            logging.getLogger(__name__).warning(
                "S3 file manager failed; retrying with the DBFS file manager.",
                exc_info=True,
            )
            DBFSFileManager(configs).get_function()
class FileManager(abc.ABC):
11class FileManager(ABC):  # noqa: B024
12    """Abstract file manager class."""
13
14    def __init__(self, configs: dict):
15        """Construct FileManager algorithm instances.
16
17        Args:
18            configs: configurations for the FileManager algorithm.
19        """
20        self.configs = configs
21        self.function = self.configs["function"]
22
23    @abstractmethod
24    def delete_objects(self) -> None:
25        """Delete objects and 'directories'.
26
27        If dry_run is set to True the function will print a dict with all the
28        paths that would be deleted based on the given keys.
29        """
30        pass
31
32    @abstractmethod
33    def copy_objects(self) -> None:
34        """Copies objects and 'directories'.
35
36        If dry_run is set to True the function will print a dict with all the
37        paths that would be copied based on the given keys.
38        """
39        pass
40
41    @abstractmethod
42    def move_objects(self) -> None:
43        """Moves objects and 'directories'.
44
45        If dry_run is set to True the function will print a dict with all the
46        paths that would be moved based on the given keys.
47        """
48        pass

Abstract file manager class.

FileManager(configs: dict)
14    def __init__(self, configs: dict):
15        """Construct FileManager algorithm instances.
16
17        Args:
18            configs: configurations for the FileManager algorithm.
19        """
20        self.configs = configs
21        self.function = self.configs["function"]

Construct FileManager algorithm instances.

Arguments:
  • configs: configurations for the FileManager algorithm.
configs
function
@abstractmethod
def delete_objects(self) -> None:
23    @abstractmethod
24    def delete_objects(self) -> None:
25        """Delete objects and 'directories'.
26
27        If dry_run is set to True the function will print a dict with all the
28        paths that would be deleted based on the given keys.
29        """
30        pass

Delete objects and 'directories'.

If dry_run is set to True the function will print a dict with all the paths that would be deleted based on the given keys.

@abstractmethod
def copy_objects(self) -> None:
32    @abstractmethod
33    def copy_objects(self) -> None:
34        """Copies objects and 'directories'.
35
36        If dry_run is set to True the function will print a dict with all the
37        paths that would be copied based on the given keys.
38        """
39        pass

Copies objects and 'directories'.

If dry_run is set to True the function will print a dict with all the paths that would be copied based on the given keys.

@abstractmethod
def move_objects(self) -> None:
41    @abstractmethod
42    def move_objects(self) -> None:
43        """Moves objects and 'directories'.
44
45        If dry_run is set to True the function will print a dict with all the
46        paths that would be moved based on the given keys.
47        """
48        pass

Moves objects and 'directories'.

If dry_run is set to True the function will print a dict with all the paths that would be moved based on the given keys.

class FileManagerFactory(abc.ABC):
51class FileManagerFactory(ABC):  # noqa: B024
52    """Class for file manager factory."""
53
54    @staticmethod
55    def execute_function(configs: dict) -> Any:
56        """Get a specific File Manager and function to execute."""
57        from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager
58        from lakehouse_engine.core.s3_file_manager import S3FileManager
59
60        disable_dbfs_retry = (
61            configs["disable_dbfs_retry"]
62            if "disable_dbfs_retry" in configs.keys()
63            else False
64        )
65
66        if disable_dbfs_retry:
67            S3FileManager(configs).get_function()
68        elif FileStorageFunctions.is_boto3_configured():
69            try:
70                S3FileManager(configs).get_function()
71            except (ValueError, NotImplementedError, RestoreTypeNotFoundException):
72                raise
73            except Exception:
74                DBFSFileManager(configs).get_function()
75        else:
76            DBFSFileManager(configs).get_function()

Factory class that selects a file manager implementation (S3 or DBFS) and executes the configured function.

@staticmethod
def execute_function(configs: dict) -> Any:
54    @staticmethod
55    def execute_function(configs: dict) -> Any:
56        """Get a specific File Manager and function to execute."""
57        from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager
58        from lakehouse_engine.core.s3_file_manager import S3FileManager
59
60        disable_dbfs_retry = (
61            configs["disable_dbfs_retry"]
62            if "disable_dbfs_retry" in configs.keys()
63            else False
64        )
65
66        if disable_dbfs_retry:
67            S3FileManager(configs).get_function()
68        elif FileStorageFunctions.is_boto3_configured():
69            try:
70                S3FileManager(configs).get_function()
71            except (ValueError, NotImplementedError, RestoreTypeNotFoundException):
72                raise
73            except Exception:
74                DBFSFileManager(configs).get_function()
75        else:
76            DBFSFileManager(configs).get_function()

Select a specific File Manager (S3 or DBFS) and execute the configured function.