lakehouse_engine.utils.storage.file_storage_functions
Module for common file storage functions.
"""Module for common file storage functions."""

import json
from abc import ABC
from typing import Any
from urllib.parse import ParseResult, urlparse

import boto3

from lakehouse_engine.utils.storage.dbfs_storage import DBFSStorage
from lakehouse_engine.utils.storage.local_fs_storage import LocalFSStorage
from lakehouse_engine.utils.storage.s3_storage import S3Storage


class FileStorageFunctions(ABC):  # noqa: B024
    """Class for common file storage functions."""

    @classmethod
    def read_json(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
        """Read a json file.

        The file should be in a supported file system (e.g., s3, dbfs or
        local filesystem).

        Args:
            path: path to the json file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.

        Returns:
            Dict with json file content.
        """
        parsed = urlparse(path, allow_fragments=False)
        # With the DBFS retry disabled, read straight from S3 regardless
        # of the path's scheme.
        if disable_dbfs_retry:
            return json.load(S3Storage.get_file_payload(parsed))
        if parsed.scheme == "s3" and cls.is_boto3_configured():
            try:
                return json.load(S3Storage.get_file_payload(parsed))
            except Exception:
                # Best-effort fallback: retry the read through DBFS.
                return json.loads(DBFSStorage.get_file_payload(parsed))
        if parsed.scheme == "file":
            return json.load(LocalFSStorage.get_file_payload(parsed))
        if parsed.scheme in ("dbfs", "s3"):
            return json.loads(DBFSStorage.get_file_payload(parsed))
        raise NotImplementedError(
            f"File storage protocol not implemented for {path}."
        )

    @classmethod
    def read_sql(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
        """Read a sql file.

        The file should be in a supported file system (e.g., s3, dbfs or local
        filesystem).

        Args:
            path: path to the sql file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.

        Returns:
            Content of the SQL file.
        """
        parsed = urlparse(path, allow_fragments=False)
        # With the DBFS retry disabled, read straight from S3 regardless
        # of the path's scheme.
        if disable_dbfs_retry:
            return S3Storage.get_file_payload(parsed).read().decode("utf-8")
        if parsed.scheme == "s3" and cls.is_boto3_configured():
            try:
                return S3Storage.get_file_payload(parsed).read().decode("utf-8")
            except Exception:
                # Best-effort fallback: retry the read through DBFS.
                return DBFSStorage.get_file_payload(parsed)
        if parsed.scheme == "file":
            return LocalFSStorage.get_file_payload(parsed).read()
        if parsed.scheme in ("dbfs", "s3"):
            return DBFSStorage.get_file_payload(parsed)
        raise NotImplementedError(
            f"Object storage protocol not implemented for {path}."
        )

    @classmethod
    def write_payload(
        cls, path: str, url: ParseResult, content: str, disable_dbfs_retry: bool = False
    ) -> None:
        """Write payload into a file.

        The file should be in a supported file system (e.g., s3, dbfs or local
        filesystem).

        Args:
            path: path to validate the file type.
            url: url of the file.
            content: content to write into the file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.
        """
        # With the DBFS retry disabled, write straight to S3 regardless
        # of the path's scheme.
        if disable_dbfs_retry:
            S3Storage.write_payload_to_file(url, content)
            return
        if path.startswith("s3://") and cls.is_boto3_configured():
            try:
                S3Storage.write_payload_to_file(url, content)
            except Exception:
                # Best-effort fallback: retry the write through DBFS.
                DBFSStorage.write_payload_to_file(url, content)
            return
        if path.startswith(("s3://", "dbfs:/")):
            DBFSStorage.write_payload_to_file(url, content)
            return
        LocalFSStorage.write_payload_to_file(url, content)

    @staticmethod
    def is_boto3_configured() -> bool:
        """Check if boto3 is able to locate credentials and properly configured.

        If boto3 is not properly configured, we might want to try a different reader.
        """
        # A successful STS get_caller_identity call proves usable credentials.
        try:
            boto3.client("sts").get_caller_identity()
        except Exception:
            return False
        return True
class
FileStorageFunctions(abc.ABC):
class FileStorageFunctions(ABC):  # noqa: B024
    """Class for common file storage functions."""

    @classmethod
    def read_json(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
        """Read a json file.

        The file should be in a supported file system (e.g., s3, dbfs or
        local filesystem).

        Args:
            path: path to the json file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.

        Returns:
            Dict with json file content.
        """
        file_url = urlparse(path, allow_fragments=False)
        scheme = file_url.scheme
        if disable_dbfs_retry:
            # DBFS retry disabled: always read through S3.
            return json.load(S3Storage.get_file_payload(file_url))
        elif scheme == "s3" and cls.is_boto3_configured():
            try:
                return json.load(S3Storage.get_file_payload(file_url))
            except Exception:
                # S3 read failed; retry the same file through DBFS.
                return json.loads(DBFSStorage.get_file_payload(file_url))
        elif scheme == "file":
            return json.load(LocalFSStorage.get_file_payload(file_url))
        elif scheme in {"dbfs", "s3"}:
            return json.loads(DBFSStorage.get_file_payload(file_url))
        else:
            raise NotImplementedError(
                f"File storage protocol not implemented for {path}."
            )

    @classmethod
    def read_sql(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
        """Read a sql file.

        The file should be in a supported file system (e.g., s3, dbfs or local
        filesystem).

        Args:
            path: path to the sql file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.

        Returns:
            Content of the SQL file.
        """
        file_url = urlparse(path, allow_fragments=False)
        scheme = file_url.scheme
        if disable_dbfs_retry:
            # DBFS retry disabled: always read through S3.
            return S3Storage.get_file_payload(file_url).read().decode("utf-8")
        elif scheme == "s3" and cls.is_boto3_configured():
            try:
                return S3Storage.get_file_payload(file_url).read().decode("utf-8")
            except Exception:
                # S3 read failed; retry the same file through DBFS.
                return DBFSStorage.get_file_payload(file_url)
        elif scheme == "file":
            return LocalFSStorage.get_file_payload(file_url).read()
        elif scheme in {"dbfs", "s3"}:
            return DBFSStorage.get_file_payload(file_url)
        else:
            raise NotImplementedError(
                f"Object storage protocol not implemented for {path}."
            )

    @classmethod
    def write_payload(
        cls, path: str, url: ParseResult, content: str, disable_dbfs_retry: bool = False
    ) -> None:
        """Write payload into a file.

        The file should be in a supported file system (e.g., s3, dbfs or local
        filesystem).

        Args:
            path: path to validate the file type.
            url: url of the file.
            content: content to write into the file.
            disable_dbfs_retry: optional flag to disable file storage dbfs.
        """
        if disable_dbfs_retry:
            # DBFS retry disabled: always write through S3.
            S3Storage.write_payload_to_file(url, content)
        elif path.startswith("s3://") and cls.is_boto3_configured():
            try:
                S3Storage.write_payload_to_file(url, content)
            except Exception:
                # S3 write failed; retry the same file through DBFS.
                DBFSStorage.write_payload_to_file(url, content)
        elif path.startswith(("s3://", "dbfs:/")):
            DBFSStorage.write_payload_to_file(url, content)
        else:
            # Anything else is treated as a local filesystem path.
            LocalFSStorage.write_payload_to_file(url, content)

    @staticmethod
    def is_boto3_configured() -> bool:
        """Check if boto3 is able to locate credentials and properly configured.

        If boto3 is not properly configured, we might want to try a different reader.
        """
        try:
            # Cheap credential probe via STS.
            boto3.client("sts").get_caller_identity()
        except Exception:
            return False
        else:
            return True
Class for common file storage functions.
@classmethod
def
read_json(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
@classmethod
def read_json(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
    """Read a json file.

    The file should be in a supported file system (e.g., s3, dbfs or
    local filesystem).

    Args:
        path: path to the json file.
        disable_dbfs_retry: optional flag to disable file storage dbfs.

    Returns:
        Dict with json file content.
    """
    location = urlparse(path, allow_fragments=False)
    # Retry disabled means: trust S3 unconditionally, whatever the scheme.
    if disable_dbfs_retry:
        return json.load(S3Storage.get_file_payload(location))
    if location.scheme == "s3" and cls.is_boto3_configured():
        try:
            return json.load(S3Storage.get_file_payload(location))
        except Exception:
            # Fall back to a DBFS read when the S3 read fails.
            return json.loads(DBFSStorage.get_file_payload(location))
    if location.scheme == "file":
        return json.load(LocalFSStorage.get_file_payload(location))
    if location.scheme in ("dbfs", "s3"):
        return json.loads(DBFSStorage.get_file_payload(location))
    raise NotImplementedError(
        f"File storage protocol not implemented for {path}."
    )
Read a json file.
The file should be in a supported file system (e.g., s3, dbfs or local filesystem).
Arguments:
- path: path to the json file.
- disable_dbfs_retry: optional flag to bypass the DBFS retry fallback and read directly from S3.
Returns:
Dict with json file content.
@classmethod
def
read_sql(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
@classmethod
def read_sql(cls, path: str, disable_dbfs_retry: bool = False) -> Any:
    """Read a sql file.

    The file should be in a supported file system (e.g., s3, dbfs or local
    filesystem).

    Args:
        path: path to the sql file.
        disable_dbfs_retry: optional flag to disable file storage dbfs.

    Returns:
        Content of the SQL file.
    """
    location = urlparse(path, allow_fragments=False)
    # Retry disabled means: trust S3 unconditionally, whatever the scheme.
    if disable_dbfs_retry:
        return S3Storage.get_file_payload(location).read().decode("utf-8")
    if location.scheme == "s3" and cls.is_boto3_configured():
        try:
            return S3Storage.get_file_payload(location).read().decode("utf-8")
        except Exception:
            # Fall back to a DBFS read when the S3 read fails.
            return DBFSStorage.get_file_payload(location)
    if location.scheme == "file":
        return LocalFSStorage.get_file_payload(location).read()
    if location.scheme in ("dbfs", "s3"):
        return DBFSStorage.get_file_payload(location)
    raise NotImplementedError(
        f"Object storage protocol not implemented for {path}."
    )
Read a sql file.
The file should be in a supported file system (e.g., s3, dbfs or local filesystem).
Arguments:
- path: path to the sql file.
- disable_dbfs_retry: optional flag to bypass the DBFS retry fallback and read directly from S3.
Returns:
Content of the SQL file.
@classmethod
def
write_payload( cls, path: str, url: urllib.parse.ParseResult, content: str, disable_dbfs_retry: bool = False) -> None:
@classmethod
def write_payload(
    cls, path: str, url: ParseResult, content: str, disable_dbfs_retry: bool = False
) -> None:
    """Write payload into a file.

    The file should be in a supported file system (e.g., s3, dbfs or local
    filesystem).

    Args:
        path: path to validate the file type.
        url: url of the file.
        content: content to write into the file.
        disable_dbfs_retry: optional flag to disable file storage dbfs.
    """
    # Retry disabled means: trust S3 unconditionally, whatever the scheme.
    if disable_dbfs_retry:
        S3Storage.write_payload_to_file(url, content)
        return
    if path.startswith("s3://") and cls.is_boto3_configured():
        try:
            S3Storage.write_payload_to_file(url, content)
        except Exception:
            # Fall back to a DBFS write when the S3 write fails.
            DBFSStorage.write_payload_to_file(url, content)
        return
    if path.startswith(("s3://", "dbfs:/")):
        DBFSStorage.write_payload_to_file(url, content)
        return
    # Any other prefix is handled as a local filesystem path.
    LocalFSStorage.write_payload_to_file(url, content)
Write payload into a file.
The file should be in a supported file system (e.g., s3, dbfs or local filesystem).
Arguments:
- path: path to validate the file type.
- url: url of the file.
- content: content to write into the file.
- disable_dbfs_retry: optional flag to bypass the DBFS retry fallback and write directly to S3.
@staticmethod
def
is_boto3_configured() -> bool:
@staticmethod
def is_boto3_configured() -> bool:
    """Check if boto3 is able to locate credentials and properly configured.

    If boto3 is not properly configured, we might want to try a different reader.
    """
    try:
        # STS get_caller_identity succeeds only with usable credentials.
        boto3.client("sts").get_caller_identity()
    except Exception:
        return False
    return True
Check if boto3 is able to locate credentials and properly configured.
If boto3 is not properly configured, we might want to try a different reader.