lakehouse_engine.core.dbfs_file_manager

File manager module using dbfs.

  1"""File manager module using dbfs."""
  2
  3from lakehouse_engine.core.file_manager import FileManager
  4from lakehouse_engine.utils.databricks_utils import DatabricksUtils
  5from lakehouse_engine.utils.logging_handler import LoggingHandler
  6
  7
  8def _dry_run(bucket: str, object_paths: list) -> dict:
  9    """Build the dry run request return format.
 10
 11    Args:
 12        bucket: name of bucket to perform operation.
 13        object_paths: paths of object to list.
 14
 15    Returns:
 16        A dict with a list of objects that would be copied/deleted.
 17    """
 18    response = {}
 19
 20    for path in object_paths:
 21        path = _get_path(bucket, path)
 22
 23        object_list: list = []
 24        object_list = _list_objects(path, object_list)
 25
 26        if object_list:
 27            response[path] = object_list
 28        else:
 29            response[path] = ["No such key"]
 30
 31    return response
 32
 33
 34def _list_objects(path: str, objects_list: list) -> list:
 35    """List all the objects in a path.
 36
 37    Args:
 38        path: path to be used to perform the list.
 39        objects_list: A list of object names, empty by default.
 40
 41    Returns:
 42         A list of object names.
 43    """
 44    from lakehouse_engine.core.exec_env import ExecEnv
 45
 46    ls_objects_list = DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.ls(path)
 47
 48    for file_or_directory in ls_objects_list:
 49        if file_or_directory.isDir():
 50            _list_objects(file_or_directory.path, objects_list)
 51        else:
 52            objects_list.append(file_or_directory.path)
 53    return objects_list
 54
 55
 56def _get_path(bucket: str, path: str) -> str:
 57    """Get complete path.
 58
 59    For s3 path, the bucket (e.g. bucket-example) and path
 60    (e.g. folder1/folder2) will be filled with part of the path.
 61    For dbfs path, the path will have the complete path
 62    (dbfs:/example) and bucket as null.
 63
 64    Args:
 65        bucket: bucket for s3 objects.
 66        path: path to access the directory of file.
 67
 68    Returns:
 69         The complete path with or without bucket.
 70    """
 71    if bucket.strip():
 72        path = f"s3://{bucket}/{path}".strip()
 73    else:
 74        path = path.strip()
 75
 76    return path
 77
 78
 79class DBFSFileManager(FileManager):
 80    """Set of actions to manipulate dbfs files in several ways."""
 81
 82    _logger = LoggingHandler(__name__).get_logger()
 83
 84    def get_function(self) -> None:
 85        """Get a specific function to execute."""
 86        available_functions = {
 87            "delete_objects": self.delete_objects,
 88            "copy_objects": self.copy_objects,
 89            "move_objects": self.move_objects,
 90        }
 91
 92        self._logger.info("Function being executed: {}".format(self.function))
 93        if self.function in available_functions.keys():
 94            func = available_functions[self.function]
 95            func()
 96        else:
 97            raise NotImplementedError(
 98                f"The requested function {self.function} is not implemented."
 99            )
100
101    @staticmethod
102    def _delete_objects(bucket: str, objects_paths: list) -> None:
103        """Delete objects recursively.
104
105        Params:
106            bucket: name of bucket to perform the delete operation.
107            objects_paths: objects to be deleted.
108        """
109        from lakehouse_engine.core.exec_env import ExecEnv
110
111        for path in objects_paths:
112            path = _get_path(bucket, path)
113
114            DBFSFileManager._logger.info(f"Deleting: {path}")
115
116            try:
117                delete_operation = DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.rm(
118                    path, True
119                )
120
121                if delete_operation:
122                    DBFSFileManager._logger.info(f"Deleted: {path}")
123                else:
124                    DBFSFileManager._logger.info(f"Not able to delete: {path}")
125            except Exception as e:
126                DBFSFileManager._logger.error(f"Error deleting {path} - {e}")
127                raise e
128
129    def delete_objects(self) -> None:
130        """Delete objects and 'directories'.
131
132        If dry_run is set to True the function will print a dict with all the
133        paths that would be deleted based on the given keys.
134        """
135        bucket = self.configs["bucket"]
136        objects_paths = self.configs["object_paths"]
137        dry_run = self.configs["dry_run"]
138
139        if dry_run:
140            response = _dry_run(bucket=bucket, object_paths=objects_paths)
141
142            self._logger.info("Paths that would be deleted:")
143            self._logger.info(response)
144        else:
145            self._delete_objects(bucket, objects_paths)
146
147    def copy_objects(self) -> None:
148        """Copies objects and 'directories'.
149
150        If dry_run is set to True the function will print a dict with all the
151        paths that would be copied based on the given keys.
152        """
153        source_bucket = self.configs["bucket"]
154        source_object = self.configs["source_object"]
155        destination_bucket = self.configs["destination_bucket"]
156        destination_object = self.configs["destination_object"]
157        dry_run = self.configs["dry_run"]
158
159        if dry_run:
160            response = _dry_run(bucket=source_bucket, object_paths=[source_object])
161
162            self._logger.info("Paths that would be copied:")
163            self._logger.info(response)
164        else:
165            self._copy_objects(
166                source_bucket=source_bucket,
167                source_object=source_object,
168                destination_bucket=destination_bucket,
169                destination_object=destination_object,
170            )
171
172    @staticmethod
173    def _copy_objects(
174        source_bucket: str,
175        source_object: str,
176        destination_bucket: str,
177        destination_object: str,
178    ) -> None:
179        """Copies objects and 'directories'.
180
181        Args:
182            source_bucket: name of bucket to perform the copy.
183            source_object: object/folder to be copied.
184            destination_bucket: name of the target bucket to copy.
185            destination_object: target object/folder to copy.
186        """
187        from lakehouse_engine.core.exec_env import ExecEnv
188
189        copy_from = _get_path(source_bucket, source_object)
190        copy_to = _get_path(destination_bucket, destination_object)
191
192        DBFSFileManager._logger.info(f"Copying: {copy_from} to {copy_to}")
193
194        try:
195            DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.cp(
196                copy_from, copy_to, True
197            )
198
199            DBFSFileManager._logger.info(f"Copied: {copy_from} to {copy_to}")
200        except Exception as e:
201            DBFSFileManager._logger.error(
202                f"Error copying file {copy_from} to {copy_to} - {e}"
203            )
204            raise e
205
206    def move_objects(self) -> None:
207        """Moves objects and 'directories'.
208
209        If dry_run is set to True the function will print a dict with all the
210        paths that would be moved based on the given keys.
211        """
212        source_bucket = self.configs["bucket"]
213        source_object = self.configs["source_object"]
214        destination_bucket = self.configs["destination_bucket"]
215        destination_object = self.configs["destination_object"]
216        dry_run = self.configs["dry_run"]
217
218        if dry_run:
219            response = _dry_run(bucket=source_bucket, object_paths=[source_object])
220
221            self._logger.info("Paths that would be moved:")
222            self._logger.info(response)
223        else:
224            self._move_objects(
225                source_bucket=source_bucket,
226                source_object=source_object,
227                destination_bucket=destination_bucket,
228                destination_object=destination_object,
229            )
230
231    @staticmethod
232    def _move_objects(
233        source_bucket: str,
234        source_object: str,
235        destination_bucket: str,
236        destination_object: str,
237    ) -> None:
238        """Moves objects and 'directories'.
239
240        Args:
241            source_bucket: name of bucket to perform the move.
242            source_object: object/folder to be moved.
243            destination_bucket: name of the target bucket to move.
244            destination_object: target object/folder to move.
245        """
246        from lakehouse_engine.core.exec_env import ExecEnv
247
248        move_from = _get_path(source_bucket, source_object)
249        move_to = _get_path(destination_bucket, destination_object)
250
251        DBFSFileManager._logger.info(f"Moving: {move_from} to {move_to}")
252
253        try:
254            DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.mv(
255                move_from, move_to, True
256            )
257
258            DBFSFileManager._logger.info(f"Moved: {move_from} to {move_to}")
259        except Exception as e:
260            DBFSFileManager._logger.error(
261                f"Error moving file {move_from} to {move_to} - {e}"
262            )
263            raise e
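
Since _get_path is pure Python, its behavior can be checked without a Databricks session. A minimal sketch of the two path modes it supports (the example values are illustrative):

from lakehouse_engine.core.dbfs_file_manager import _get_path

# With a non-empty bucket, the helper builds a full s3 URI.
assert _get_path("bucket-example", "folder1/folder2") == "s3://bucket-example/folder1/folder2"

# With an empty bucket, the path is taken as the complete location
# (e.g. a dbfs path) and is only stripped of surrounding whitespace.
assert _get_path("", " dbfs:/example ") == "dbfs:/example"
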
class DBFSFileManager(lakehouse_engine.core.file_manager.FileManager):

Set of actions to manipulate dbfs files in several ways.

def get_function(self) -> None:

Get a specific function to execute.
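
The dispatch is driven by self.function and self.configs, which are assumed here to come from the acon dict handled by the FileManager base class; the exact constructor shape is an assumption made for illustration. Note that even a dry run lists objects through dbutils, so an initialized Databricks session behind ExecEnv.SESSION is required. A sketch under those assumptions:

from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager

# Hypothetical acon: "function" selects the operation; the remaining keys
# are the ones delete_objects actually reads from self.configs.
acon = {
    "function": "delete_objects",
    "bucket": "",  # empty bucket -> paths are used as complete dbfs paths
    "object_paths": ["dbfs:/tmp/old_data"],
    "dry_run": True,
}

manager = DBFSFileManager(configs=acon)  # constructor shape is an assumption
manager.get_function()  # looks up "delete_objects" and calls it
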

def delete_objects(self) -> None:

Delete objects and 'directories'.

If dry_run is set to True, the function will log a dict with all the paths that would be deleted based on the given keys.
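
For illustration, the dict built by _dry_run maps each resolved path to the objects found beneath it, with missing paths mapped to ["No such key"]. A sketch of the shape that gets logged (the values are hypothetical):

expected_response = {
    "s3://bucket-example/folder1": [
        "s3://bucket-example/folder1/part-0000.parquet",
        "s3://bucket-example/folder1/part-0001.parquet",
    ],
    "s3://bucket-example/missing-folder": ["No such key"],
}
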

def copy_objects(self) -> None:

Copy objects and 'directories'.

If dry_run is set to True, the function will log a dict with all the paths that would be copied based on the given keys.
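
A sketch of an acon for a recursive copy between s3 prefixes; the keys are exactly the ones copy_objects reads from self.configs, while the constructor call is an assumption about the FileManager base class:

from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager

acon = {
    "function": "copy_objects",
    "bucket": "source-bucket",  # source s3 bucket
    "source_object": "folder1/data",  # object/folder to copy
    "destination_bucket": "target-bucket",
    "destination_object": "folder1/data_backup",
    "dry_run": False,
}
DBFSFileManager(configs=acon).get_function()
# Resolves to a recursive dbutils.fs.cp from
# s3://source-bucket/folder1/data to s3://target-bucket/folder1/data_backup.
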

def move_objects(self) -> None:

Move objects and 'directories'.

If dry_run is set to True, the function will log a dict with all the paths that would be moved based on the given keys.
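
A sketch of an acon for a move between complete dbfs paths (empty buckets mean the object keys already hold the full paths); again, the constructor call is an assumption about the FileManager base class:

from lakehouse_engine.core.dbfs_file_manager import DBFSFileManager

acon = {
    "function": "move_objects",
    "bucket": "",  # empty: source_object is a complete path
    "source_object": "dbfs:/staging/run_1",
    "destination_bucket": "",
    "destination_object": "dbfs:/archive/run_1",
    "dry_run": False,
}
DBFSFileManager(configs=acon).get_function()
# Resolves to a recursive dbutils.fs.mv from
# dbfs:/staging/run_1 to dbfs:/archive/run_1.
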