lakehouse_engine.transformers.joiners
Module with join transformers.
1"""Module with join transformers.""" 2 3from typing import Callable, List, Optional 4 5from pyspark.sql import DataFrame 6 7from lakehouse_engine.core.exec_env import ExecEnv 8from lakehouse_engine.transformers.watermarker import Watermarker 9from lakehouse_engine.utils.logging_handler import LoggingHandler 10 11 12class Joiners(object): 13 """Class containing join transformers.""" 14 15 _logger = LoggingHandler(__name__).get_logger() 16 17 @classmethod 18 def join( 19 cls, 20 join_with: DataFrame, 21 join_condition: str, 22 left_df_alias: str = "a", 23 right_df_alias: str = "b", 24 join_type: str = "inner", 25 broadcast_join: bool = True, 26 select_cols: Optional[List[str]] = None, 27 watermarker: Optional[dict] = None, 28 ) -> Callable: 29 """Join two dataframes based on specified type and columns. 30 31 Some stream to stream joins are only possible if you apply Watermark, so this 32 method also provides a parameter to enable watermarking specification. 33 34 Args: 35 left_df_alias: alias of the first dataframe. 36 join_with: right dataframe. 37 right_df_alias: alias of the second dataframe. 38 join_condition: condition to join dataframes. 39 join_type: type of join. Defaults to inner. 40 Available values: inner, cross, outer, full, full outer, 41 left, left outer, right, right outer, semi, 42 left semi, anti, and left anti. 43 broadcast_join: whether to perform a broadcast join or not. 44 select_cols: list of columns to select at the end. 45 watermarker: properties to apply watermarking. 46 47 Returns: 48 A function to be called in .transform() spark function. 49 """ 50 51 def inner(df: DataFrame) -> DataFrame: 52 # To enable join on foreachBatch processing we had 53 # to change to global temp view. The goal here is to 54 # avoid problems on simultaneously running process, 55 # so we added application id on table name. 56 app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id") 57 left = f"`{app_id}_{left_df_alias}`" 58 right = f"`{app_id}_{right_df_alias}`" 59 df_join_with = join_with 60 if watermarker: 61 left_df_watermarking = watermarker.get(left_df_alias, None) 62 right_df_watermarking = watermarker.get(right_df_alias, None) 63 if left_df_watermarking: 64 df = Watermarker.with_watermark( 65 left_df_watermarking["col"], 66 left_df_watermarking["watermarking_time"], 67 )(df) 68 if right_df_watermarking: 69 df_join_with = Watermarker.with_watermark( 70 right_df_watermarking["col"], 71 right_df_watermarking["watermarking_time"], 72 )(df_join_with) 73 74 df.createOrReplaceGlobalTempView(left) # type: ignore 75 df_join_with.createOrReplaceGlobalTempView(right) # type: ignore 76 77 query = f""" 78 SELECT {f"/*+ BROADCAST({right_df_alias}) */" if broadcast_join else ""} 79 {", ".join(select_cols)} 80 FROM global_temp.{left} AS {left_df_alias} 81 {join_type.upper()} 82 JOIN global_temp.{right} AS {right_df_alias} 83 ON {join_condition} 84 """ # nosec: B608 85 86 cls._logger.info(f"Execution query: {query}") 87 88 return ExecEnv.SESSION.sql(query) 89 90 return inner
class Joiners:
Class containing join transformers.
@classmethod
def join(cls, join_with: pyspark.sql.dataframe.DataFrame, join_condition: str, left_df_alias: str = 'a', right_df_alias: str = 'b', join_type: str = 'inner', broadcast_join: bool = True, select_cols: Optional[List[str]] = None, watermarker: Optional[dict] = None) -> Callable:
Join two dataframes based on the specified type and columns.
Some stream-to-stream joins are only possible if you apply a watermark, so this method also provides a parameter to specify watermarking properties (see the usage sketch below).
Arguments:
- left_df_alias: alias of the first dataframe.
- join_with: right dataframe.
- right_df_alias: alias of the second dataframe.
- join_condition: condition to join dataframes.
- join_type: type of join. Defaults to inner. Available values: inner, cross, outer, full, full outer, left, left outer, right, right outer, semi, left semi, anti, and left anti.
- broadcast_join: whether to perform a broadcast join or not.
- select_cols: list of columns to select at the end.
- watermarker: dictionary with watermarking properties keyed by dataframe alias; each entry provides the event-time column ("col") and the watermark time ("watermarking_time").
Returns:
A function to be passed to the Spark DataFrame .transform() function.
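The snippet below is a minimal usage sketch, not taken from the library's own examples: it applies the returned transformer through `DataFrame.transform()`, first for a plain batch join and then for a stream-to-stream join with per-alias watermarking. The dataframes (`orders_df`, `customers_df`, `events_df`, `clicks_df`) and all column names are illustrative placeholders, and the sketch assumes `ExecEnv.SESSION` has already been initialised by the engine.

```python
# Hypothetical usage sketch. Dataframe and column names are placeholders;
# ExecEnv.SESSION is assumed to be initialised by the lakehouse engine.
from lakehouse_engine.transformers.joiners import Joiners

# Batch join: the join condition references the default aliases "a" (left)
# and "b" (right), and select_cols drives the final projection.
joined_df = orders_df.transform(
    Joiners.join(
        join_with=customers_df,
        join_condition="a.customer_id = b.customer_id",
        select_cols=["a.order_id", "a.amount", "b.customer_name"],
        join_type="left",
        broadcast_join=True,
    )
)

# Stream-to-stream join: the watermarker dictionary is keyed by alias, and each
# entry names the event-time column and the watermark time applied before joining.
streaming_joined_df = events_df.transform(
    Joiners.join(
        join_with=clicks_df,
        join_condition="a.session_id = b.session_id",
        select_cols=["a.session_id", "a.event_time", "b.click_time"],
        watermarker={
            "a": {"col": "event_time", "watermarking_time": "10 minutes"},
            "b": {"col": "click_time", "watermarking_time": "10 minutes"},
        },
    )
)
```

Note that `select_cols` is effectively required, since the generated SQL concatenates it with commas, and the `join_condition` must reference the configured aliases (`a` and `b` by default).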