Skip to content

Databricks utils

Utilities for databricks operations.

DatabricksUtils

Bases: object

Databricks utilities class.

Source code in mkdocs/lakehouse_engine/packages/utils/databricks_utils.py
class DatabricksUtils(object):
    """Databricks utilities class."""

    @staticmethod
    def get_db_utils(spark: SparkSession) -> Any:
        """Get db utils on databricks.

        Args:
            spark: spark session.

        Returns:
            Dbutils from databricks.
        """
        try:
            # Running on a databricks cluster: build DBUtils from the session.
            from pyspark.dbutils import DBUtils

            # NOTE: the former `if "dbutils" not in locals()` guard was dead
            # code — `dbutils` is never bound before this point, so the check
            # was always true and the else-branch unreachable.
            dbutils = DBUtils(spark)
        except ImportError:
            # pyspark.dbutils is unavailable (e.g. notebook kernel): fall back
            # to the `dbutils` handle databricks injects into the IPython
            # user namespace.
            import IPython

            dbutils = IPython.get_ipython().user_ns["dbutils"]
        return dbutils

    @staticmethod
    def get_databricks_job_information(spark: SparkSession) -> Tuple[str, str]:
        """Get notebook context from running acon.

        Args:
            spark: spark session.

        Returns:
            Tuple containing the databricks org id and the job name.
        """
        # Local spark apps (e.g. tests) have no databricks notebook context.
        if "local" in spark.getActiveSession().conf.get("spark.app.id"):
            return "local", "local"

        dbutils = DatabricksUtils.get_db_utils(spark)
        notebook_context = json.loads(
            dbutils.notebook.entry_point.getDbutils()
            .notebook()
            .getContext()
            .safeToJson()
        )

        attributes = notebook_context["attributes"]
        return attributes.get("orgId"), attributes.get("jobName")

get_databricks_job_information(spark) staticmethod

Get notebook context from running acon.

Parameters:

Name Type Description Default
spark SparkSession

spark session.

required

Returns:

Type Description
Tuple[str, str]

Tuple containing the databricks org id and the job name.

Source code in mkdocs/lakehouse_engine/packages/utils/databricks_utils.py
@staticmethod
def get_databricks_job_information(spark: SparkSession) -> Tuple[str, str]:
    """Get notebook context from running acon.

    Args:
        spark: spark session.

    Returns:
        Tuple containing the databricks org id and the job name.
    """
    # Guard clause: locally running spark apps carry no databricks context.
    app_id = spark.getActiveSession().conf.get("spark.app.id")
    if "local" in app_id:
        return "local", "local"

    dbutils = DatabricksUtils.get_db_utils(spark)
    raw_context = (
        dbutils.notebook.entry_point.getDbutils()
        .notebook()
        .getContext()
        .safeToJson()
    )
    attributes = json.loads(raw_context)["attributes"]
    return attributes.get("orgId"), attributes.get("jobName")

get_db_utils(spark) staticmethod

Get db utils on databricks.

Parameters:

Name Type Description Default
spark SparkSession

spark session.

required

Returns:

Type Description
Any

Dbutils from databricks.

Source code in mkdocs/lakehouse_engine/packages/utils/databricks_utils.py
@staticmethod
def get_db_utils(spark: SparkSession) -> Any:
    """Get db utils on databricks.

    Args:
        spark: spark session.

    Returns:
        Dbutils from databricks.
    """
    try:
        # Running on a databricks cluster: build DBUtils from the session.
        from pyspark.dbutils import DBUtils

        # NOTE: the former `if "dbutils" not in locals()` guard was dead code —
        # `dbutils` is never bound before this point, so the check was always
        # true and the else-branch unreachable.
        dbutils = DBUtils(spark)
    except ImportError:
        # pyspark.dbutils is unavailable (e.g. notebook kernel): fall back to
        # the `dbutils` handle databricks injects into the IPython user
        # namespace.
        import IPython

        dbutils = IPython.get_ipython().user_ns["dbutils"]
    return dbutils