lakehouse_engine.utils.databricks_utils
Utilities for Databricks operations.
1"""Utilities for databricks operations.""" 2 3import json 4from typing import Any, Tuple 5 6from pyspark.sql import SparkSession 7 8 9class DatabricksUtils(object): 10 """Databricks utilities class.""" 11 12 @staticmethod 13 def get_db_utils(spark: SparkSession) -> Any: 14 """Get db utils on databricks. 15 16 Args: 17 spark: spark session. 18 19 Returns: 20 Dbutils from databricks. 21 """ 22 try: 23 from pyspark.dbutils import DBUtils 24 25 if "dbutils" not in locals(): 26 dbutils = DBUtils(spark) 27 else: 28 dbutils = locals().get("dbutils") 29 except ImportError: 30 import IPython 31 32 dbutils = IPython.get_ipython().user_ns["dbutils"] 33 return dbutils 34 35 @staticmethod 36 def get_databricks_job_information(spark: SparkSession) -> Tuple[str, str]: 37 """Get notebook context from running acon. 38 39 Args: 40 spark: spark session. 41 42 Returns: 43 Dict containing databricks notebook context. 44 """ 45 if "local" in spark.getActiveSession().conf.get("spark.app.id"): 46 return "local", "local" 47 else: 48 dbutils = DatabricksUtils.get_db_utils(spark) 49 notebook_context = json.loads( 50 ( 51 dbutils.notebook.entry_point.getDbutils() 52 .notebook() 53 .getContext() 54 .safeToJson() 55 ) 56 ) 57 58 return notebook_context["attributes"].get("orgId"), notebook_context[ 59 "attributes" 60 ].get("jobName")
class DatabricksUtils:
Databricks utilities class.
@staticmethod
def get_db_utils(spark: pyspark.sql.session.SparkSession) -> Any:
Get dbutils on Databricks.

Arguments:
- spark: Spark session.

Returns:
The DBUtils handle from Databricks.
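A short sketch of how the returned handle is typically consumed; the path, scope, and key below are hypothetical placeholders:

    dbutils = DatabricksUtils.get_db_utils(spark)

    # The handle exposes the standard Databricks utilities, for example:
    files = dbutils.fs.ls("/tmp")  # hypothetical path
    secret = dbutils.secrets.get(scope="my-scope", key="my-key")  # hypothetical scope/key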
@staticmethod
def get_databricks_job_information(spark: pyspark.sql.session.SparkSession) -> Tuple[str, str]:
Get the job information from the notebook context of the running acon.

Arguments:
- spark: Spark session.

Returns:
Tuple containing the Databricks orgId and jobName, or ("local", "local") when running outside Databricks.
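A minimal sketch of consuming the return value, covering the local fallback the method implements (it returns ("local", "local") when spark.app.id contains "local"):

    org_id, job_name = DatabricksUtils.get_databricks_job_information(spark)

    if (org_id, job_name) == ("local", "local"):
        # spark.app.id contained "local": not running on Databricks.
        print("Running locally; no Databricks job context available.")
    else:
        print(f"Databricks org: {org_id}, job: {job_name}")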