lakehouse_engine.core.exec_env
Module to take care of creating a singleton of the execution environment class.
"""Module to take care of creating a singleton of the execution environment class."""

import ast
import os

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import EngineConfig
from lakehouse_engine.utils.configs.config_utils import ConfigUtils
from lakehouse_engine.utils.logging_handler import LoggingHandler


class ExecEnv(object):
    """Represents the basic resources regarding the engine execution environment.

    Currently, it is used to encapsulate both the logic to get the Spark
    session and the engine configurations.
    """

    SESSION: SparkSession
    _LOGGER = LoggingHandler(__name__).get_logger()
    DEFAULT_AWS_REGION = "eu-west-1"
    ENGINE_CONFIG: EngineConfig = EngineConfig(**ConfigUtils.get_config())

    @classmethod
    def set_default_engine_config(cls, package: str) -> None:
        """Set default engine configurations by reading them from a specified package.

        Args:
            package: package where the engine configurations can be found.
        """
        cls.ENGINE_CONFIG = EngineConfig(**ConfigUtils.get_config(package))

    @classmethod
    def get_or_create(
        cls,
        session: SparkSession = None,
        enable_hive_support: bool = True,
        app_name: str = None,
        config: dict = None,
    ) -> None:
        """Get or create an execution environment session (currently Spark).

        It instantiates a singleton session that can be accessed anywhere from the
        lakehouse engine. By default, if there is an existing Spark Session in
        the environment (getActiveSession()), this function re-uses it. It can
        be further extended in the future to support forcing the creation of new
        isolated sessions even when a Spark Session is already active.

        Args:
            session: spark session.
            enable_hive_support: whether to enable hive support or not.
            app_name: application name.
            config: extra spark configs to supply to the spark session.
        """
        default_config = {
            "spark.databricks.delta.optimizeWrite.enabled": True,
            "spark.sql.adaptive.enabled": True,
            "spark.databricks.delta.merge.enableLowShuffle": True,
        }
        cls._LOGGER.info(
            f"Using the following default configs you may want to override them for "
            f"your job: {default_config}"
        )
        final_config: dict = {**default_config, **(config if config else {})}
        cls._LOGGER.info(f"Final config is: {final_config}")

        if session:
            cls.SESSION = session
        elif SparkSession.getActiveSession():
            cls.SESSION = SparkSession.getActiveSession()
            for key, value in final_config.items():
                cls.SESSION.conf.set(key, value)
        else:
            cls._LOGGER.info("Creating a new Spark Session")

            session_builder = SparkSession.builder.appName(app_name)
            for k, v in final_config.items():
                session_builder.config(k, v)

            if enable_hive_support:
                session_builder = session_builder.enableHiveSupport()
            cls.SESSION = session_builder.getOrCreate()

        if not session:
            cls._set_environment_variables(final_config.get("os_env_vars"))

    @classmethod
    def _set_environment_variables(cls, os_env_vars: dict = None) -> None:
        """Set environment variables at OS level.

        By default, we are setting the AWS_DEFAULT_REGION as we have identified this is
        beneficial to avoid getBucketLocation permission problems.

        Args:
            os_env_vars: this parameter can be used to pass the environment variables to
                be defined.
        """
        if os_env_vars is None:
            os_env_vars = {}

        for env_var in os_env_vars.items():
            os.environ[env_var[0]] = env_var[1]

        if "AWS_DEFAULT_REGION" not in os_env_vars:
            os.environ["AWS_DEFAULT_REGION"] = cls.SESSION.conf.get(
                "spark.databricks.clusterUsageTags.region", cls.DEFAULT_AWS_REGION
            )

    @classmethod
    def get_environment(cls) -> str:
        """Get the environment where the process is running.

        Returns:
            Name of the environment.
        """
        tag_array = ast.literal_eval(
            cls.SESSION.conf.get(
                "spark.databricks.clusterUsageTags.clusterAllTags", "[]"
            )
        )

        for key_val in tag_array:
            if key_val["key"] == "environment":
                return str(key_val["value"])
        return "prod"
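For reference, a short sketch of the side effect implemented by _set_environment_variables above: whenever get_or_create is not handed an externally built session, it exports AWS_DEFAULT_REGION at OS level, preferring the Databricks cluster region tag and falling back to DEFAULT_AWS_REGION. The application name below is illustrative.

import os

from lakehouse_engine.core.exec_env import ExecEnv

# Build (or adopt) the singleton session; "region_demo" is an illustrative name.
ExecEnv.get_or_create(app_name="region_demo")

# Filled from spark.databricks.clusterUsageTags.region when running on Databricks,
# otherwise it falls back to ExecEnv.DEFAULT_AWS_REGION ("eu-west-1").
print(os.environ["AWS_DEFAULT_REGION"])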
class ExecEnv(object)
Represents the basic resources regarding the engine execution environment.
Currently, it is used to encapsulate both the logic to get the Spark session and the engine configurations.
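A minimal usage sketch of the class, assuming the engine is installed and a Spark runtime is available; the application name and the extra Spark config below are illustrative values, not engine defaults:

from lakehouse_engine.core.exec_env import ExecEnv

# Reuse the active Spark session if one exists, otherwise create a new one with
# the engine defaults plus the extra configs supplied here.
ExecEnv.get_or_create(
    app_name="my_lakehouse_job",
    config={"spark.sql.shuffle.partitions": "200"},
)

# The singleton session and the engine configurations are then reachable from
# anywhere in the process.
ExecEnv.SESSION.sql("SELECT 1 AS ok").show()
print(ExecEnv.ENGINE_CONFIG)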
@classmethod
def set_default_engine_config(cls, package: str) -> None
Set default engine configurations by reading them from a specified package.
Arguments:
- package: package where the engine configurations can be found.
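A short sketch of how this could be called; "my_company.lakehouse_configs" is a hypothetical package name that would need to ship an engine configuration file readable by ConfigUtils.get_config:

from lakehouse_engine.core.exec_env import ExecEnv

# Replace the built-in defaults with the configs found in a custom package
# ("my_company.lakehouse_configs" is a hypothetical name).
ExecEnv.set_default_engine_config(package="my_company.lakehouse_configs")

# ENGINE_CONFIG now reflects the configurations read from that package.
print(ExecEnv.ENGINE_CONFIG)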
@classmethod
def get_or_create(cls, session: SparkSession = None, enable_hive_support: bool = True, app_name: str = None, config: dict = None) -> None
Get or create an execution environment session (currently Spark).
It instantiates a singleton session that can be accessed anywhere in the lakehouse engine. By default, if there is an existing Spark session in the environment (getActiveSession()), this function reuses it. It can be extended in the future to support forcing the creation of new isolated sessions even when a Spark session is already active.
Arguments:
- session: Spark session.
- enable_hive_support: whether to enable Hive support or not.
- app_name: application name.
- config: extra Spark configs to supply to the Spark session.
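A sketch of the two main ways to call it, based on the behaviour shown in the source above; the application name and config values are illustrative:

from pyspark.sql import SparkSession

from lakehouse_engine.core.exec_env import ExecEnv

# 1) Let the engine resolve the session: it reuses an active session if one
#    exists, otherwise it builds a new one with Hive support and these configs.
ExecEnv.get_or_create(
    app_name="nightly_loads",  # illustrative
    config={"spark.sql.shuffle.partitions": "64"},  # illustrative override
)

# 2) Hand over a session you built yourself; it is stored as the singleton as-is
#    and no extra configs or environment variables are applied to it.
prebuilt = SparkSession.builder.appName("prebuilt").getOrCreate()
ExecEnv.get_or_create(session=prebuilt)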
@classmethod
def get_environment(cls) -> str
Get the environment where the process is running.
Returns:
Name of the environment.
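A small sketch of how the result could be used; on Databricks the name comes from an "environment" entry in the cluster tags, and "prod" is the fallback when no such tag exists:

from lakehouse_engine.core.exec_env import ExecEnv

# The session must exist first, since the cluster tags are read from its conf.
ExecEnv.get_or_create()

env = ExecEnv.get_environment()
if env != "prod":
    print(f"Running against a non-production environment: {env}")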