lakehouse_engine.utils.storage.dbfs_storage
Module to represent a DBFS file storage system.
"""Module to represent a DBFS file storage system."""

from typing import Any
from urllib.parse import ParseResult, urlunparse

from lakehouse_engine.utils.databricks_utils import DatabricksUtils
from lakehouse_engine.utils.logging_handler import LoggingHandler
from lakehouse_engine.utils.storage.file_storage import FileStorage


class DBFSStorage(FileStorage):
    """Class to represent a DBFS file storage system."""

    _LOGGER = LoggingHandler(__name__).get_logger()
    # Largest 32-bit signed int, passed to dbutils.fs.head as the byte limit
    # so that the entire file content is returned.
    _MAX_INT = 2147483647

    @classmethod
    def get_file_payload(cls, url: ParseResult) -> Any:
        """Get the content of a file.

        Args:
            url: url of the file.

        Returns:
            File payload/content.
        """
        # Deferred import — presumably avoids a circular dependency with
        # lakehouse_engine.core; confirm before moving to module level.
        from lakehouse_engine.core.exec_env import ExecEnv

        file_url = urlunparse(url)
        cls._LOGGER.info(f"Trying with dbfs_storage: Reading from file: {file_url}")
        db_utils = DatabricksUtils.get_db_utils(ExecEnv.SESSION)
        return db_utils.fs.head(file_url, cls._MAX_INT)

    @classmethod
    def write_payload_to_file(cls, url: ParseResult, content: str) -> None:
        """Write payload into a file.

        Args:
            url: url of the file.
            content: content to write into the file.
        """
        # Deferred import — presumably avoids a circular dependency with
        # lakehouse_engine.core; confirm before moving to module level.
        from lakehouse_engine.core.exec_env import ExecEnv

        file_url = urlunparse(url)
        cls._LOGGER.info(f"Trying with dbfs_storage: Writing into file: {file_url}")
        db_utils = DatabricksUtils.get_db_utils(ExecEnv.SESSION)
        # Third argument True => overwrite an existing file at the target path.
        db_utils.fs.put(file_url, content, True)
Class to represent a DBFS file storage system.
@classmethod
def
get_file_payload(cls, url: urllib.parse.ParseResult) -> Any:
@classmethod
def get_file_payload(cls, url: ParseResult) -> Any:
    """Get the content of a file.

    Args:
        url: url of the file.

    Returns:
        File payload/content.
    """
    # Deferred import — presumably avoids a circular dependency; confirm.
    from lakehouse_engine.core.exec_env import ExecEnv

    target = urlunparse(url)
    cls._LOGGER.info(f"Trying with dbfs_storage: Reading from file: {target}")
    dbutils = DatabricksUtils.get_db_utils(ExecEnv.SESSION)
    return dbutils.fs.head(target, cls._MAX_INT)
Get the content of a file.
Arguments:
- url: url of the file.
Returns:
File payload/content.
@classmethod
def
write_payload_to_file(cls, url: urllib.parse.ParseResult, content: str) -> None:
@classmethod
def write_payload_to_file(cls, url: ParseResult, content: str) -> None:
    """Write payload into a file.

    Args:
        url: url of the file.
        content: content to write into the file.
    """
    # Deferred import — presumably avoids a circular dependency; confirm.
    from lakehouse_engine.core.exec_env import ExecEnv

    target = urlunparse(url)
    cls._LOGGER.info(f"Trying with dbfs_storage: Writing into file: {target}")
    dbutils = DatabricksUtils.get_db_utils(ExecEnv.SESSION)
    # Third argument True => overwrite an existing file at the target path.
    dbutils.fs.put(target, content, True)
Write payload into a file.
Arguments:
- url: url of the file.
- content: content to write into the file.