lakehouse_engine.utils.engine_usage_stats
Utilities for recording the engine activity.
1"""Utilities for recording the engine activity.""" 2 3import ast 4import json 5import re 6from datetime import datetime 7from typing import Dict 8from urllib.parse import urlparse 9 10from lakehouse_engine.core.definitions import CollectEngineUsage, EngineStats 11from lakehouse_engine.core.exec_env import ExecEnv 12from lakehouse_engine.utils.configs.config_utils import ConfigUtils 13from lakehouse_engine.utils.logging_handler import LoggingHandler 14from lakehouse_engine.utils.storage.file_storage_functions import FileStorageFunctions 15 16 17class EngineUsageStats(object): 18 """Engine Usage utilities class.""" 19 20 _LOGGER = LoggingHandler(__name__).get_logger() 21 22 @classmethod 23 def store_engine_usage( 24 cls, 25 acon: dict, 26 func_name: str, 27 collect_engine_usage: str = None, 28 spark_confs: dict = None, 29 ) -> None: 30 """Collects and store Lakehouse Engine usage statistics. 31 32 These statistics include the acon and other relevant information, such as 33 the lakehouse engine version and the functions/algorithms being used. 34 35 Args: 36 acon: acon dictionary file. 37 func_name: function name that called this log acon. 38 collect_engine_usage: Lakehouse usage statistics collection strategy. 39 spark_confs: optional dictionary with the spark confs to be used when 40 collecting the engine usage. 41 """ 42 try: 43 if ( 44 collect_engine_usage 45 in [ 46 CollectEngineUsage.ENABLED.value, 47 CollectEngineUsage.PROD_ONLY.value, 48 ] 49 or ExecEnv.ENGINE_CONFIG.collect_engine_usage 50 in CollectEngineUsage.ENABLED.value 51 ): 52 start_timestamp = datetime.now() 53 timestamp_str = start_timestamp.strftime("%Y%m%d%H%M%S") 54 55 usage_stats: Dict = {"acon": ConfigUtils.remove_sensitive_info(acon)} 56 EngineUsageStats.get_spark_conf_values(usage_stats, spark_confs) 57 58 engine_usage_path = None 59 if usage_stats["environment"] == "prod": 60 engine_usage_path = ExecEnv.ENGINE_CONFIG.engine_usage_path 61 elif collect_engine_usage != CollectEngineUsage.PROD_ONLY.value: 62 engine_usage_path = ExecEnv.ENGINE_CONFIG.engine_dev_usage_path 63 64 if engine_usage_path is not None: 65 usage_stats["function"] = func_name 66 usage_stats["engine_version"] = ConfigUtils.get_engine_version() 67 usage_stats["start_timestamp"] = start_timestamp 68 usage_stats["year"] = start_timestamp.year 69 usage_stats["month"] = start_timestamp.month 70 run_id_extracted = re.search( 71 "run-([1-9]\\w+)", usage_stats["run_id"] 72 ) 73 usage_stats["run_id"] = ( 74 run_id_extracted.group(1) if run_id_extracted else "" 75 ) 76 77 log_file_name = f"eng_usage_{func_name}_{timestamp_str}.json" 78 79 usage_stats_str = json.dumps(usage_stats, default=str) 80 81 url = urlparse( 82 f"{engine_usage_path}/{usage_stats['dp_name']}/" 83 f"{start_timestamp.year}/{start_timestamp.month}/" 84 f"{log_file_name}", 85 allow_fragments=False, 86 ) 87 88 try: 89 FileStorageFunctions.write_payload( 90 engine_usage_path, url, usage_stats_str 91 ) 92 cls._LOGGER.info("Storing Lakehouse Engine usage statistics") 93 except FileNotFoundError as e: 94 cls._LOGGER.error( 95 f"Could not write engine stats into file: {e}." 96 ) 97 except Exception as e: 98 cls._LOGGER.error( 99 "Failed while collecting the lakehouse engine stats: " 100 f"Unexpected {e=}, {type(e)=}." 101 ) 102 103 @classmethod 104 def get_spark_conf_values(cls, usage_stats: dict, spark_confs: dict) -> None: 105 """Get information from spark session configurations. 106 107 Args: 108 usage_stats: usage_stats dictionary file. 
109 spark_confs: optional dictionary with the spark tags to be used when 110 collecting the engine usage. 111 """ 112 spark_confs = ( 113 EngineStats.DEF_SPARK_CONFS.value 114 if spark_confs is None 115 else EngineStats.DEF_SPARK_CONFS.value | spark_confs 116 ) 117 118 for spark_conf_key, spark_conf_value in spark_confs.items(): 119 # whenever the spark_conf_value has #, it means it is an array, so we need 120 # to split it and adequately process it 121 if "#" in spark_conf_value: 122 array_key = spark_conf_value.split("#") 123 array_values = ast.literal_eval( 124 ExecEnv.SESSION.conf.get(array_key[0], "[]") 125 ) 126 final_value = [ 127 key_val["value"] 128 for key_val in array_values 129 if key_val["key"] == array_key[1] 130 ] 131 usage_stats[spark_conf_key] = ( 132 final_value[0] if len(final_value) > 0 else "" 133 ) 134 else: 135 usage_stats[spark_conf_key] = ExecEnv.SESSION.conf.get( 136 spark_conf_value, "" 137 )
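To make the write target concrete, here is a small sketch of the run-id extraction and the file layout produced by `store_engine_usage`; every value below (bucket, data product name, function name, run-id tag format) is hypothetical.

```python
import re
from datetime import datetime

# Hypothetical values, only to illustrate the file layout produced above.
engine_usage_path = "s3://my-bucket/engine_usage"  # assumed usage path
dp_name = "my_data_product"
func_name = "load_data"
start_timestamp = datetime(2024, 5, 17, 10, 30, 0)

# The run id is reduced to the part after "run-" in the tag, if it matches.
run_id_tag = "run-123456789"  # assumed format of the run id value
run_id = re.search("run-([1-9]\\w+)", run_id_tag)
print(run_id.group(1) if run_id else "")  # -> 123456789

# Stats land under <path>/<dp_name>/<year>/<month>/eng_usage_<func>_<ts>.json
print(
    f"{engine_usage_path}/{dp_name}/{start_timestamp.year}/{start_timestamp.month}/"
    f"eng_usage_{func_name}_{start_timestamp.strftime('%Y%m%d%H%M%S')}.json"
)
# -> s3://my-bucket/engine_usage/my_data_product/2024/5/eng_usage_load_data_20240517103000.json
```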
class EngineUsageStats:
```python
class EngineUsageStats(object):
    """Engine Usage utilities class."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def store_engine_usage(
        cls,
        acon: dict,
        func_name: str,
        collect_engine_usage: str = None,
        spark_confs: dict = None,
    ) -> None:
        """Collects and store Lakehouse Engine usage statistics.

        These statistics include the acon and other relevant information, such as
        the lakehouse engine version and the functions/algorithms being used.

        Args:
            acon: acon dictionary file.
            func_name: function name that called this log acon.
            collect_engine_usage: Lakehouse usage statistics collection strategy.
            spark_confs: optional dictionary with the spark confs to be used when
                collecting the engine usage.
        """
        try:
            if (
                collect_engine_usage
                in [
                    CollectEngineUsage.ENABLED.value,
                    CollectEngineUsage.PROD_ONLY.value,
                ]
                or ExecEnv.ENGINE_CONFIG.collect_engine_usage
                in CollectEngineUsage.ENABLED.value
            ):
                start_timestamp = datetime.now()
                timestamp_str = start_timestamp.strftime("%Y%m%d%H%M%S")

                usage_stats: Dict = {"acon": ConfigUtils.remove_sensitive_info(acon)}
                EngineUsageStats.get_spark_conf_values(usage_stats, spark_confs)

                engine_usage_path = None
                if usage_stats["environment"] == "prod":
                    engine_usage_path = ExecEnv.ENGINE_CONFIG.engine_usage_path
                elif collect_engine_usage != CollectEngineUsage.PROD_ONLY.value:
                    engine_usage_path = ExecEnv.ENGINE_CONFIG.engine_dev_usage_path

                if engine_usage_path is not None:
                    usage_stats["function"] = func_name
                    usage_stats["engine_version"] = ConfigUtils.get_engine_version()
                    usage_stats["start_timestamp"] = start_timestamp
                    usage_stats["year"] = start_timestamp.year
                    usage_stats["month"] = start_timestamp.month
                    run_id_extracted = re.search(
                        "run-([1-9]\\w+)", usage_stats["run_id"]
                    )
                    usage_stats["run_id"] = (
                        run_id_extracted.group(1) if run_id_extracted else ""
                    )

                    log_file_name = f"eng_usage_{func_name}_{timestamp_str}.json"

                    usage_stats_str = json.dumps(usage_stats, default=str)

                    url = urlparse(
                        f"{engine_usage_path}/{usage_stats['dp_name']}/"
                        f"{start_timestamp.year}/{start_timestamp.month}/"
                        f"{log_file_name}",
                        allow_fragments=False,
                    )

                    try:
                        FileStorageFunctions.write_payload(
                            engine_usage_path, url, usage_stats_str
                        )
                        cls._LOGGER.info("Storing Lakehouse Engine usage statistics")
                    except FileNotFoundError as e:
                        cls._LOGGER.error(
                            f"Could not write engine stats into file: {e}."
                        )
        except Exception as e:
            cls._LOGGER.error(
                "Failed while collecting the lakehouse engine stats: "
                f"Unexpected {e=}, {type(e)=}."
            )

    @classmethod
    def get_spark_conf_values(cls, usage_stats: dict, spark_confs: dict) -> None:
        """Get information from spark session configurations.

        Args:
            usage_stats: usage_stats dictionary file.
            spark_confs: optional dictionary with the spark tags to be used when
                collecting the engine usage.
        """
        spark_confs = (
            EngineStats.DEF_SPARK_CONFS.value
            if spark_confs is None
            else EngineStats.DEF_SPARK_CONFS.value | spark_confs
        )

        for spark_conf_key, spark_conf_value in spark_confs.items():
            # whenever the spark_conf_value has #, it means it is an array, so we need
            # to split it and adequately process it
            if "#" in spark_conf_value:
                array_key = spark_conf_value.split("#")
                array_values = ast.literal_eval(
                    ExecEnv.SESSION.conf.get(array_key[0], "[]")
                )
                final_value = [
                    key_val["value"]
                    for key_val in array_values
                    if key_val["key"] == array_key[1]
                ]
                usage_stats[spark_conf_key] = (
                    final_value[0] if len(final_value) > 0 else ""
                )
            else:
                usage_stats[spark_conf_key] = ExecEnv.SESSION.conf.get(
                    spark_conf_value, ""
                )
```
Engine Usage utilities class.
@classmethod
def store_engine_usage(cls, acon: dict, func_name: str, collect_engine_usage: str = None, spark_confs: dict = None) -> None:
```python
    @classmethod
    def store_engine_usage(
        cls,
        acon: dict,
        func_name: str,
        collect_engine_usage: str = None,
        spark_confs: dict = None,
    ) -> None:
        """Collects and store Lakehouse Engine usage statistics.

        These statistics include the acon and other relevant information, such as
        the lakehouse engine version and the functions/algorithms being used.

        Args:
            acon: acon dictionary file.
            func_name: function name that called this log acon.
            collect_engine_usage: Lakehouse usage statistics collection strategy.
            spark_confs: optional dictionary with the spark confs to be used when
                collecting the engine usage.
        """
        try:
            if (
                collect_engine_usage
                in [
                    CollectEngineUsage.ENABLED.value,
                    CollectEngineUsage.PROD_ONLY.value,
                ]
                or ExecEnv.ENGINE_CONFIG.collect_engine_usage
                in CollectEngineUsage.ENABLED.value
            ):
                start_timestamp = datetime.now()
                timestamp_str = start_timestamp.strftime("%Y%m%d%H%M%S")

                usage_stats: Dict = {"acon": ConfigUtils.remove_sensitive_info(acon)}
                EngineUsageStats.get_spark_conf_values(usage_stats, spark_confs)

                engine_usage_path = None
                if usage_stats["environment"] == "prod":
                    engine_usage_path = ExecEnv.ENGINE_CONFIG.engine_usage_path
                elif collect_engine_usage != CollectEngineUsage.PROD_ONLY.value:
                    engine_usage_path = ExecEnv.ENGINE_CONFIG.engine_dev_usage_path

                if engine_usage_path is not None:
                    usage_stats["function"] = func_name
                    usage_stats["engine_version"] = ConfigUtils.get_engine_version()
                    usage_stats["start_timestamp"] = start_timestamp
                    usage_stats["year"] = start_timestamp.year
                    usage_stats["month"] = start_timestamp.month
                    run_id_extracted = re.search(
                        "run-([1-9]\\w+)", usage_stats["run_id"]
                    )
                    usage_stats["run_id"] = (
                        run_id_extracted.group(1) if run_id_extracted else ""
                    )

                    log_file_name = f"eng_usage_{func_name}_{timestamp_str}.json"

                    usage_stats_str = json.dumps(usage_stats, default=str)

                    url = urlparse(
                        f"{engine_usage_path}/{usage_stats['dp_name']}/"
                        f"{start_timestamp.year}/{start_timestamp.month}/"
                        f"{log_file_name}",
                        allow_fragments=False,
                    )

                    try:
                        FileStorageFunctions.write_payload(
                            engine_usage_path, url, usage_stats_str
                        )
                        cls._LOGGER.info("Storing Lakehouse Engine usage statistics")
                    except FileNotFoundError as e:
                        cls._LOGGER.error(
                            f"Could not write engine stats into file: {e}."
                        )
        except Exception as e:
            cls._LOGGER.error(
                "Failed while collecting the lakehouse engine stats: "
                f"Unexpected {e=}, {type(e)=}."
            )
```
Collect and store Lakehouse Engine usage statistics.
These statistics include the acon and other relevant information, such as the Lakehouse Engine version and the functions/algorithms being used. A minimal usage sketch follows the argument list below.
Arguments:
- acon: the algorithm configuration (acon) dictionary.
- func_name: name of the function that triggered this usage log.
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
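A minimal usage sketch, assuming ExecEnv has already been set up with an active Spark session and engine configuration; the acon contents and function name below are purely illustrative:

```python
from lakehouse_engine.core.definitions import CollectEngineUsage
from lakehouse_engine.utils.engine_usage_stats import EngineUsageStats

# Hypothetical acon; sensitive entries are stripped by
# ConfigUtils.remove_sensitive_info before the stats are written.
acon = {
    "input_specs": [{"spec_id": "sales_source", "read_type": "batch"}],
    "output_specs": [{"spec_id": "sales_bronze", "data_format": "delta"}],
}

# Writes a JSON payload with the acon, engine version, function name and
# Spark-derived metadata to the configured engine usage path (prod only here).
EngineUsageStats.store_engine_usage(
    acon=acon,
    func_name="load_data",  # illustrative name of the calling engine function
    collect_engine_usage=CollectEngineUsage.PROD_ONLY.value,
)
```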
@classmethod
def get_spark_conf_values(cls, usage_stats: dict, spark_confs: dict) -> None:
```python
    @classmethod
    def get_spark_conf_values(cls, usage_stats: dict, spark_confs: dict) -> None:
        """Get information from spark session configurations.

        Args:
            usage_stats: usage_stats dictionary file.
            spark_confs: optional dictionary with the spark tags to be used when
                collecting the engine usage.
        """
        spark_confs = (
            EngineStats.DEF_SPARK_CONFS.value
            if spark_confs is None
            else EngineStats.DEF_SPARK_CONFS.value | spark_confs
        )

        for spark_conf_key, spark_conf_value in spark_confs.items():
            # whenever the spark_conf_value has #, it means it is an array, so we need
            # to split it and adequately process it
            if "#" in spark_conf_value:
                array_key = spark_conf_value.split("#")
                array_values = ast.literal_eval(
                    ExecEnv.SESSION.conf.get(array_key[0], "[]")
                )
                final_value = [
                    key_val["value"]
                    for key_val in array_values
                    if key_val["key"] == array_key[1]
                ]
                usage_stats[spark_conf_key] = (
                    final_value[0] if len(final_value) > 0 else ""
                )
            else:
                usage_stats[spark_conf_key] = ExecEnv.SESSION.conf.get(
                    spark_conf_value, ""
                )
```
Get information from Spark session configurations. An illustrative mapping is sketched after the argument list below.
Arguments:
- usage_stats: dictionary into which the collected statistics are written.
- spark_confs: optional dictionary with the Spark confs to be used when collecting the engine usage.
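A sketch of how the mapping is resolved, assuming ExecEnv already holds an active Spark session; the conf names below are illustrative (the real defaults live in EngineStats.DEF_SPARK_CONFS). A plain value is read directly from the Spark conf, while the `conf#key` form means the conf holds a list of key/value pairs from which the matching entry is extracted.

```python
from lakehouse_engine.utils.engine_usage_stats import EngineUsageStats

usage_stats: dict = {}

# Hypothetical mapping, merged on top of EngineStats.DEF_SPARK_CONFS:
#   - "environment" is read directly from a Spark conf;
#   - "dp_name" uses the "conf#key" form: the conf value is expected to be a
#     list of {"key": ..., "value": ...} dicts, and the entry whose key is
#     "dp_name" is picked.
spark_confs = {
    "environment": "spark.my.org.environment",
    "dp_name": "spark.databricks.clusterUsageTags.clusterAllTags#dp_name",
}

EngineUsageStats.get_spark_conf_values(usage_stats, spark_confs)
print(usage_stats)  # e.g. {"environment": "dev", "dp_name": "my_data_product", ...}
```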