lakehouse_engine.utils.databricks_utils

Utilities for databricks operations.

 1"""Utilities for databricks operations."""
 2
 3import json
 4from typing import Any, Tuple
 5
 6from pyspark.sql import SparkSession
 7
 8
 9class DatabricksUtils(object):
10    """Databricks utilities class."""
11
12    @staticmethod
13    def get_db_utils(spark: SparkSession) -> Any:
14        """Get db utils on databricks.
15
16        Args:
17            spark: spark session.
18
19        Returns:
20            Dbutils from databricks.
21        """
22        try:
23            from pyspark.dbutils import DBUtils
24
25            if "dbutils" not in locals():
26                dbutils = DBUtils(spark)
27            else:
28                dbutils = locals().get("dbutils")
29        except ImportError:
30            import IPython
31
32            dbutils = IPython.get_ipython().user_ns["dbutils"]
33        return dbutils
34
35    @staticmethod
36    def get_databricks_job_information(spark: SparkSession) -> Tuple[str, str]:
37        """Get notebook context from running acon.
38
39        Args:
40            spark: spark session.
41
42        Returns:
43            Dict containing databricks notebook context.
44        """
45        if "local" in spark.getActiveSession().conf.get("spark.app.id"):
46            return "local", "local"
47        else:
48            dbutils = DatabricksUtils.get_db_utils(spark)
49            notebook_context = json.loads(
50                (
51                    dbutils.notebook.entry_point.getDbutils()
52                    .notebook()
53                    .getContext()
54                    .safeToJson()
55                )
56            )
57
58            return notebook_context["attributes"].get("orgId"), notebook_context[
59                "attributes"
60            ].get("jobName")
class DatabricksUtils(object):
    """Databricks utilities class."""

    @staticmethod
    def get_db_utils(spark: SparkSession) -> Any:
        """Get db utils on databricks.

        Args:
            spark: spark session.

        Returns:
            Dbutils from databricks.
        """
        try:
            # On a databricks cluster DBUtils is importable and can be
            # constructed directly from the spark session.
            from pyspark.dbutils import DBUtils

            dbutils = DBUtils(spark)
        except ImportError:
            # On a databricks notebook DBUtils is not importable, but the
            # running IPython kernel already exposes a `dbutils` instance.
            import IPython

            dbutils = IPython.get_ipython().user_ns["dbutils"]
        return dbutils

    @staticmethod
    def get_databricks_job_information(spark: SparkSession) -> Tuple[str, str]:
        """Get notebook context from running acon.

        Args:
            spark: spark session.

        Returns:
            Tuple with the databricks org id and job name.
            ("local", "local") is returned when running outside databricks
            (i.e., with a local spark master).
        """
        if "local" in spark.getActiveSession().conf.get("spark.app.id"):
            return "local", "local"
        else:
            dbutils = DatabricksUtils.get_db_utils(spark)
            notebook_context = json.loads(
                dbutils.notebook.entry_point.getDbutils()
                .notebook()
                .getContext()
                .safeToJson()
            )

            return notebook_context["attributes"].get("orgId"), notebook_context[
                "attributes"
            ].get("jobName")

Databricks utilities class.

@staticmethod
def get_db_utils(spark: pyspark.sql.session.SparkSession) -> Any:
13    @staticmethod
14    def get_db_utils(spark: SparkSession) -> Any:
15        """Get db utils on databricks.
16
17        Args:
18            spark: spark session.
19
20        Returns:
21            Dbutils from databricks.
22        """
23        try:
24            from pyspark.dbutils import DBUtils
25
26            if "dbutils" not in locals():
27                dbutils = DBUtils(spark)
28            else:
29                dbutils = locals().get("dbutils")
30        except ImportError:
31            import IPython
32
33            dbutils = IPython.get_ipython().user_ns["dbutils"]
34        return dbutils

Get db utils on databricks.

Arguments:
  • spark: spark session.

Returns:
  Dbutils from databricks.

@staticmethod
def get_databricks_job_information(spark: pyspark.sql.session.SparkSession) -> Tuple[str, str]:
36    @staticmethod
37    def get_databricks_job_information(spark: SparkSession) -> Tuple[str, str]:
38        """Get notebook context from running acon.
39
40        Args:
41            spark: spark session.
42
43        Returns:
44            Dict containing databricks notebook context.
45        """
46        if "local" in spark.getActiveSession().conf.get("spark.app.id"):
47            return "local", "local"
48        else:
49            dbutils = DatabricksUtils.get_db_utils(spark)
50            notebook_context = json.loads(
51                (
52                    dbutils.notebook.entry_point.getDbutils()
53                    .notebook()
54                    .getContext()
55                    .safeToJson()
56                )
57            )
58
59            return notebook_context["attributes"].get("orgId"), notebook_context[
60                "attributes"
61            ].get("jobName")

Get notebook context from running acon.

Arguments:
  • spark: spark session.

Returns:
  Tuple with the databricks org id and job name ("local", "local" when running with a local spark master).