Skip to content

Joiners

Module with join transformers.

Joiners

Bases: object

Class containing join transformers.

Source code in mkdocs/lakehouse_engine/packages/transformers/joiners.py
class Joiners(object):
    """Class containing join transformers."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def join(
        cls,
        join_with: DataFrame,
        join_condition: str,
        left_df_alias: str = "a",
        right_df_alias: str = "b",
        join_type: str = "inner",
        broadcast_join: bool = True,
        select_cols: Optional[List[str]] = None,
        watermarker: Optional[dict] = None,
    ) -> Callable:
        """Join two dataframes based on specified type and columns.

        Some stream to stream joins are only possible if you apply Watermark, so this
        method also provides a parameter to enable watermarking specification.

        Args:
            join_with: right dataframe.
            join_condition: condition to join dataframes.
            left_df_alias: alias of the first dataframe.
            right_df_alias: alias of the second dataframe.
            join_type: type of join. Defaults to inner.
                Available values: inner, cross, outer, full, full outer,
                left, left outer, right, right outer, semi,
                left semi, anti, and left anti.
            broadcast_join: whether to perform a broadcast join or not.
            select_cols: list of columns to select at the end. Defaults to
                selecting all columns (*) when not provided.
            watermarker: properties to apply watermarking.

        Returns:
            A function to be called in .transform() spark function.

        {{get_example(method_name='join')}}
        """

        def inner(df: DataFrame) -> DataFrame:
            # To enable join on foreachBatch processing we had
            # to change to global temp view. The goal here is to
            # avoid problems on simultaneously running processes,
            # so we added the application id to the view name.
            app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id")
            left = f"`{app_id}_{left_df_alias}`"
            right = f"`{app_id}_{right_df_alias}`"
            df_join_with = join_with
            if watermarker:
                # Watermarking config is keyed by dataframe alias; either
                # side (or both) may carry a watermark specification.
                left_df_watermarking = watermarker.get(left_df_alias, None)
                right_df_watermarking = watermarker.get(right_df_alias, None)
                if left_df_watermarking:
                    df = Watermarker.with_watermark(
                        left_df_watermarking["col"],
                        left_df_watermarking["watermarking_time"],
                    )(df)
                if right_df_watermarking:
                    df_join_with = Watermarker.with_watermark(
                        right_df_watermarking["col"],
                        right_df_watermarking["watermarking_time"],
                    )(df_join_with)

            df.createOrReplaceGlobalTempView(left)  # type: ignore
            df_join_with.createOrReplaceGlobalTempView(right)  # type: ignore

            # Fix: the original raised TypeError when select_cols was left as
            # its documented default (None); fall back to selecting all columns.
            projection = ", ".join(select_cols) if select_cols else "*"

            query = f"""
                SELECT {f"/*+ BROADCAST({right_df_alias}) */" if broadcast_join else ""}
                {projection}
                FROM global_temp.{left} AS {left_df_alias}
                {join_type.upper()}
                JOIN global_temp.{right} AS {right_df_alias}
                ON {join_condition}
            """  # nosec: B608

            cls._logger.info(f"Execution query: {query}")

            return ExecEnv.SESSION.sql(query)

        return inner

join(join_with, join_condition, left_df_alias='a', right_df_alias='b', join_type='inner', broadcast_join=True, select_cols=None, watermarker=None) classmethod

Join two dataframes based on specified type and columns.

Some stream to stream joins are only possible if you apply Watermark, so this method also provides a parameter to enable watermarking specification.

Parameters:

Name Type Description Default
left_df_alias str

alias of the first dataframe.

'a'
join_with DataFrame

right dataframe.

required
right_df_alias str

alias of the second dataframe.

'b'
join_condition str

condition to join dataframes.

required
join_type str

type of join. Defaults to inner. Available values: inner, cross, outer, full, full outer, left, left outer, right, right outer, semi, left semi, anti, and left anti.

'inner'
broadcast_join bool

whether to perform a broadcast join or not.

True
select_cols Optional[List[str]]

list of columns to select at the end.

None
watermarker Optional[dict]

properties to apply watermarking.

None

Returns:

Type Description
Callable

A function to be called in .transform() spark function.

View Example of join (See full example here)
{
    "function": "join",
    "args": {
        "join_with": "customers",
        "join_type": "left outer",
        "join_condition": "a.customer = b.customer",
        "select_cols": [
            "a.*",
            "b.name as customer_name"
        ]
    }
}
Source code in mkdocs/lakehouse_engine/packages/transformers/joiners.py
@classmethod
def join(
    cls,
    join_with: DataFrame,
    join_condition: str,
    left_df_alias: str = "a",
    right_df_alias: str = "b",
    join_type: str = "inner",
    broadcast_join: bool = True,
    select_cols: Optional[List[str]] = None,
    watermarker: Optional[dict] = None,
) -> Callable:
    """Join two dataframes based on specified type and columns.

    Some stream to stream joins are only possible if you apply Watermark, so this
    method also provides a parameter to enable watermarking specification.

    Args:
        join_with: right dataframe.
        join_condition: condition to join dataframes.
        left_df_alias: alias of the first dataframe.
        right_df_alias: alias of the second dataframe.
        join_type: type of join. Defaults to inner.
            Available values: inner, cross, outer, full, full outer,
            left, left outer, right, right outer, semi,
            left semi, anti, and left anti.
        broadcast_join: whether to perform a broadcast join or not.
        select_cols: list of columns to select at the end. Defaults to
            selecting all columns (*) when not provided.
        watermarker: properties to apply watermarking.

    Returns:
        A function to be called in .transform() spark function.

    {{get_example(method_name='join')}}
    """

    def inner(df: DataFrame) -> DataFrame:
        # To enable join on foreachBatch processing we had
        # to change to global temp view. The goal here is to
        # avoid problems on simultaneously running processes,
        # so we added the application id to the view name.
        app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id")
        left = f"`{app_id}_{left_df_alias}`"
        right = f"`{app_id}_{right_df_alias}`"
        df_join_with = join_with
        if watermarker:
            # Watermarking config is keyed by dataframe alias; either
            # side (or both) may carry a watermark specification.
            left_df_watermarking = watermarker.get(left_df_alias, None)
            right_df_watermarking = watermarker.get(right_df_alias, None)
            if left_df_watermarking:
                df = Watermarker.with_watermark(
                    left_df_watermarking["col"],
                    left_df_watermarking["watermarking_time"],
                )(df)
            if right_df_watermarking:
                df_join_with = Watermarker.with_watermark(
                    right_df_watermarking["col"],
                    right_df_watermarking["watermarking_time"],
                )(df_join_with)

        df.createOrReplaceGlobalTempView(left)  # type: ignore
        df_join_with.createOrReplaceGlobalTempView(right)  # type: ignore

        # Fix: the original raised TypeError when select_cols was left as
        # its documented default (None); fall back to selecting all columns.
        projection = ", ".join(select_cols) if select_cols else "*"

        query = f"""
            SELECT {f"/*+ BROADCAST({right_df_alias}) */" if broadcast_join else ""}
            {projection}
            FROM global_temp.{left} AS {left_df_alias}
            {join_type.upper()}
            JOIN global_temp.{right} AS {right_df_alias}
            ON {join_condition}
        """  # nosec: B608

        cls._logger.info(f"Execution query: {query}")

        return ExecEnv.SESSION.sql(query)

    return inner