lakehouse_engine.utils.engine_usage_stats

Utilities for recording the engine activity.

  1"""Utilities for recording the engine activity."""
  2
  3import ast
  4import json
  5import re
  6from datetime import datetime
  7from typing import Dict
  8from urllib.parse import urlparse
  9
 10from lakehouse_engine.core.definitions import CollectEngineUsage, EngineStats
 11from lakehouse_engine.core.exec_env import ExecEnv
 12from lakehouse_engine.utils.configs.config_utils import ConfigUtils
 13from lakehouse_engine.utils.logging_handler import LoggingHandler
 14from lakehouse_engine.utils.storage.file_storage_functions import FileStorageFunctions
 15
 16
 17class EngineUsageStats(object):
 18    """Engine Usage utilities class."""
 19
 20    _LOGGER = LoggingHandler(__name__).get_logger()
 21
 22    @classmethod
 23    def store_engine_usage(
 24        cls,
 25        acon: dict,
 26        func_name: str,
 27        collect_engine_usage: str = None,
 28        spark_confs: dict = None,
 29    ) -> None:
 30        """Collects and store Lakehouse Engine usage statistics.
 31
 32        These statistics include the acon and other relevant information, such as
 33        the lakehouse engine version and the functions/algorithms being used.
 34
 35        Args:
 36            acon: acon dictionary file.
 37            func_name: function name that called this log acon.
 38            collect_engine_usage: Lakehouse usage statistics collection strategy.
 39            spark_confs: optional dictionary with the spark confs to be used when
 40                collecting the engine usage.
 41        """
 42        try:
 43            if (
 44                collect_engine_usage
 45                in [
 46                    CollectEngineUsage.ENABLED.value,
 47                    CollectEngineUsage.PROD_ONLY.value,
 48                ]
 49                or ExecEnv.ENGINE_CONFIG.collect_engine_usage
 50                in CollectEngineUsage.ENABLED.value
 51            ):
 52                start_timestamp = datetime.now()
 53                timestamp_str = start_timestamp.strftime("%Y%m%d%H%M%S")
 54
 55                usage_stats: Dict = {"acon": ConfigUtils.remove_sensitive_info(acon)}
 56                EngineUsageStats.get_spark_conf_values(usage_stats, spark_confs)
 57
 58                engine_usage_path = None
 59                if usage_stats["environment"] == "prod":
 60                    engine_usage_path = ExecEnv.ENGINE_CONFIG.engine_usage_path
 61                elif collect_engine_usage != CollectEngineUsage.PROD_ONLY.value:
 62                    engine_usage_path = ExecEnv.ENGINE_CONFIG.engine_dev_usage_path
 63
 64                if engine_usage_path is not None:
 65                    usage_stats["function"] = func_name
 66                    usage_stats["engine_version"] = ConfigUtils.get_engine_version()
 67                    usage_stats["start_timestamp"] = start_timestamp
 68                    usage_stats["year"] = start_timestamp.year
 69                    usage_stats["month"] = start_timestamp.month
 70                    run_id_extracted = re.search(
 71                        "run-([1-9]\\w+)", usage_stats["run_id"]
 72                    )
 73                    usage_stats["run_id"] = (
 74                        run_id_extracted.group(1) if run_id_extracted else ""
 75                    )
 76
 77                    log_file_name = f"eng_usage_{func_name}_{timestamp_str}.json"
 78
 79                    usage_stats_str = json.dumps(usage_stats, default=str)
 80
 81                    url = urlparse(
 82                        f"{engine_usage_path}/{usage_stats['dp_name']}/"
 83                        f"{start_timestamp.year}/{start_timestamp.month}/"
 84                        f"{log_file_name}",
 85                        allow_fragments=False,
 86                    )
 87
 88                    try:
 89                        FileStorageFunctions.write_payload(
 90                            engine_usage_path, url, usage_stats_str
 91                        )
 92                        cls._LOGGER.info("Storing Lakehouse Engine usage statistics")
 93                    except FileNotFoundError as e:
 94                        cls._LOGGER.error(
 95                            f"Could not write engine stats into file: {e}."
 96                        )
 97        except Exception as e:
 98            cls._LOGGER.error(
 99                "Failed while collecting the lakehouse engine stats: "
100                f"Unexpected {e=}, {type(e)=}."
101            )
102
103    @classmethod
104    def get_spark_conf_values(cls, usage_stats: dict, spark_confs: dict) -> None:
105        """Get information from spark session configurations.
106
107        Args:
108            usage_stats: usage_stats dictionary file.
109            spark_confs: optional dictionary with the spark tags to be used when
110                collecting the engine usage.
111        """
112        spark_confs = (
113            EngineStats.DEF_SPARK_CONFS.value
114            if spark_confs is None
115            else EngineStats.DEF_SPARK_CONFS.value | spark_confs
116        )
117
118        for spark_conf_key, spark_conf_value in spark_confs.items():
119            # whenever the spark_conf_value has #, it means it is an array, so we need
120            # to split it and adequately process it
121            if "#" in spark_conf_value:
122                array_key = spark_conf_value.split("#")
123                array_values = ast.literal_eval(
124                    ExecEnv.SESSION.conf.get(array_key[0], "[]")
125                )
126                final_value = [
127                    key_val["value"]
128                    for key_val in array_values
129                    if key_val["key"] == array_key[1]
130                ]
131                usage_stats[spark_conf_key] = (
132                    final_value[0] if len(final_value) > 0 else ""
133                )
134            else:
135                usage_stats[spark_conf_key] = ExecEnv.SESSION.conf.get(
136                    spark_conf_value, ""
137                )
class EngineUsageStats:

Engine Usage utilities class.

@classmethod
def store_engine_usage(cls, acon: dict, func_name: str, collect_engine_usage: str = None, spark_confs: dict = None) -> None:

Collect and store Lakehouse Engine usage statistics.

These statistics include the acon and other relevant information, such as the lakehouse engine version and the functions/algorithms being used.

Arguments:
  • acon: acon dictionary file.
  • func_name: function name that called this log acon.
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
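
A minimal usage sketch (hypothetical acon content; assumes the execution environment has already been initialised so that ExecEnv.SESSION and ExecEnv.ENGINE_CONFIG are available, here via an assumed ExecEnv.get_or_create() call):

from lakehouse_engine.core.definitions import CollectEngineUsage
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.utils.engine_usage_stats import EngineUsageStats

# Assumption: ExecEnv.get_or_create() initialises ExecEnv.SESSION / ExecEnv.ENGINE_CONFIG.
ExecEnv.get_or_create()

# Hypothetical acon; sensitive fields are removed by ConfigUtils.remove_sensitive_info
# before the statistics are written.
acon = {
    "input_specs": [{"spec_id": "sales_source", "read_type": "batch", "data_format": "csv"}],
    "output_specs": [{"spec_id": "sales_bronze", "data_format": "delta"}],
}

# Record that load_data was executed with this acon. ENABLED collects in any
# environment; PROD_ONLY only writes statistics when running in prod.
EngineUsageStats.store_engine_usage(
    acon=acon,
    func_name="load_data",
    collect_engine_usage=CollectEngineUsage.ENABLED.value,
)

The statistics file is written as eng_usage_<func_name>_<timestamp>.json under engine_usage_path (prod) or engine_dev_usage_path (non-prod), partitioned by data product name, year and month.
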
@classmethod
def get_spark_conf_values(cls, usage_stats: dict, spark_confs: dict) -> None:

Get information from spark session configurations.

Arguments:
  • usage_stats: usage_stats dictionary file.
  • spark_confs: optional dictionary with the spark tags to be used when collecting the engine usage.
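
An illustrative sketch of the lookup convention (the conf names and the dp_name tag below are hypothetical; the real defaults come from EngineStats.DEF_SPARK_CONFS): a plain value is read directly from the Spark session conf, while a value of the form "<conf_name>#<key>" is treated as a conf holding a serialized list of {"key": ..., "value": ...} entries, from which the entry matching <key> is extracted.

from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.utils.engine_usage_stats import EngineUsageStats

# Assumption: ExecEnv.get_or_create() initialises ExecEnv.SESSION.
ExecEnv.get_or_create()

usage_stats: dict = {}

# Hypothetical extra confs to collect, merged on top of EngineStats.DEF_SPARK_CONFS:
# "app_name" is a direct conf lookup; "dp_name" picks the "dp_name" tag out of a
# conf whose value is a serialized list of {"key": ..., "value": ...} dictionaries.
spark_confs = {
    "app_name": "spark.app.name",
    "dp_name": "spark.databricks.clusterUsageTags.clusterAllTags#dp_name",
}

EngineUsageStats.get_spark_conf_values(usage_stats, spark_confs)

# usage_stats now contains one entry per requested key (plus the engine defaults),
# with "" for any conf or tag that is not set in the current session.
print(usage_stats.get("app_name"), usage_stats.get("dp_name"))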