lakehouse_engine.utils.configs.config_utils

Module to read configurations.

  1"""Module to read configurations."""
  2
  3import importlib.resources
  4from typing import Any, Optional, Union
  5
  6import pkg_resources
  7import yaml
  8
  9from lakehouse_engine.utils.logging_handler import LoggingHandler
 10from lakehouse_engine.utils.storage.file_storage_functions import FileStorageFunctions
 11
 12
 13class ConfigUtils(object):
 14    """Config utilities class."""
 15
 16    _LOGGER = LoggingHandler(__name__).get_logger()
 17    SENSITIVE_INFO = [
 18        "kafka.ssl.keystore.password",
 19        "kafka.ssl.truststore.password",
 20        "password",
 21        "secret",
 22        "credential",
 23        "credentials",
 24        "pass",
 25        "key",
 26    ]
 27
 28    @classmethod
 29    def get_acon(
 30        cls,
 31        acon_path: Optional[str] = None,
 32        acon: Optional[dict] = None,
 33        disable_dbfs_retry: bool = False,
 34    ) -> dict:
 35        """Get acon based on a filesystem path or on a dict.
 36
 37        Args:
 38            acon_path: path of the acon (algorithm configuration) file.
 39            acon: acon provided directly through python code (e.g., notebooks
 40                or other apps).
 41            disable_dbfs_retry: optional flag to disable file storage dbfs.
 42
 43        Returns:
 44            Dict representation of an acon.
 45        """
 46        acon = (
 47            acon if acon else ConfigUtils.read_json_acon(acon_path, disable_dbfs_retry)
 48        )
 49        return acon
 50
 51    @staticmethod
 52    def get_config(package: str = "lakehouse_engine.configs") -> Any:
 53        """Get the lakehouse engine configuration file.
 54
 55        Returns:
 56            Configuration dictionary
 57        """
 58        with importlib.resources.open_binary(package, "engine.yaml") as config:
 59            config = yaml.safe_load(config)
 60        return config
 61
 62    @classmethod
 63    def get_engine_version(cls) -> str:
 64        """Get Lakehouse Engine version from the installed packages.
 65
 66        Returns:
 67            String of engine version.
 68        """
 69        try:
 70            version = pkg_resources.get_distribution("lakehouse-engine").version
 71        except pkg_resources.DistributionNotFound:
 72            cls._LOGGER.info("Could not identify Lakehouse Engine version.")
 73            version = ""
 74        return str(version)
 75
 76    @staticmethod
 77    def read_json_acon(path: str, disable_dbfs_retry: bool = False) -> Any:
 78        """Read an acon (algorithm configuration) file.
 79
 80        Args:
 81            path: path to the acon file.
 82            disable_dbfs_retry: optional flag to disable file storage dbfs.
 83
 84        Returns:
 85            The acon file content as a dict.
 86        """
 87        return FileStorageFunctions.read_json(path, disable_dbfs_retry)
 88
 89    @staticmethod
 90    def read_sql(path: str, disable_dbfs_retry: bool = False) -> Any:
 91        """Read a DDL file in Spark SQL format from a cloud object storage system.
 92
 93        Args:
 94            path: path to the SQL file.
 95            disable_dbfs_retry: optional flag to disable file storage dbfs.
 96
 97        Returns:
 98            Content of the SQL file.
 99        """
100        return FileStorageFunctions.read_sql(path, disable_dbfs_retry)
101
102    @classmethod
103    def remove_sensitive_info(
104        cls, dict_to_replace: Union[dict, list]
105    ) -> Union[dict, list]:
106        """Remove sensitive info from a dictionary.
107
108        Args:
109            dict_to_replace: dict where we want to remove sensitive info.
110
111        Returns:
112            dict without sensitive information.
113        """
114        if isinstance(dict_to_replace, list):
115            return [cls.remove_sensitive_info(k) for k in dict_to_replace]
116        elif isinstance(dict_to_replace, dict):
117            return {
118                k: "******" if k in cls.SENSITIVE_INFO else cls.remove_sensitive_info(v)
119                for k, v in dict_to_replace.items()
120            }
121        else:
122            return dict_to_replace
class ConfigUtils:
 14class ConfigUtils(object):
 15    """Config utilities class."""
 16
 17    _LOGGER = LoggingHandler(__name__).get_logger()
 18    SENSITIVE_INFO = [
 19        "kafka.ssl.keystore.password",
 20        "kafka.ssl.truststore.password",
 21        "password",
 22        "secret",
 23        "credential",
 24        "credentials",
 25        "pass",
 26        "key",
 27    ]
 28
 29    @classmethod
 30    def get_acon(
 31        cls,
 32        acon_path: Optional[str] = None,
 33        acon: Optional[dict] = None,
 34        disable_dbfs_retry: bool = False,
 35    ) -> dict:
 36        """Get acon based on a filesystem path or on a dict.
 37
 38        Args:
 39            acon_path: path of the acon (algorithm configuration) file.
 40            acon: acon provided directly through python code (e.g., notebooks
 41                or other apps).
 42            disable_dbfs_retry: optional flag to disable file storage dbfs.
 43
 44        Returns:
 45            Dict representation of an acon.
 46        """
 47        acon = (
 48            acon if acon else ConfigUtils.read_json_acon(acon_path, disable_dbfs_retry)
 49        )
 50        return acon
 51
 52    @staticmethod
 53    def get_config(package: str = "lakehouse_engine.configs") -> Any:
 54        """Get the lakehouse engine configuration file.
 55
 56        Returns:
 57            Configuration dictionary
 58        """
 59        with importlib.resources.open_binary(package, "engine.yaml") as config:
 60            config = yaml.safe_load(config)
 61        return config
 62
 63    @classmethod
 64    def get_engine_version(cls) -> str:
 65        """Get Lakehouse Engine version from the installed packages.
 66
 67        Returns:
 68            String of engine version.
 69        """
 70        try:
 71            version = pkg_resources.get_distribution("lakehouse-engine").version
 72        except pkg_resources.DistributionNotFound:
 73            cls._LOGGER.info("Could not identify Lakehouse Engine version.")
 74            version = ""
 75        return str(version)
 76
 77    @staticmethod
 78    def read_json_acon(path: str, disable_dbfs_retry: bool = False) -> Any:
 79        """Read an acon (algorithm configuration) file.
 80
 81        Args:
 82            path: path to the acon file.
 83            disable_dbfs_retry: optional flag to disable file storage dbfs.
 84
 85        Returns:
 86            The acon file content as a dict.
 87        """
 88        return FileStorageFunctions.read_json(path, disable_dbfs_retry)
 89
 90    @staticmethod
 91    def read_sql(path: str, disable_dbfs_retry: bool = False) -> Any:
 92        """Read a DDL file in Spark SQL format from a cloud object storage system.
 93
 94        Args:
 95            path: path to the SQL file.
 96            disable_dbfs_retry: optional flag to disable file storage dbfs.
 97
 98        Returns:
 99            Content of the SQL file.
100        """
101        return FileStorageFunctions.read_sql(path, disable_dbfs_retry)
102
103    @classmethod
104    def remove_sensitive_info(
105        cls, dict_to_replace: Union[dict, list]
106    ) -> Union[dict, list]:
107        """Remove sensitive info from a dictionary.
108
109        Args:
110            dict_to_replace: dict where we want to remove sensitive info.
111
112        Returns:
113            dict without sensitive information.
114        """
115        if isinstance(dict_to_replace, list):
116            return [cls.remove_sensitive_info(k) for k in dict_to_replace]
117        elif isinstance(dict_to_replace, dict):
118            return {
119                k: "******" if k in cls.SENSITIVE_INFO else cls.remove_sensitive_info(v)
120                for k, v in dict_to_replace.items()
121            }
122        else:
123            return dict_to_replace

Config utilities class.

SENSITIVE_INFO = ['kafka.ssl.keystore.password', 'kafka.ssl.truststore.password', 'password', 'secret', 'credential', 'credentials', 'pass', 'key']
@classmethod
def get_acon( cls, acon_path: Optional[str] = None, acon: Optional[dict] = None, disable_dbfs_retry: bool = False) -> dict:
29    @classmethod
30    def get_acon(
31        cls,
32        acon_path: Optional[str] = None,
33        acon: Optional[dict] = None,
34        disable_dbfs_retry: bool = False,
35    ) -> dict:
36        """Get acon based on a filesystem path or on a dict.
37
38        Args:
39            acon_path: path of the acon (algorithm configuration) file.
40            acon: acon provided directly through python code (e.g., notebooks
41                or other apps).
42            disable_dbfs_retry: optional flag to disable file storage dbfs.
43
44        Returns:
45            Dict representation of an acon.
46        """
47        acon = (
48            acon if acon else ConfigUtils.read_json_acon(acon_path, disable_dbfs_retry)
49        )
50        return acon

Get acon based on a filesystem path or on a dict.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:

Dict representation of an acon.

@staticmethod
def get_config(package: str = 'lakehouse_engine.configs') -> Any:
52    @staticmethod
53    def get_config(package: str = "lakehouse_engine.configs") -> Any:
54        """Get the lakehouse engine configuration file.
55
56        Returns:
57            Configuration dictionary
58        """
59        with importlib.resources.open_binary(package, "engine.yaml") as config:
60            config = yaml.safe_load(config)
61        return config

Get the lakehouse engine configuration, parsed from the packaged engine.yaml file.

Returns:

Configuration dictionary

@classmethod
def get_engine_version(cls) -> str:
63    @classmethod
64    def get_engine_version(cls) -> str:
65        """Get Lakehouse Engine version from the installed packages.
66
67        Returns:
68            String of engine version.
69        """
70        try:
71            version = pkg_resources.get_distribution("lakehouse-engine").version
72        except pkg_resources.DistributionNotFound:
73            cls._LOGGER.info("Could not identify Lakehouse Engine version.")
74            version = ""
75        return str(version)

Get Lakehouse Engine version from the installed packages.

Returns:

String of engine version.

@staticmethod
def read_json_acon(path: str, disable_dbfs_retry: bool = False) -> Any:
77    @staticmethod
78    def read_json_acon(path: str, disable_dbfs_retry: bool = False) -> Any:
79        """Read an acon (algorithm configuration) file.
80
81        Args:
82            path: path to the acon file.
83            disable_dbfs_retry: optional flag to disable file storage dbfs.
84
85        Returns:
86            The acon file content as a dict.
87        """
88        return FileStorageFunctions.read_json(path, disable_dbfs_retry)

Read an acon (algorithm configuration) file.

Arguments:
  • path: path to the acon file.
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:

The acon file content as a dict.

@staticmethod
def read_sql(path: str, disable_dbfs_retry: bool = False) -> Any:
 90    @staticmethod
 91    def read_sql(path: str, disable_dbfs_retry: bool = False) -> Any:
 92        """Read a DDL file in Spark SQL format from a cloud object storage system.
 93
 94        Args:
 95            path: path to the SQL file.
 96            disable_dbfs_retry: optional flag to disable file storage dbfs.
 97
 98        Returns:
 99            Content of the SQL file.
100        """
101        return FileStorageFunctions.read_sql(path, disable_dbfs_retry)

Read a DDL file in Spark SQL format from a cloud object storage system.

Arguments:
  • path: path to the SQL file.
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:

Content of the SQL file.

@classmethod
def remove_sensitive_info(cls, dict_to_replace: Union[dict, list]) -> Union[dict, list]:
103    @classmethod
104    def remove_sensitive_info(
105        cls, dict_to_replace: Union[dict, list]
106    ) -> Union[dict, list]:
107        """Remove sensitive info from a dictionary.
108
109        Args:
110            dict_to_replace: dict where we want to remove sensitive info.
111
112        Returns:
113            dict without sensitive information.
114        """
115        if isinstance(dict_to_replace, list):
116            return [cls.remove_sensitive_info(k) for k in dict_to_replace]
117        elif isinstance(dict_to_replace, dict):
118            return {
119                k: "******" if k in cls.SENSITIVE_INFO else cls.remove_sensitive_info(v)
120                for k, v in dict_to_replace.items()
121            }
122        else:
123            return dict_to_replace

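Mask sensitive info in a dictionary or list: the value of every key listed in SENSITIVE_INFO is replaced with "******", recursively through nested dicts and lists.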

Arguments:
  • dict_to_replace: dict where we want to remove sensitive info.
Returns:

dict without sensitive information.