lakehouse_engine.core.exec_env

Module to take care of creating a singleton of the execution environment class.

  1"""Module to take care of creating a singleton of the execution environment class."""
  2
  3import ast
  4import os
  5
  6from pyspark.sql import SparkSession
  7
  8from lakehouse_engine.core.definitions import EngineConfig
  9from lakehouse_engine.utils.configs.config_utils import ConfigUtils
 10from lakehouse_engine.utils.logging_handler import LoggingHandler
 11
 12
 13class ExecEnv(object):
 14    """Represents the basic resources regarding the engine execution environment.
 15
 16    Currently, it is used to encapsulate both the logic to get the Spark
 17    session and the engine configurations.
 18    """
 19
 20    SESSION: SparkSession
 21    _LOGGER = LoggingHandler(__name__).get_logger()
 22    DEFAULT_AWS_REGION = "eu-west-1"
 23    ENGINE_CONFIG: EngineConfig = EngineConfig(**ConfigUtils.get_config())
 24
 25    @classmethod
 26    def set_default_engine_config(cls, package: str) -> None:
 27        """Set default engine configurations by reading them from a specified package.
 28
 29        Args:
 30            package: package where the engine configurations can be found.
 31        """
 32        cls.ENGINE_CONFIG = EngineConfig(**ConfigUtils.get_config(package))
 33
 34    @classmethod
 35    def get_or_create(
 36        cls,
 37        session: SparkSession = None,
 38        enable_hive_support: bool = True,
 39        app_name: str = None,
 40        config: dict = None,
 41    ) -> None:
 42        """Get or create an execution environment session (currently Spark).
 43
 44        It instantiates a singleton session that can be accessed anywhere from the
 45        lakehouse engine. By default, if there is an existing Spark Session in
 46        the environment (getActiveSession()), this function re-uses it. It can
 47        be further extended in the future to support forcing the creation of new
 48        isolated sessions even when a Spark Session is already active.
 49
 50        Args:
 51            session: spark session.
 52            enable_hive_support: whether to enable hive support or not.
 53            app_name: application name.
 54            config: extra spark configs to supply to the spark session.
 55        """
 56        default_config = {
 57            "spark.databricks.delta.optimizeWrite.enabled": True,
 58            "spark.sql.adaptive.enabled": True,
 59            "spark.databricks.delta.merge.enableLowShuffle": True,
 60        }
 61        cls._LOGGER.info(
 62            f"Using the following default configs you may want to override them for "
 63            f"your job: {default_config}"
 64        )
 65        final_config: dict = {**default_config, **(config if config else {})}
 66        cls._LOGGER.info(f"Final config is: {final_config}")
 67
 68        if session:
 69            cls.SESSION = session
 70        elif SparkSession.getActiveSession():
 71            cls.SESSION = SparkSession.getActiveSession()
 72            for key, value in final_config.items():
 73                cls.SESSION.conf.set(key, value)
 74        else:
 75            cls._LOGGER.info("Creating a new Spark Session")
 76
 77            session_builder = SparkSession.builder.appName(app_name)
 78            for k, v in final_config.items():
 79                session_builder.config(k, v)
 80
 81            if enable_hive_support:
 82                session_builder = session_builder.enableHiveSupport()
 83            cls.SESSION = session_builder.getOrCreate()
 84
 85        if not session:
 86            cls._set_environment_variables(final_config.get("os_env_vars"))
 87
 88    @classmethod
 89    def _set_environment_variables(cls, os_env_vars: dict = None) -> None:
 90        """Set environment variables at OS level.
 91
 92        By default, we are setting the AWS_DEFAULT_REGION as we have identified this is
 93        beneficial to avoid getBucketLocation permission problems.
 94
 95        Args:
 96            os_env_vars: this parameter can be used to pass the environment variables to
 97                be defined.
 98        """
 99        if os_env_vars is None:
100            os_env_vars = {}
101
102        for env_var in os_env_vars.items():
103            os.environ[env_var[0]] = env_var[1]
104
105        if "AWS_DEFAULT_REGION" not in os_env_vars:
106            os.environ["AWS_DEFAULT_REGION"] = cls.SESSION.conf.get(
107                "spark.databricks.clusterUsageTags.region", cls.DEFAULT_AWS_REGION
108            )
109
110    @classmethod
111    def get_environment(cls) -> str:
112        """Get the environment where the process is running.
113
114        Returns:
115            Name of the environment.
116        """
117        tag_array = ast.literal_eval(
118            cls.SESSION.conf.get(
119                "spark.databricks.clusterUsageTags.clusterAllTags", "[]"
120            )
121        )
122
123        for key_val in tag_array:
124            if key_val["key"] == "environment":
125                return str(key_val["value"])
126        return "prod"
class ExecEnv:

Represents the basic resources regarding the engine execution environment.

Currently, it is used to encapsulate both the logic to get the Spark session and the engine configurations.

SESSION: pyspark.sql.session.SparkSession
DEFAULT_AWS_REGION = 'eu-west-1'
ENGINE_CONFIG: lakehouse_engine.core.definitions.EngineConfig = EngineConfig(dq_bucket='s3://sample-bucket', dq_dev_bucket=None, notif_disallowed_email_servers=None, engine_usage_path=None, engine_dev_usage_path=None, collect_engine_usage='enabled', dq_functions_column_list=['dq_rule_id', 'execution_point', 'filters', 'schema', 'table', 'column', 'dimension'])
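
A minimal usage sketch (assuming a PySpark environment; the application name is illustrative), showing how the singleton session is created once and then reached anywhere through the class attribute:

from lakehouse_engine.core.exec_env import ExecEnv

# Create (or reuse) the singleton Spark session for this process.
ExecEnv.get_or_create(app_name="my_lakehouse_job")  # app name is illustrative

# Any other module can now reach the same session via the class attribute.
df = ExecEnv.SESSION.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.show()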
@classmethod
def set_default_engine_config(cls, package: str) -> None:

Set default engine configurations by reading them from a specified package.

Arguments:
  • package: package where the engine configurations can be found.
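
A short sketch of pointing the engine at custom configurations; the package name below is hypothetical and is assumed to contain a configuration file readable by ConfigUtils.get_config:

from lakehouse_engine.core.exec_env import ExecEnv

# "my_company.lakehouse_configs" is an illustrative package name, not one shipped with the engine.
ExecEnv.set_default_engine_config(package="my_company.lakehouse_configs")

# The freshly parsed configurations are exposed as an EngineConfig instance.
print(ExecEnv.ENGINE_CONFIG)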
@classmethod
def get_or_create(cls, session: pyspark.sql.session.SparkSession = None, enable_hive_support: bool = True, app_name: str = None, config: dict = None) -> None:

Get or create an execution environment session (currently Spark).

It instantiates a singleton session that can be accessed from anywhere in the lakehouse engine. By default, if there is an existing Spark session in the environment (getActiveSession()), this function reuses it. It can be further extended in the future to support forcing the creation of new isolated sessions even when a Spark session is already active.

Arguments:
  • session: existing Spark session to use.
  • enable_hive_support: whether to enable Hive support or not.
  • app_name: application name.
  • config: extra Spark configs to supply to the Spark session.
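
A hedged sketch of a typical call, based on the source above: one default config is overridden, and OS-level environment variables are forwarded through the special os_env_vars key of config (all values are illustrative):

from lakehouse_engine.core.exec_env import ExecEnv

ExecEnv.get_or_create(
    app_name="nightly_loads",  # illustrative application name
    enable_hive_support=True,
    config={
        # Override one of the engine's default Spark configs.
        "spark.sql.adaptive.enabled": False,
        # Forwarded to os.environ by the engine (see _set_environment_variables).
        "os_env_vars": {"AWS_DEFAULT_REGION": "eu-central-1"},
    },
)

spark = ExecEnv.SESSION  # the singleton session created (or reused) above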
@classmethod
def get_environment(cls) -> str:

Get the environment where the process is running.

Returns:

Name of the environment.
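
For reference, a small sketch of how a Databricks cluster tags value is interpreted; the tag payload below is an illustrative example of the clusterAllTags format assumed by get_environment:

import ast

# Illustrative value of spark.databricks.clusterUsageTags.clusterAllTags.
cluster_all_tags = '[{"key": "environment", "value": "dev"}, {"key": "team", "value": "data"}]'

tags = ast.literal_eval(cluster_all_tags)
environment = next(
    (str(tag["value"]) for tag in tags if tag["key"] == "environment"),
    "prod",  # same fallback as ExecEnv.get_environment
)
print(environment)  # -> dev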