lakehouse_engine.transformers.joiners

Module with join transformers.

 1"""Module with join transformers."""
 2
 3from typing import Callable, List, Optional
 4
 5from pyspark.sql import DataFrame
 6
 7from lakehouse_engine.core.exec_env import ExecEnv
 8from lakehouse_engine.transformers.watermarker import Watermarker
 9from lakehouse_engine.utils.logging_handler import LoggingHandler
10
11
class Joiners(object):
    """Class containing join transformers."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def join(
        cls,
        join_with: DataFrame,
        join_condition: str,
        left_df_alias: str = "a",
        right_df_alias: str = "b",
        join_type: str = "inner",
        broadcast_join: bool = True,
        select_cols: Optional[List[str]] = None,
        watermarker: Optional[dict] = None,
    ) -> Callable:
        """Join two dataframes based on specified type and columns.

        Some stream to stream joins are only possible if you apply Watermark, so this
        method also provides a parameter to enable watermarking specification.

        Args:
            join_with: right dataframe.
            join_condition: condition to join dataframes.
            left_df_alias: alias of the first dataframe.
            right_df_alias: alias of the second dataframe.
            join_type: type of join. Defaults to inner.
                Available values: inner, cross, outer, full, full outer,
                left, left outer, right, right outer, semi,
                left semi, anti, and left anti.
            broadcast_join: whether to perform a broadcast join or not.
            select_cols: list of columns to select at the end. Defaults to all
                columns ("*") when not provided.
            watermarker: properties to apply watermarking.

        Returns:
            A function to be called in .transform() spark function.
        """
        # Fix: select_cols defaults to None, and ", ".join(None) raises a
        # TypeError when building the query. Fall back to selecting all columns,
        # which is backward-compatible for every caller that passed a list.
        cols = select_cols if select_cols else ["*"]

        def inner(df: DataFrame) -> DataFrame:
            # To enable join on foreachBatch processing we had
            # to change to global temp view. The goal here is to
            # avoid problems on simultaneously running process,
            # so we added application id on table name.
            app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id")
            left = f"`{app_id}_{left_df_alias}`"
            right = f"`{app_id}_{right_df_alias}`"
            df_join_with = join_with
            if watermarker:
                # Optional per-side watermarking, keyed by dataframe alias:
                # {"a": {"col": ..., "watermarking_time": ...}, "b": {...}}.
                left_df_watermarking = watermarker.get(left_df_alias, None)
                right_df_watermarking = watermarker.get(right_df_alias, None)
                if left_df_watermarking:
                    df = Watermarker.with_watermark(
                        left_df_watermarking["col"],
                        left_df_watermarking["watermarking_time"],
                    )(df)
                if right_df_watermarking:
                    df_join_with = Watermarker.with_watermark(
                        right_df_watermarking["col"],
                        right_df_watermarking["watermarking_time"],
                    )(df_join_with)

            df.createOrReplaceGlobalTempView(left)  # type: ignore
            df_join_with.createOrReplaceGlobalTempView(right)  # type: ignore

            # Join condition and aliases come from trusted pipeline config,
            # hence the nosec on the interpolated SQL below.
            query = f"""
                SELECT {f"/*+ BROADCAST({right_df_alias}) */" if broadcast_join else ""}
                {", ".join(cols)}
                FROM global_temp.{left} AS {left_df_alias}
                {join_type.upper()}
                JOIN global_temp.{right} AS {right_df_alias}
                ON {join_condition}
            """  # nosec: B608

            cls._logger.info(f"Execution query: {query}")

            return ExecEnv.SESSION.sql(query)

        return inner
class Joiners(object):
    """Join transformers for the lakehouse engine."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def join(
        cls,
        join_with: DataFrame,
        join_condition: str,
        left_df_alias: str = "a",
        right_df_alias: str = "b",
        join_type: str = "inner",
        broadcast_join: bool = True,
        select_cols: Optional[List[str]] = None,
        watermarker: Optional[dict] = None,
    ) -> Callable:
        """Build a transformer that joins two dataframes.

        Some stream-to-stream joins require watermarks, so an optional
        watermarking specification (keyed by dataframe alias) is supported.

        Args:
            join_with: right dataframe.
            join_condition: condition to join dataframes.
            left_df_alias: alias of the first dataframe.
            right_df_alias: alias of the second dataframe.
            join_type: type of join (inner, cross, outer, full, full outer,
                left, left outer, right, right outer, semi, left semi,
                anti, left anti). Defaults to inner.
            broadcast_join: whether to add a broadcast hint for the right side.
            select_cols: list of columns to select at the end.
            watermarker: properties to apply watermarking.

        Returns:
            A function to be called in .transform() spark function.
        """

        def _join_transformer(df: DataFrame) -> DataFrame:
            # Global temp views are used (instead of plain temp views) so the
            # join also works inside foreachBatch processing; the Spark
            # application id is embedded in the view names to keep
            # simultaneously running processes from clashing.
            app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id")
            left = f"`{app_id}_{left_df_alias}`"
            right = f"`{app_id}_{right_df_alias}`"

            left_df = df
            right_df = join_with
            if watermarker:
                left_wm = watermarker.get(left_df_alias)
                if left_wm:
                    left_df = Watermarker.with_watermark(
                        left_wm["col"],
                        left_wm["watermarking_time"],
                    )(left_df)
                right_wm = watermarker.get(right_df_alias)
                if right_wm:
                    right_df = Watermarker.with_watermark(
                        right_wm["col"],
                        right_wm["watermarking_time"],
                    )(right_df)

            left_df.createOrReplaceGlobalTempView(left)  # type: ignore
            right_df.createOrReplaceGlobalTempView(right)  # type: ignore

            hint = f"/*+ BROADCAST({right_df_alias}) */" if broadcast_join else ""
            query = f"""
                SELECT {hint}
                {", ".join(select_cols)}
                FROM global_temp.{left} AS {left_df_alias}
                {join_type.upper()}
                JOIN global_temp.{right} AS {right_df_alias}
                ON {join_condition}
            """  # nosec: B608

            cls._logger.info(f"Execution query: {query}")

            return ExecEnv.SESSION.sql(query)

        return _join_transformer

