lakehouse_engine.utils.storage.file_storage_functions

Module for common file storage functions.

  1"""Module for common file storage functions."""
  2
  3import json
  4from abc import ABC
  5from typing import Any
  6from urllib.parse import ParseResult, urlparse
  7
  8import boto3
  9
 10from lakehouse_engine.utils.storage.dbfs_storage import DBFSStorage
 11from lakehouse_engine.utils.storage.local_fs_storage import LocalFSStorage
 12from lakehouse_engine.utils.storage.s3_storage import S3Storage
 13
 14
 15class FileStorageFunctions(ABC):  # noqa: B024
 16    """Class for common file storage functions."""
 17
 18    @classmethod
 19    def read_json(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
 20        """Read a json file.
 21
 22        The file should be in a supported file system (e.g., s3, dbfs or
 23        local filesystem).
 24
 25        Args:
 26            path: path to the json file.
 27            disable_dbfs_retry: optional flag to disable file storage dbfs.
 28
 29        Returns:
 30            Dict with json file content.
 31        """
 32        url = urlparse(path, allow_fragments=False)
 33        if disable_dbfs_retry:
 34            return json.load(S3Storage.get_file_payload(url))
 35        elif url.scheme == "s3" and cls.is_boto3_configured():
 36            try:
 37                return json.load(S3Storage.get_file_payload(url))
 38            except Exception:
 39                return json.loads(DBFSStorage.get_file_payload(url))
 40        elif url.scheme == "file":
 41            return json.load(LocalFSStorage.get_file_payload(url))
 42        elif url.scheme in ["dbfs", "s3"]:
 43            return json.loads(DBFSStorage.get_file_payload(url))
 44        else:
 45            raise NotImplementedError(
 46                f"File storage protocol not implemented for {path}."
 47            )
 48
 49    @classmethod
 50    def read_sql(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
 51        """Read a sql file.
 52
 53        The file should be in a supported file system (e.g., s3, dbfs or local
 54        filesystem).
 55
 56        Args:
 57            path: path to the sql file.
 58            disable_dbfs_retry: optional flag to disable file storage dbfs.
 59
 60        Returns:
 61            Content of the SQL file.
 62        """
 63        url = urlparse(path, allow_fragments=False)
 64        if disable_dbfs_retry:
 65            return S3Storage.get_file_payload(url).read().decode("utf-8")
 66        elif url.scheme == "s3" and cls.is_boto3_configured():
 67            try:
 68                return S3Storage.get_file_payload(url).read().decode("utf-8")
 69            except Exception:
 70                return DBFSStorage.get_file_payload(url)
 71        elif url.scheme == "file":
 72            return LocalFSStorage.get_file_payload(url).read()
 73        elif url.scheme in ["dbfs", "s3"]:
 74            return DBFSStorage.get_file_payload(url)
 75        else:
 76            raise NotImplementedError(
 77                f"Object storage protocol not implemented for {path}."
 78            )
 79
 80    @classmethod
 81    def write_payload(
 82        cls, path: str, url: ParseResult, content: str, disable_dbfs_retry: bool = False
 83    ) -> None:
 84        """Write payload into a file.
 85
 86        The file should be in a supported file system (e.g., s3, dbfs or local
 87        filesystem).
 88
 89        Args:
 90            path: path to validate the file type.
 91            url: url of the file.
 92            content: content to write into the file.
 93            disable_dbfs_retry: optional flag to disable file storage dbfs.
 94        """
 95        if disable_dbfs_retry:
 96            S3Storage.write_payload_to_file(url, content)
 97        elif path.startswith("s3://") and cls.is_boto3_configured():
 98            try:
 99                S3Storage.write_payload_to_file(url, content)
100            except Exception:
101                DBFSStorage.write_payload_to_file(url, content)
102        elif path.startswith(("s3://", "dbfs:/")):
103            DBFSStorage.write_payload_to_file(url, content)
104        else:
105            LocalFSStorage.write_payload_to_file(url, content)
106
107    @staticmethod
108    def is_boto3_configured() -> bool:
109        """Check if boto3 is able to locate credentials and properly configured.
110
111        If boto3 is not properly configured, we might want to try a different reader.
112        """
113        try:
114            boto3.client("sts").get_caller_identity()
115            return True
116        except Exception:
117            return False
class FileStorageFunctions(ABC):  # noqa: B024
    """Common helpers for reading and writing files across storage backends."""

    @classmethod
    def read_json(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
        """Load the JSON document stored at the given path.

        Supported file systems: s3, dbfs and the local filesystem.

        Args:
            path: path to the json file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.

        Returns:
            Dict with json file content.
        """
        parsed = urlparse(path, allow_fragments=False)
        if disable_dbfs_retry:
            return json.load(S3Storage.get_file_payload(parsed))
        if parsed.scheme == "s3" and cls.is_boto3_configured():
            try:
                return json.load(S3Storage.get_file_payload(parsed))
            except Exception:
                return json.loads(DBFSStorage.get_file_payload(parsed))
        if parsed.scheme == "file":
            return json.load(LocalFSStorage.get_file_payload(parsed))
        if parsed.scheme in ("dbfs", "s3"):
            return json.loads(DBFSStorage.get_file_payload(parsed))
        raise NotImplementedError(
            f"File storage protocol not implemented for {path}."
        )

    @classmethod
    def read_sql(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
        """Load the SQL text stored at the given path.

        Supported file systems: s3, dbfs and the local filesystem.

        Args:
            path: path to the sql file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.

        Returns:
            Content of the SQL file.
        """
        parsed = urlparse(path, allow_fragments=False)
        if disable_dbfs_retry:
            return S3Storage.get_file_payload(parsed).read().decode("utf-8")
        if parsed.scheme == "s3" and cls.is_boto3_configured():
            try:
                return S3Storage.get_file_payload(parsed).read().decode("utf-8")
            except Exception:
                return DBFSStorage.get_file_payload(parsed)
        if parsed.scheme == "file":
            return LocalFSStorage.get_file_payload(parsed).read()
        if parsed.scheme in ("dbfs", "s3"):
            return DBFSStorage.get_file_payload(parsed)
        raise NotImplementedError(
            f"Object storage protocol not implemented for {path}."
        )

    @classmethod
    def write_payload(
        cls, path: str, url: ParseResult, content: str, disable_dbfs_retry: bool = False
    ) -> None:
        """Persist content into a file on a supported file system.

        Supported file systems: s3, dbfs and the local filesystem.

        Args:
            path: path to validate the file type.
            url: url of the file.
            content: content to write into the file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.
        """
        if disable_dbfs_retry:
            S3Storage.write_payload_to_file(url, content)
            return
        if path.startswith("s3://") and cls.is_boto3_configured():
            try:
                S3Storage.write_payload_to_file(url, content)
            except Exception:
                DBFSStorage.write_payload_to_file(url, content)
            return
        if path.startswith(("s3://", "dbfs:/")):
            DBFSStorage.write_payload_to_file(url, content)
            return
        LocalFSStorage.write_payload_to_file(url, content)

    @staticmethod
    def is_boto3_configured() -> bool:
        """Check if boto3 is able to locate credentials and properly configured.

        If boto3 is not properly configured, we might want to try a different reader.
        """
        try:
            boto3.client("sts").get_caller_identity()
        except Exception:
            return False
        return True

Class for common file storage functions.

@classmethod
def read_json(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
19    @classmethod
20    def read_json(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
21        """Read a json file.
22
23        The file should be in a supported file system (e.g., s3, dbfs or
24        local filesystem).
25
26        Args:
27            path: path to the json file.
28            disable_dbfs_retry: optional flag to disable file storage dbfs.
29
30        Returns:
31            Dict with json file content.
32        """
33        url = urlparse(path, allow_fragments=False)
34        if disable_dbfs_retry:
35            return json.load(S3Storage.get_file_payload(url))
36        elif url.scheme == "s3" and cls.is_boto3_configured():
37            try:
38                return json.load(S3Storage.get_file_payload(url))
39            except Exception:
40                return json.loads(DBFSStorage.get_file_payload(url))
41        elif url.scheme == "file":
42            return json.load(LocalFSStorage.get_file_payload(url))
43        elif url.scheme in ["dbfs", "s3"]:
44            return json.loads(DBFSStorage.get_file_payload(url))
45        else:
46            raise NotImplementedError(
47                f"File storage protocol not implemented for {path}."
48            )

Read a json file.

The file should be in a supported file system (e.g., s3, dbfs or local filesystem).

Arguments:
  • path: path to the json file.
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:

Dict with json file content.

@classmethod
def read_sql(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
50    @classmethod
51    def read_sql(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
52        """Read a sql file.
53
54        The file should be in a supported file system (e.g., s3, dbfs or local
55        filesystem).
56
57        Args:
58            path: path to the sql file.
59            disable_dbfs_retry: optional flag to disable file storage dbfs.
60
61        Returns:
62            Content of the SQL file.
63        """
64        url = urlparse(path, allow_fragments=False)
65        if disable_dbfs_retry:
66            return S3Storage.get_file_payload(url).read().decode("utf-8")
67        elif url.scheme == "s3" and cls.is_boto3_configured():
68            try:
69                return S3Storage.get_file_payload(url).read().decode("utf-8")
70            except Exception:
71                return DBFSStorage.get_file_payload(url)
72        elif url.scheme == "file":
73            return LocalFSStorage.get_file_payload(url).read()
74        elif url.scheme in ["dbfs", "s3"]:
75            return DBFSStorage.get_file_payload(url)
76        else:
77            raise NotImplementedError(
78                f"Object storage protocol not implemented for {path}."
79            )

Read a sql file.

The file should be in a supported file system (e.g., s3, dbfs or local filesystem).

Arguments:
  • path: path to the sql file.
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:

Content of the SQL file.

@classmethod
def write_payload( cls, path: str, url: urllib.parse.ParseResult, content: str, disable_dbfs_retry: bool = False) -> None:
 81    @classmethod
 82    def write_payload(
 83        cls, path: str, url: ParseResult, content: str, disable_dbfs_retry: bool = False
 84    ) -> None:
 85        """Write payload into a file.
 86
 87        The file should be in a supported file system (e.g., s3, dbfs or local
 88        filesystem).
 89
 90        Args:
 91            path: path to validate the file type.
 92            url: url of the file.
 93            content: content to write into the file.
 94            disable_dbfs_retry: optional flag to disable file storage dbfs.
 95        """
 96        if disable_dbfs_retry:
 97            S3Storage.write_payload_to_file(url, content)
 98        elif path.startswith("s3://") and cls.is_boto3_configured():
 99            try:
100                S3Storage.write_payload_to_file(url, content)
101            except Exception:
102                DBFSStorage.write_payload_to_file(url, content)
103        elif path.startswith(("s3://", "dbfs:/")):
104            DBFSStorage.write_payload_to_file(url, content)
105        else:
106            LocalFSStorage.write_payload_to_file(url, content)

Write payload into a file.

The file should be in a supported file system (e.g., s3, dbfs or local filesystem).

Arguments:
  • path: path to validate the file type.
  • url: url of the file.
  • content: content to write into the file.
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
@staticmethod
def is_boto3_configured() -> bool:
108    @staticmethod
109    def is_boto3_configured() -> bool:
110        """Check if boto3 is able to locate credentials and properly configured.
111
112        If boto3 is not properly configured, we might want to try a different reader.
113        """
114        try:
115            boto3.client("sts").get_caller_identity()
116            return True
117        except Exception:
118            return False

Check if boto3 is able to locate credentials and properly configured.

If boto3 is not properly configured, we might want to try a different reader.