lakehouse_engine.utils.storage.dbfs_storage

Module to represent a DBFS file storage system.

 1"""Module to represent a DBFS file storage system."""
 2
 3from typing import Any
 4from urllib.parse import ParseResult, urlunparse
 5
 6from lakehouse_engine.utils.databricks_utils import DatabricksUtils
 7from lakehouse_engine.utils.logging_handler import LoggingHandler
 8from lakehouse_engine.utils.storage.file_storage import FileStorage
 9
10
class DBFSStorage(FileStorage):
    """File storage implementation that reads and writes through DBFS.

    All file access goes through the `fs` helper of the Databricks utilities
    object obtained from `DatabricksUtils.get_db_utils`.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()
    # Passed to `fs.head` as its second argument when reading a file.
    _MAX_INT = 2147483647

    @classmethod
    def get_file_payload(cls, url: ParseResult) -> Any:
        """Read a file through `fs.head` and return the result.

        Args:
            url: parsed url of the file to read.

        Returns:
            Whatever `fs.head` returns for the url when called with
            `_MAX_INT` as its second argument.
        """
        # Imported here rather than at module level, as in the original code.
        from lakehouse_engine.core.exec_env import ExecEnv

        file_url = urlunparse(url)
        cls._LOGGER.info(f"Trying with dbfs_storage: Reading from file: {file_url}")
        dbfs = DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs
        return dbfs.head(file_url, cls._MAX_INT)

    @classmethod
    def write_payload_to_file(cls, url: ParseResult, content: str) -> None:
        """Write a string payload to a file through `fs.put`.

        Args:
            url: parsed url of the file to write.
            content: text to store in the file.
        """
        # Imported here rather than at module level, as in the original code.
        from lakehouse_engine.core.exec_env import ExecEnv

        file_url = urlunparse(url)
        cls._LOGGER.info(f"Trying with dbfs_storage: Writing into file: {file_url}")
        dbfs = DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs
        # Third positional argument of `fs.put` is passed as True
        # (the overwrite flag, per the dbutils reference).
        dbfs.put(file_url, content, True)
class DBFSStorage(FileStorage):
    """File storage implementation that reads and writes through DBFS.

    All file access goes through the `fs` helper of the Databricks utilities
    object obtained from `DatabricksUtils.get_db_utils`.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()
    # Passed to `fs.head` as its second argument when reading a file.
    _MAX_INT = 2147483647

    @classmethod
    def get_file_payload(cls, url: ParseResult) -> Any:
        """Read a file through `fs.head` and return the result.

        Args:
            url: parsed url of the file to read.

        Returns:
            Whatever `fs.head` returns for the url when called with
            `_MAX_INT` as its second argument.
        """
        # Imported here rather than at module level, as in the original code.
        from lakehouse_engine.core.exec_env import ExecEnv

        file_url = urlunparse(url)
        cls._LOGGER.info(f"Trying with dbfs_storage: Reading from file: {file_url}")
        dbfs = DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs
        return dbfs.head(file_url, cls._MAX_INT)

    @classmethod
    def write_payload_to_file(cls, url: ParseResult, content: str) -> None:
        """Write a string payload to a file through `fs.put`.

        Args:
            url: parsed url of the file to write.
            content: text to store in the file.
        """
        # Imported here rather than at module level, as in the original code.
        from lakehouse_engine.core.exec_env import ExecEnv

        file_url = urlunparse(url)
        cls._LOGGER.info(f"Trying with dbfs_storage: Writing into file: {file_url}")
        dbfs = DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs
        # Third positional argument of `fs.put` is passed as True
        # (the overwrite flag, per the dbutils reference).
        dbfs.put(file_url, content, True)

Class to represent a DBFS file storage system.

@classmethod
def get_file_payload(cls, url: urllib.parse.ParseResult) -> Any:
18    @classmethod
19    def get_file_payload(cls, url: ParseResult) -> Any:
20        """Get the content of a file.
21
22        Args:
23            url: url of the file.
24
25        Returns:
26            File payload/content.
27        """
28        from lakehouse_engine.core.exec_env import ExecEnv
29
30        str_url = urlunparse(url)
31        cls._LOGGER.info(f"Trying with dbfs_storage: Reading from file: {str_url}")
32        return DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.head(
33            str_url, cls._MAX_INT
34        )

Get the content of a file, reading it through `dbutils.fs.head` with `_MAX_INT` (2147483647) passed as the byte limit.

Arguments:
  • url: url of the file.
Returns:
  • File payload/content.

@classmethod
def write_payload_to_file(cls, url: urllib.parse.ParseResult, content: str) -> None:
36    @classmethod
37    def write_payload_to_file(cls, url: ParseResult, content: str) -> None:
38        """Write payload into a file.
39
40        Args:
41            url: url of the file.
42            content: content to write into the file.
43        """
44        from lakehouse_engine.core.exec_env import ExecEnv
45
46        str_url = urlunparse(url)
47        cls._LOGGER.info(f"Trying with dbfs_storage: Writing into file: {str_url}")
48        DatabricksUtils.get_db_utils(ExecEnv.SESSION).fs.put(str_url, content, True)

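Write payload into a file through `dbutils.fs.put`, with the overwrite flag set to `True` so that an existing file at the url is replaced.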

Arguments:
  • url: url of the file.
  • content: content to write into the file.