Class containing join transformers.

@classmethod
def join( cls, join_with: pyspark.sql.dataframe.DataFrame, join_condition: str, left_df_alias: str = 'a', right_df_alias: str = 'b', join_type: str = 'inner', broadcast_join: bool = True, select_cols: Optional[List[str]] = None, watermarker: Optional[dict] = None) -> Callable:
18    @classmethod
19    def join(
20        cls,
21        join_with: DataFrame,
22        join_condition: str,
23        left_df_alias: str = "a",
24        right_df_alias: str = "b",
25        join_type: str = "inner",
26        broadcast_join: bool = True,
27        select_cols: Optional[List[str]] = None,
28        watermarker: Optional[dict] = None,
29    ) -> Callable:
30        """Join two dataframes based on specified type and columns.
31
32        Some stream to stream joins are only possible if you apply Watermark, so this
33        method also provides a parameter to enable watermarking specification.
34
35        Args:
36            left_df_alias: alias of the first dataframe.
37            join_with: right dataframe.
38            right_df_alias: alias of the second dataframe.
39            join_condition: condition to join dataframes.
40            join_type: type of join. Defaults to inner.
41                Available values: inner, cross, outer, full, full outer,
42                left, left outer, right, right outer, semi,
43                left semi, anti, and left anti.
44            broadcast_join: whether to perform a broadcast join or not.
45            select_cols: list of columns to select at the end.
46            watermarker: properties to apply watermarking.
47
48        Returns:
49            A function to be called in .transform() spark function.
50        """
51
52        def inner(df: DataFrame) -> DataFrame:
53            # To enable join on foreachBatch processing we had
54            # to change to global temp view. The goal here is to
55            # avoid problems on simultaneously running process,
56            # so we added application id on table name.
57            app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id")
58            left = f"`{app_id}_{left_df_alias}`"
59            right = f"`{app_id}_{right_df_alias}`"
60            df_join_with = join_with
61            if watermarker:
62                left_df_watermarking = watermarker.get(left_df_alias, None)
63                right_df_watermarking = watermarker.get(right_df_alias, None)
64                if left_df_watermarking:
65                    df = Watermarker.with_watermark(
66                        left_df_watermarking["col"],
67                        left_df_watermarking["watermarking_time"],
68                    )(df)
69                if right_df_watermarking:
70                    df_join_with = Watermarker.with_watermark(
71                        right_df_watermarking["col"],
72                        right_df_watermarking["watermarking_time"],
73                    )(df_join_with)
74
75            df.createOrReplaceGlobalTempView(left)  # type: ignore
76            df_join_with.createOrReplaceGlobalTempView(right)  # type: ignore
77
78            query = f"""
79                SELECT {f"/*+ BROADCAST({right_df_alias}) */" if broadcast_join else ""}
80                {", ".join(select_cols)}
81                FROM global_temp.{left} AS {left_df_alias}
82                {join_type.upper()}
83                JOIN global_temp.{right} AS {right_df_alias}
84                ON {join_condition}
85            """  # nosec: B608
86
87            cls._logger.info(f"Execution query: {query}")
88
89            return ExecEnv.SESSION.sql(query)
90
91        return inner

Join two dataframes based on specified type and columns.

Some stream to stream joins are only possible if you apply Watermark, so this method also provides a parameter to enable watermarking specification.

Arguments:
  • left_df_alias: alias of the first dataframe.
  • join_with: right dataframe.
  • right_df_alias: alias of the second dataframe.
  • join_condition: condition to join dataframes.
  • join_type: type of join. Defaults to inner. Available values: inner, cross, outer, full, full outer, left, left outer, right, right outer, semi, left semi, anti, and left anti.
  • broadcast_join: whether to perform a broadcast join or not.
  • select_cols: list of columns to select at the end.
  • watermarker: properties to apply watermarking.
Returns:
  A function to be called in the .transform() spark function.

View Example
{
    "function": "join",
    "args": {
        "join_with": "customers",
        "join_type": "left outer",
        "join_condition": "a.customer = b.customer",
        "select_cols": [
            "a.*",
            "b.name as customer_name"
        ]
    }
}
View Full Acon