lakehouse_engine.transformers.condensers
Condensers module.
1"""Condensers module.""" 2 3from typing import Callable, List, Optional 4 5from pyspark.sql import DataFrame, Window 6from pyspark.sql.functions import col, row_number 7 8from lakehouse_engine.transformers.exceptions import ( 9 UnsupportedStreamingTransformerException, 10 WrongArgumentsException, 11) 12from lakehouse_engine.utils.logging_handler import LoggingHandler 13 14 15class Condensers(object): 16 """Class containing all the functions to condensate data for later merges.""" 17 18 _logger = LoggingHandler(__name__).get_logger() 19 20 @classmethod 21 def condense_record_mode_cdc( 22 cls, 23 business_key: List[str], 24 record_mode_col: str, 25 valid_record_modes: List[str], 26 ranking_key_desc: Optional[List[str]] = None, 27 ranking_key_asc: Optional[List[str]] = None, 28 ) -> Callable: 29 """Condense Change Data Capture (CDC) based on record_mode strategy. 30 31 This CDC data is particularly seen in some CDC enabled systems. Other systems 32 may have different CDC strategies. 33 34 Args: 35 business_key: The business key (logical primary key) of the data. 36 ranking_key_desc: In this type of CDC condensation the data needs to be 37 in descending order in a certain way, using columns specified in this 38 parameter. 39 ranking_key_asc: In this type of CDC condensation the data needs to be 40 in ascending order in a certain way, using columns specified in 41 this parameter. 42 record_mode_col: Name of the record mode input_col. 43 valid_record_modes: Depending on the context, not all record modes may be 44 considered for condensation. Use this parameter to skip those. 45 46 Returns: 47 A function to be executed in the .transform() spark function. 48 """ 49 if not ranking_key_desc and not ranking_key_asc: 50 raise WrongArgumentsException( 51 "The condense_record_mode_cdc transformer requires data to be either" 52 "in descending or ascending order, but no arguments for ordering" 53 "were provided." 54 ) 55 56 def inner(df: DataFrame) -> DataFrame: 57 if not df.isStreaming: 58 partition_window = Window.partitionBy( 59 [col(c) for c in business_key] 60 ).orderBy( 61 [ 62 col(c).desc() 63 for c in (ranking_key_desc if ranking_key_desc else []) 64 ] # type: ignore 65 + [ 66 col(c).asc() 67 for c in (ranking_key_asc if ranking_key_asc else []) 68 ] # type: ignore 69 ) 70 71 return ( 72 df.withColumn("ranking", row_number().over(partition_window)) 73 .filter( 74 col(record_mode_col).isNull() 75 | col(record_mode_col).isin(valid_record_modes) 76 ) 77 .filter(col("ranking") == 1) 78 .drop("ranking") 79 ) 80 else: 81 raise UnsupportedStreamingTransformerException( 82 "Transformer condense_record_mode_cdc is not supported in " 83 "streaming mode." 84 ) 85 86 return inner 87 88 @classmethod 89 def group_and_rank( 90 cls, group_key: List[str], ranking_key: List[str], descending: bool = True 91 ) -> Callable: 92 """Condense data based on a simple group by + take latest mechanism. 93 94 Args: 95 group_key: list of column names to use in the group by. 96 ranking_key: the data needs to be in descending order using columns 97 specified in this parameter. 98 descending: if the ranking considers descending order or not. Defaults to 99 True. 100 101 Returns: 102 A function to be executed in the .transform() spark function. 
103 """ 104 105 def inner(df: DataFrame) -> DataFrame: 106 if not df.isStreaming: 107 partition_window = Window.partitionBy( 108 [col(c) for c in group_key] 109 ).orderBy( 110 [ 111 col(c).desc() if descending else col(c).asc() 112 for c in (ranking_key if ranking_key else []) 113 ] # type: ignore 114 ) 115 116 return ( 117 df.withColumn("ranking", row_number().over(partition_window)) 118 .filter(col("ranking") == 1) 119 .drop("ranking") 120 ) 121 else: 122 raise UnsupportedStreamingTransformerException( 123 "Transformer group_and_rank is not supported in streaming mode." 124 ) 125 126 return inner
```python
class Condensers:
```
Class containing all the functions to condense data for later merges.
```python
@classmethod
def condense_record_mode_cdc(
    cls,
    business_key: List[str],
    record_mode_col: str,
    valid_record_modes: List[str],
    ranking_key_desc: Optional[List[str]] = None,
    ranking_key_asc: Optional[List[str]] = None,
) -> Callable:
```
Condense Change Data Capture (CDC) based on record_mode strategy.
This kind of CDC data is typically produced by CDC-enabled source systems; other systems may use different CDC strategies.
Arguments:
- business_key: The business key (logical primary key) of the data.
- record_mode_col: Name of the column containing the record mode.
- valid_record_modes: Depending on the context, not all record modes may be considered for condensation. Use this parameter to skip those.
- ranking_key_desc: Columns used to order the data in descending order for this type of CDC condensation.
- ranking_key_asc: Columns used to order the data in ascending order for this type of CDC condensation.
Returns:
A function to be passed to the Spark DataFrame.transform() function.
Example transformer specification:
```json
{
    "function": "condense_record_mode_cdc",
    "args": {
        "business_key": ["salesorder", "item"],
        "ranking_key_desc": [
            "extraction_timestamp",
            "actrequest_timestamp",
            "datapakid",
            "partno",
            "record"
        ],
        "record_mode_col": "recordmode",
        "valid_record_modes": ["", "N", "R", "D", "X"]
    }
}
```
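For direct use in Spark code, the returned function plugs into DataFrame.transform(). A minimal batch sketch, with hypothetical data and column names borrowed from the specification above:

```python
from pyspark.sql import SparkSession

from lakehouse_engine.transformers.condensers import Condensers

spark = SparkSession.builder.getOrCreate()

# Hypothetical CDC extract: two versions of the same sales order item.
df = spark.createDataFrame(
    [
        ("SO1", "10", "2024-01-01", "N"),  # initial version
        ("SO1", "10", "2024-01-02", ""),   # newer version, should win
    ],
    ["salesorder", "item", "extraction_timestamp", "recordmode"],
)

condensed = df.transform(
    Condensers.condense_record_mode_cdc(
        business_key=["salesorder", "item"],
        record_mode_col="recordmode",
        valid_record_modes=["", "N", "R", "D", "X"],
        ranking_key_desc=["extraction_timestamp"],
    )
)
condensed.show()  # one row per (salesorder, item): the 2024-01-02 version
```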
```python
@classmethod
def group_and_rank(
    cls, group_key: List[str], ranking_key: List[str], descending: bool = True
) -> Callable:
```
Condense data based on a simple group by + take latest mechanism.
Arguments:
- group_key: List of column names to use in the group by.
- ranking_key: Columns used to rank the data within each group.
- descending: Whether the ranking uses descending order. Defaults to True.
Returns:
A function to be passed to the Spark DataFrame.transform() function.
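group_and_rank works the same way through DataFrame.transform(). A minimal sketch with hypothetical data, keeping the newest row per key:

```python
from pyspark.sql import SparkSession

from lakehouse_engine.transformers.condensers import Condensers

spark = SparkSession.builder.getOrCreate()

# Hypothetical snapshots of the same entities at different times.
df = spark.createDataFrame(
    [("A", 1, "2024-01-01"), ("A", 2, "2024-01-02"), ("B", 7, "2024-01-01")],
    ["id", "value", "updated_at"],
)

latest = df.transform(
    Condensers.group_and_rank(group_key=["id"], ranking_key=["updated_at"])
)
latest.show()  # one row per id: ("A", 2, "2024-01-02") and ("B", 7, "2024-01-01")
```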