lakehouse_engine.utils.configs.config_utils
Module to read configurations.
1"""Module to read configurations.""" 2 3import importlib.resources 4from typing import Any, Optional, Union 5 6import pkg_resources 7import yaml 8 9from lakehouse_engine.utils.logging_handler import LoggingHandler 10from lakehouse_engine.utils.storage.file_storage_functions import FileStorageFunctions 11 12 13class ConfigUtils(object): 14 """Config utilities class.""" 15 16 _LOGGER = LoggingHandler(__name__).get_logger() 17 SENSITIVE_INFO = [ 18 "kafka.ssl.keystore.password", 19 "kafka.ssl.truststore.password", 20 "password", 21 "secret", 22 "credential", 23 "credentials", 24 "pass", 25 "key", 26 ] 27 28 @classmethod 29 def get_acon( 30 cls, 31 acon_path: Optional[str] = None, 32 acon: Optional[dict] = None, 33 disable_dbfs_retry: bool = False, 34 ) -> dict: 35 """Get acon based on a filesystem path or on a dict. 36 37 Args: 38 acon_path: path of the acon (algorithm configuration) file. 39 acon: acon provided directly through python code (e.g., notebooks 40 or other apps). 41 disable_dbfs_retry: optional flag to disable file storage dbfs. 42 43 Returns: 44 Dict representation of an acon. 45 """ 46 acon = ( 47 acon if acon else ConfigUtils.read_json_acon(acon_path, disable_dbfs_retry) 48 ) 49 return acon 50 51 @staticmethod 52 def get_config(package: str = "lakehouse_engine.configs") -> Any: 53 """Get the lakehouse engine configuration file. 54 55 Returns: 56 Configuration dictionary 57 """ 58 with importlib.resources.open_binary(package, "engine.yaml") as config: 59 config = yaml.safe_load(config) 60 return config 61 62 @classmethod 63 def get_engine_version(cls) -> str: 64 """Get Lakehouse Engine version from the installed packages. 65 66 Returns: 67 String of engine version. 68 """ 69 try: 70 version = pkg_resources.get_distribution("lakehouse-engine").version 71 except pkg_resources.DistributionNotFound: 72 cls._LOGGER.info("Could not identify Lakehouse Engine version.") 73 version = "" 74 return str(version) 75 76 @staticmethod 77 def read_json_acon(path: str, disable_dbfs_retry: bool = False) -> Any: 78 """Read an acon (algorithm configuration) file. 79 80 Args: 81 path: path to the acon file. 82 disable_dbfs_retry: optional flag to disable file storage dbfs. 83 84 Returns: 85 The acon file content as a dict. 86 """ 87 return FileStorageFunctions.read_json(path, disable_dbfs_retry) 88 89 @staticmethod 90 def read_sql(path: str, disable_dbfs_retry: bool = False) -> Any: 91 """Read a DDL file in Spark SQL format from a cloud object storage system. 92 93 Args: 94 path: path to the SQL file. 95 disable_dbfs_retry: optional flag to disable file storage dbfs. 96 97 Returns: 98 Content of the SQL file. 99 """ 100 return FileStorageFunctions.read_sql(path, disable_dbfs_retry) 101 102 @classmethod 103 def remove_sensitive_info( 104 cls, dict_to_replace: Union[dict, list] 105 ) -> Union[dict, list]: 106 """Remove sensitive info from a dictionary. 107 108 Args: 109 dict_to_replace: dict where we want to remove sensitive info. 110 111 Returns: 112 dict without sensitive information. 113 """ 114 if isinstance(dict_to_replace, list): 115 return [cls.remove_sensitive_info(k) for k in dict_to_replace] 116 elif isinstance(dict_to_replace, dict): 117 return { 118 k: "******" if k in cls.SENSITIVE_INFO else cls.remove_sensitive_info(v) 119 for k, v in dict_to_replace.items() 120 } 121 else: 122 return dict_to_replace
class
ConfigUtils:
14class ConfigUtils(object): 15 """Config utilities class.""" 16 17 _LOGGER = LoggingHandler(__name__).get_logger() 18 SENSITIVE_INFO = [ 19 "kafka.ssl.keystore.password", 20 "kafka.ssl.truststore.password", 21 "password", 22 "secret", 23 "credential", 24 "credentials", 25 "pass", 26 "key", 27 ] 28 29 @classmethod 30 def get_acon( 31 cls, 32 acon_path: Optional[str] = None, 33 acon: Optional[dict] = None, 34 disable_dbfs_retry: bool = False, 35 ) -> dict: 36 """Get acon based on a filesystem path or on a dict. 37 38 Args: 39 acon_path: path of the acon (algorithm configuration) file. 40 acon: acon provided directly through python code (e.g., notebooks 41 or other apps). 42 disable_dbfs_retry: optional flag to disable file storage dbfs. 43 44 Returns: 45 Dict representation of an acon. 46 """ 47 acon = ( 48 acon if acon else ConfigUtils.read_json_acon(acon_path, disable_dbfs_retry) 49 ) 50 return acon 51 52 @staticmethod 53 def get_config(package: str = "lakehouse_engine.configs") -> Any: 54 """Get the lakehouse engine configuration file. 55 56 Returns: 57 Configuration dictionary 58 """ 59 with importlib.resources.open_binary(package, "engine.yaml") as config: 60 config = yaml.safe_load(config) 61 return config 62 63 @classmethod 64 def get_engine_version(cls) -> str: 65 """Get Lakehouse Engine version from the installed packages. 66 67 Returns: 68 String of engine version. 69 """ 70 try: 71 version = pkg_resources.get_distribution("lakehouse-engine").version 72 except pkg_resources.DistributionNotFound: 73 cls._LOGGER.info("Could not identify Lakehouse Engine version.") 74 version = "" 75 return str(version) 76 77 @staticmethod 78 def read_json_acon(path: str, disable_dbfs_retry: bool = False) -> Any: 79 """Read an acon (algorithm configuration) file. 80 81 Args: 82 path: path to the acon file. 83 disable_dbfs_retry: optional flag to disable file storage dbfs. 84 85 Returns: 86 The acon file content as a dict. 87 """ 88 return FileStorageFunctions.read_json(path, disable_dbfs_retry) 89 90 @staticmethod 91 def read_sql(path: str, disable_dbfs_retry: bool = False) -> Any: 92 """Read a DDL file in Spark SQL format from a cloud object storage system. 93 94 Args: 95 path: path to the SQL file. 96 disable_dbfs_retry: optional flag to disable file storage dbfs. 97 98 Returns: 99 Content of the SQL file. 100 """ 101 return FileStorageFunctions.read_sql(path, disable_dbfs_retry) 102 103 @classmethod 104 def remove_sensitive_info( 105 cls, dict_to_replace: Union[dict, list] 106 ) -> Union[dict, list]: 107 """Remove sensitive info from a dictionary. 108 109 Args: 110 dict_to_replace: dict where we want to remove sensitive info. 111 112 Returns: 113 dict without sensitive information. 114 """ 115 if isinstance(dict_to_replace, list): 116 return [cls.remove_sensitive_info(k) for k in dict_to_replace] 117 elif isinstance(dict_to_replace, dict): 118 return { 119 k: "******" if k in cls.SENSITIVE_INFO else cls.remove_sensitive_info(v) 120 for k, v in dict_to_replace.items() 121 } 122 else: 123 return dict_to_replace
Config utilities class.
SENSITIVE_INFO =
['kafka.ssl.keystore.password', 'kafka.ssl.truststore.password', 'password', 'secret', 'credential', 'credentials', 'pass', 'key']
@classmethod
def
get_acon( cls, acon_path: Optional[str] = None, acon: Optional[dict] = None, disable_dbfs_retry: bool = False) -> dict:
29 @classmethod 30 def get_acon( 31 cls, 32 acon_path: Optional[str] = None, 33 acon: Optional[dict] = None, 34 disable_dbfs_retry: bool = False, 35 ) -> dict: 36 """Get acon based on a filesystem path or on a dict. 37 38 Args: 39 acon_path: path of the acon (algorithm configuration) file. 40 acon: acon provided directly through python code (e.g., notebooks 41 or other apps). 42 disable_dbfs_retry: optional flag to disable file storage dbfs. 43 44 Returns: 45 Dict representation of an acon. 46 """ 47 acon = ( 48 acon if acon else ConfigUtils.read_json_acon(acon_path, disable_dbfs_retry) 49 ) 50 return acon
Get acon based on a filesystem path or on a dict.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:
Dict representation of an acon.
52 @staticmethod 53 def get_config(package: str = "lakehouse_engine.configs") -> Any: 54 """Get the lakehouse engine configuration file. 55 56 Returns: 57 Configuration dictionary 58 """ 59 with importlib.resources.open_binary(package, "engine.yaml") as config: 60 config = yaml.safe_load(config) 61 return config
Get the lakehouse engine configuration file.
Returns:
Configuration dictionary
@classmethod
def
get_engine_version(cls) -> str:
63 @classmethod 64 def get_engine_version(cls) -> str: 65 """Get Lakehouse Engine version from the installed packages. 66 67 Returns: 68 String of engine version. 69 """ 70 try: 71 version = pkg_resources.get_distribution("lakehouse-engine").version 72 except pkg_resources.DistributionNotFound: 73 cls._LOGGER.info("Could not identify Lakehouse Engine version.") 74 version = "" 75 return str(version)
Get Lakehouse Engine version from the installed packages.
Returns:
String of engine version.
@staticmethod
def
read_json_acon(path: str, disable_dbfs_retry: bool = False) -> Any:
77 @staticmethod 78 def read_json_acon(path: str, disable_dbfs_retry: bool = False) -> Any: 79 """Read an acon (algorithm configuration) file. 80 81 Args: 82 path: path to the acon file. 83 disable_dbfs_retry: optional flag to disable file storage dbfs. 84 85 Returns: 86 The acon file content as a dict. 87 """ 88 return FileStorageFunctions.read_json(path, disable_dbfs_retry)
Read an acon (algorithm configuration) file.
Arguments:
- path: path to the acon file.
- disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:
The acon file content as a dict.
@staticmethod
def
read_sql(path: str, disable_dbfs_retry: bool = False) -> Any:
90 @staticmethod 91 def read_sql(path: str, disable_dbfs_retry: bool = False) -> Any: 92 """Read a DDL file in Spark SQL format from a cloud object storage system. 93 94 Args: 95 path: path to the SQL file. 96 disable_dbfs_retry: optional flag to disable file storage dbfs. 97 98 Returns: 99 Content of the SQL file. 100 """ 101 return FileStorageFunctions.read_sql(path, disable_dbfs_retry)
Read a DDL file in Spark SQL format from a cloud object storage system.
Arguments:
- path: path to the SQL file.
- disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:
Content of the SQL file.
@classmethod
def
remove_sensitive_info(cls, dict_to_replace: Union[dict, list]) -> Union[dict, list]:
103 @classmethod 104 def remove_sensitive_info( 105 cls, dict_to_replace: Union[dict, list] 106 ) -> Union[dict, list]: 107 """Remove sensitive info from a dictionary. 108 109 Args: 110 dict_to_replace: dict where we want to remove sensitive info. 111 112 Returns: 113 dict without sensitive information. 114 """ 115 if isinstance(dict_to_replace, list): 116 return [cls.remove_sensitive_info(k) for k in dict_to_replace] 117 elif isinstance(dict_to_replace, dict): 118 return { 119 k: "******" if k in cls.SENSITIVE_INFO else cls.remove_sensitive_info(v) 120 for k, v in dict_to_replace.items() 121 } 122 else: 123 return dict_to_replace
Remove sensitive info from a dictionary.
Arguments:
- dict_to_replace: dict where we want to remove sensitive info.
Returns:
dict without sensitive information.