lakehouse_engine.core.dbfs_file_manager
File manager module using dbfs.
1"""File manager module using dbfs.""" 2 3from lakehouse_engine.core.file_manager import FileManager 4from lakehouse_engine.utils.databricks_utils import DatabricksUtils 5from lakehouse_engine.utils.logging_handler import LoggingHandler 6 7 8def _dry_run(bucket: str, object_paths: list) -> dict: 9 """Build the dry run request return format. 10 11 Args: 12 bucket: name of bucket to perform operation. 13 object_paths: paths of object to list. 14 15 Returns: 16 A dict with a list of objects that would be copied/deleted. 17 """ 18 response = {} 19 20 for path in object_paths: 21 path = _get_path(bucket, path) 22 23 object_list: list = [] 24 object_list = _list_objects(path, object_list) 25 26 if object_list: 27 response[path] = object_list 28 else: 29 response[path] = ["No such key"] 30 31 return response 32 33 34def _list_objects(path: str, objects_list: list) -> list: 35 """List all the objects in a path. 36 37 Args: 38 path: path to be used to perform the list. 39 objects_list: A list of object names, empty by default. 40 41 Returns: 42 A list of object names. 43 """ 44 from lakehouse_engine.core.exec_env import ExecEnv 45 46 ls_objects_list = DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.ls(path) 47 48 for file_or_directory in ls_objects_list: 49 if file_or_directory.isDir(): 50 _list_objects(file_or_directory.path, objects_list) 51 else: 52 objects_list.append(file_or_directory.path) 53 return objects_list 54 55 56def _get_path(bucket: str, path: str) -> str: 57 """Get complete path. 58 59 For s3 path, the bucket (e.g. bucket-example) and path 60 (e.g. folder1/folder2) will be filled with part of the path. 61 For dbfs path, the path will have the complete path 62 (dbfs:/example) and bucket as null. 63 64 Args: 65 bucket: bucket for s3 objects. 66 path: path to access the directory of file. 67 68 Returns: 69 The complete path with or without bucket. 70 """ 71 if bucket.strip(): 72 path = f"s3://{bucket}/{path}".strip() 73 else: 74 path = path.strip() 75 76 return path 77 78 79class DBFSFileManager(FileManager): 80 """Set of actions to manipulate dbfs files in several ways.""" 81 82 _logger = LoggingHandler(__name__).get_logger() 83 84 def get_function(self) -> None: 85 """Get a specific function to execute.""" 86 available_functions = { 87 "delete_objects": self.delete_objects, 88 "copy_objects": self.copy_objects, 89 "move_objects": self.move_objects, 90 } 91 92 self._logger.info("Function being executed: {}".format(self.function)) 93 if self.function in available_functions.keys(): 94 func = available_functions[self.function] 95 func() 96 else: 97 raise NotImplementedError( 98 f"The requested function {self.function} is not implemented." 99 ) 100 101 @staticmethod 102 def _delete_objects(bucket: str, objects_paths: list) -> None: 103 """Delete objects recursively. 104 105 Params: 106 bucket: name of bucket to perform the delete operation. 107 objects_paths: objects to be deleted. 
108 """ 109 from lakehouse_engine.core.exec_env import ExecEnv 110 111 for path in objects_paths: 112 path = _get_path(bucket, path) 113 114 DBFSFileManager._logger.info(f"Deleting: {path}") 115 116 try: 117 delete_operation = DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.rm( 118 path, True 119 ) 120 121 if delete_operation: 122 DBFSFileManager._logger.info(f"Deleted: {path}") 123 else: 124 DBFSFileManager._logger.info(f"Not able to delete: {path}") 125 except Exception as e: 126 DBFSFileManager._logger.error(f"Error deleting {path} - {e}") 127 raise e 128 129 def delete_objects(self) -> None: 130 """Delete objects and 'directories'. 131 132 If dry_run is set to True the function will print a dict with all the 133 paths that would be deleted based on the given keys. 134 """ 135 bucket = self.configs["bucket"] 136 objects_paths = self.configs["object_paths"] 137 dry_run = self.configs["dry_run"] 138 139 if dry_run: 140 response = _dry_run(bucket=bucket, object_paths=objects_paths) 141 142 self._logger.info("Paths that would be deleted:") 143 self._logger.info(response) 144 else: 145 self._delete_objects(bucket, objects_paths) 146 147 def copy_objects(self) -> None: 148 """Copies objects and 'directories'. 149 150 If dry_run is set to True the function will print a dict with all the 151 paths that would be copied based on the given keys. 152 """ 153 source_bucket = self.configs["bucket"] 154 source_object = self.configs["source_object"] 155 destination_bucket = self.configs["destination_bucket"] 156 destination_object = self.configs["destination_object"] 157 dry_run = self.configs["dry_run"] 158 159 if dry_run: 160 response = _dry_run(bucket=source_bucket, object_paths=[source_object]) 161 162 self._logger.info("Paths that would be copied:") 163 self._logger.info(response) 164 else: 165 self._copy_objects( 166 source_bucket=source_bucket, 167 source_object=source_object, 168 destination_bucket=destination_bucket, 169 destination_object=destination_object, 170 ) 171 172 @staticmethod 173 def _copy_objects( 174 source_bucket: str, 175 source_object: str, 176 destination_bucket: str, 177 destination_object: str, 178 ) -> None: 179 """Copies objects and 'directories'. 180 181 Args: 182 source_bucket: name of bucket to perform the copy. 183 source_object: object/folder to be copied. 184 destination_bucket: name of the target bucket to copy. 185 destination_object: target object/folder to copy. 186 """ 187 from lakehouse_engine.core.exec_env import ExecEnv 188 189 copy_from = _get_path(source_bucket, source_object) 190 copy_to = _get_path(destination_bucket, destination_object) 191 192 DBFSFileManager._logger.info(f"Copying: {copy_from} to {copy_to}") 193 194 try: 195 DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.cp( 196 copy_from, copy_to, True 197 ) 198 199 DBFSFileManager._logger.info(f"Copied: {copy_from} to {copy_to}") 200 except Exception as e: 201 DBFSFileManager._logger.error( 202 f"Error copying file {copy_from} to {copy_to} - {e}" 203 ) 204 raise e 205 206 def move_objects(self) -> None: 207 """Moves objects and 'directories'. 208 209 If dry_run is set to True the function will print a dict with all the 210 paths that would be moved based on the given keys. 
211 """ 212 source_bucket = self.configs["bucket"] 213 source_object = self.configs["source_object"] 214 destination_bucket = self.configs["destination_bucket"] 215 destination_object = self.configs["destination_object"] 216 dry_run = self.configs["dry_run"] 217 218 if dry_run: 219 response = _dry_run(bucket=source_bucket, object_paths=[source_object]) 220 221 self._logger.info("Paths that would be moved:") 222 self._logger.info(response) 223 else: 224 self._move_objects( 225 source_bucket=source_bucket, 226 source_object=source_object, 227 destination_bucket=destination_bucket, 228 destination_object=destination_object, 229 ) 230 231 @staticmethod 232 def _move_objects( 233 source_bucket: str, 234 source_object: str, 235 destination_bucket: str, 236 destination_object: str, 237 ) -> None: 238 """Moves objects and 'directories'. 239 240 Args: 241 source_bucket: name of bucket to perform the move. 242 source_object: object/folder to be moved. 243 destination_bucket: name of the target bucket to move. 244 destination_object: target object/folder to move. 245 """ 246 from lakehouse_engine.core.exec_env import ExecEnv 247 248 move_from = _get_path(source_bucket, source_object) 249 move_to = _get_path(destination_bucket, destination_object) 250 251 DBFSFileManager._logger.info(f"Moving: {move_from} to {move_to}") 252 253 try: 254 DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.mv( 255 move_from, move_to, True 256 ) 257 258 DBFSFileManager._logger.info(f"Moved: {move_from} to {move_to}") 259 except Exception as e: 260 DBFSFileManager._logger.error( 261 f"Error moving file {move_from} to {move_to} - {e}" 262 ) 263 raise e
class DBFSFileManager(FileManager):
Set of actions to manipulate dbfs files in several ways.
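A minimal usage sketch. The construction below assumes the FileManager base class is built from a configs dict whose "function" key ends up as self.function; that constructor is defined outside this module, so its exact signature is an assumption here:

# Hypothetical configs; the keys are the ones read by the methods below.
configs = {
    "function": "delete_objects",
    "bucket": "my-bucket",
    "object_paths": ["tmp/old_exports/"],
    "dry_run": True,
}
manager = DBFSFileManager(configs)  # constructor inherited from FileManager (assumed)
manager.get_function()              # dispatches to delete_objects()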
def get_function(self) -> None:
Get a specific function to execute.
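Only the three names in available_functions are recognized; any other value fails fast. A sketch of that path, under the same constructor assumption as above:

# Hypothetical: an unsupported "function" value raises immediately.
DBFSFileManager({"function": "rename_objects"}).get_function()
# NotImplementedError: The requested function rename_objects is not implemented.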
def delete_objects(self) -> None:
Delete objects and 'directories'.

If dry_run is set to True, the function logs a dict with all the paths that would be deleted based on the given keys.
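With dry_run set, the logged response maps each resolved path to the objects beneath it. A sketch of the shape, taken from _dry_run above with made-up bucket and paths:

# Illustrative dry-run response for object_paths=["raw/sales/", "raw/missing/"].
{
    "s3://my-bucket/raw/sales/": [
        "s3://my-bucket/raw/sales/part-000.parquet",
        "s3://my-bucket/raw/sales/part-001.parquet",
    ],
    "s3://my-bucket/raw/missing/": ["No such key"],
}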
def copy_objects(self) -> None:
Copy objects and 'directories'.

If dry_run is set to True, the function logs a dict with all the paths that would be copied based on the given keys.
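For reference, the keys this method reads from configs, as a hedged example (key names come from the source above; every value is made-up):

# Illustrative configs for a recursive copy between buckets.
configs = {
    "function": "copy_objects",
    "bucket": "source-bucket",                 # source bucket
    "source_object": "landing/customers/",
    "destination_bucket": "target-bucket",
    "destination_object": "archive/customers/",
    "dry_run": False,                          # set True to only log the paths
}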
def move_objects(self) -> None:
Move objects and 'directories'.

If dry_run is set to True, the function logs a dict with all the paths that would be moved based on the given keys.
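Putting it together, a hedged end-to-end sketch: dry-run a move of a dbfs path first (an empty bucket means the path is used as-is, per _get_path), then perform it. The DBFSFileManager construction is the same assumption as above:

# Hypothetical end-to-end usage; paths and the constructor call are assumptions.
configs = {
    "function": "move_objects",
    "bucket": "",                                        # empty -> dbfs path used as-is
    "source_object": "dbfs:/tmp/staging/report.csv",
    "destination_bucket": "",
    "destination_object": "dbfs:/mnt/reports/report.csv",
    "dry_run": True,
}
DBFSFileManager(configs).get_function()  # logs the paths that would be moved

configs["dry_run"] = False
DBFSFileManager(configs).get_function()  # performs the recursive move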