Skip to content


Module with join transformers.


Bases: object

Class containing join transformers.

Source code in mkdocs/lakehouse_engine/packages/transformers/joiners.py
class Joiners(object):
    """Class containing join transformers."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def join(
        cls,
        join_with: DataFrame,
        join_condition: str,
        left_df_alias: str = "a",
        right_df_alias: str = "b",
        join_type: str = "inner",
        broadcast_join: bool = True,
        select_cols: Optional[List[str]] = None,
        watermarker: Optional[dict] = None,
    ) -> Callable:
        """Join two dataframes based on specified type and columns.

        Some stream to stream joins are only possible if you apply Watermark, so this
        method also provides a parameter to enable watermarking specification.

        Args:
            left_df_alias: alias of the first dataframe.
            join_with: right dataframe.
            right_df_alias: alias of the second dataframe.
            join_condition: condition to join dataframes.
            join_type: type of join. Defaults to inner.
                Available values: inner, cross, outer, full, full outer,
                left, left outer, right, right outer, semi,
                left semi, anti, and left anti.
            broadcast_join: whether to perform a broadcast join or not.
            select_cols: list of columns to select at the end. When omitted or
                empty, every column (``*``) is selected.
            watermarker: properties to apply watermarking, looked up by
                dataframe alias.

        Returns:
            A function to be called in .transform() spark function.
        """

        def inner(df: DataFrame) -> DataFrame:
            # To enable join on foreachBatch processing we had
            # to change to global temp view. The goal here is to
            # avoid problems on simultaneously running process,
            # so we added application id on table name.
            app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id")
            left = f"`{app_id}_{left_df_alias}`"
            right = f"`{app_id}_{right_df_alias}`"
            df_join_with = join_with
            if watermarker:
                left_df_watermarking = watermarker.get(left_df_alias, None)
                right_df_watermarking = watermarker.get(right_df_alias, None)
                # NOTE(review): the "col"/"watermarking_time" keys and the
                # curried call shape of Watermarker.with_watermark are assumed;
                # confirm against the Watermarker transformer.
                if left_df_watermarking:
                    df = Watermarker.with_watermark(
                        left_df_watermarking["col"],
                        left_df_watermarking["watermarking_time"],
                    )(df)
                if right_df_watermarking:
                    df_join_with = Watermarker.with_watermark(
                        right_df_watermarking["col"],
                        right_df_watermarking["watermarking_time"],
                    )(df_join_with)

            df.createOrReplaceGlobalTempView(left)  # type: ignore
            df_join_with.createOrReplaceGlobalTempView(right)  # type: ignore

            # The hint is only emitted when a broadcast join was requested.
            broadcast_hint = (
                f"/*+ BROADCAST({right_df_alias}) */" if broadcast_join else ""
            )
            # select_cols defaults to None; fall back to all columns instead of
            # failing inside str.join.
            projection = ", ".join(select_cols) if select_cols else "*"

            # join_type must be part of the statement, otherwise every join
            # silently runs as the SQL default (inner) join.
            query = f"""
                SELECT {broadcast_hint}
                {projection}
                FROM global_temp.{left} AS {left_df_alias}
                {join_type.upper()} JOIN global_temp.{right} AS {right_df_alias}
                ON {join_condition}
            """  # nosec: B608

            cls._logger.info(f"Execution query: {query}")

            return ExecEnv.SESSION.sql(query)

        return inner

join(join_with, join_condition, left_df_alias='a', right_df_alias='b', join_type='inner', broadcast_join=True, select_cols=None, watermarker=None) classmethod

Join two dataframes based on specified type and columns.

Some stream to stream joins are only possible if you apply Watermark, so this method also provides a parameter to enable watermarking specification.


Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| left_df_alias | str | alias of the first dataframe. | 'a' |
| join_with | DataFrame | right dataframe. | required |
| right_df_alias | str | alias of the second dataframe. | 'b' |
| join_condition | str | condition to join dataframes. | required |
| join_type | str | type of join. Defaults to inner. Available values: inner, cross, outer, full, full outer, left, left outer, right, right outer, semi, left semi, anti, and left anti. | 'inner' |
| broadcast_join | bool | whether to perform a broadcast join or not. | True |
| select_cols | Optional[List[str]] | list of columns to select at the end. | None |
| watermarker | Optional[dict] | properties to apply watermarking. | None |



Returns:

| Type | Description |
|---|---|
| Callable | A function to be called in .transform() spark function. |

View Example of join (See full example here)
34    "function": "join",
35    "args": {
36        "join_with": "customers",
37        "join_type": "left outer",
38        "join_condition": "a.customer = b.customer",
39        "select_cols": [
40            "a.*",
41            "b.name as customer_name"
42        ]
43    }
Source code in mkdocs/lakehouse_engine/packages/transformers/joiners.py
@classmethod
def join(
    cls,
    join_with: DataFrame,
    join_condition: str,
    left_df_alias: str = "a",
    right_df_alias: str = "b",
    join_type: str = "inner",
    broadcast_join: bool = True,
    select_cols: Optional[List[str]] = None,
    watermarker: Optional[dict] = None,
) -> Callable:
    """Join two dataframes based on specified type and columns.

    Some stream to stream joins are only possible if you apply Watermark, so this
    method also provides a parameter to enable watermarking specification.

    Args:
        left_df_alias: alias of the first dataframe.
        join_with: right dataframe.
        right_df_alias: alias of the second dataframe.
        join_condition: condition to join dataframes.
        join_type: type of join. Defaults to inner.
            Available values: inner, cross, outer, full, full outer,
            left, left outer, right, right outer, semi,
            left semi, anti, and left anti.
        broadcast_join: whether to perform a broadcast join or not.
        select_cols: list of columns to select at the end. When omitted or
            empty, every column (``*``) is selected.
        watermarker: properties to apply watermarking, looked up by
            dataframe alias.

    Returns:
        A function to be called in .transform() spark function.
    """

    def inner(df: DataFrame) -> DataFrame:
        # To enable join on foreachBatch processing we had
        # to change to global temp view. The goal here is to
        # avoid problems on simultaneously running process,
        # so we added application id on table name.
        app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id")
        left = f"`{app_id}_{left_df_alias}`"
        right = f"`{app_id}_{right_df_alias}`"
        df_join_with = join_with
        if watermarker:
            left_df_watermarking = watermarker.get(left_df_alias, None)
            right_df_watermarking = watermarker.get(right_df_alias, None)
            # NOTE(review): the "col"/"watermarking_time" keys and the
            # curried call shape of Watermarker.with_watermark are assumed;
            # confirm against the Watermarker transformer.
            if left_df_watermarking:
                df = Watermarker.with_watermark(
                    left_df_watermarking["col"],
                    left_df_watermarking["watermarking_time"],
                )(df)
            if right_df_watermarking:
                df_join_with = Watermarker.with_watermark(
                    right_df_watermarking["col"],
                    right_df_watermarking["watermarking_time"],
                )(df_join_with)

        df.createOrReplaceGlobalTempView(left)  # type: ignore
        df_join_with.createOrReplaceGlobalTempView(right)  # type: ignore

        # The hint is only emitted when a broadcast join was requested.
        broadcast_hint = (
            f"/*+ BROADCAST({right_df_alias}) */" if broadcast_join else ""
        )
        # select_cols defaults to None; fall back to all columns instead of
        # failing inside str.join.
        projection = ", ".join(select_cols) if select_cols else "*"

        # join_type must be part of the statement, otherwise every join
        # silently runs as the SQL default (inner) join.
        query = f"""
            SELECT {broadcast_hint}
            {projection}
            FROM global_temp.{left} AS {left_df_alias}
            {join_type.upper()} JOIN global_temp.{right} AS {right_df_alias}
            ON {join_condition}
        """  # nosec: B608

        cls._logger.info(f"Execution query: {query}")

        return ExecEnv.SESSION.sql(query)

    return inner