lakehouse_engine.transformers.condensers

Condensers module.

  1"""Condensers module."""
  2
  3from typing import Callable, List, Optional
  4
  5from pyspark.sql import DataFrame, Window
  6from pyspark.sql.functions import col, row_number
  7
  8from lakehouse_engine.transformers.exceptions import (
  9    UnsupportedStreamingTransformerException,
 10    WrongArgumentsException,
 11)
 12from lakehouse_engine.utils.logging_handler import LoggingHandler
 13
 14
 15class Condensers(object):
 16    """Class containing all the functions to condensate data for later merges."""
 17
 18    _logger = LoggingHandler(__name__).get_logger()
 19
 20    @classmethod
 21    def condense_record_mode_cdc(
 22        cls,
 23        business_key: List[str],
 24        record_mode_col: str,
 25        valid_record_modes: List[str],
 26        ranking_key_desc: Optional[List[str]] = None,
 27        ranking_key_asc: Optional[List[str]] = None,
 28    ) -> Callable:
 29        """Condense Change Data Capture (CDC) based on record_mode strategy.
 30
 31        This CDC data is particularly seen in some CDC enabled systems. Other systems
 32        may have different CDC strategies.
 33
 34        Args:
 35            business_key: The business key (logical primary key) of the data.
 36            ranking_key_desc: In this type of CDC condensation the data needs to be
 37                in descending order in a certain way, using columns specified in this
 38                parameter.
 39            ranking_key_asc: In this type of CDC condensation the data needs to be
 40                in ascending order in a certain way, using columns specified in
 41                this parameter.
 42            record_mode_col: Name of the record mode input_col.
 43            valid_record_modes: Depending on the context, not all record modes may be
 44                considered for condensation. Use this parameter to skip those.
 45
 46        Returns:
 47            A function to be executed in the .transform() spark function.
 48        """
 49        if not ranking_key_desc and not ranking_key_asc:
 50            raise WrongArgumentsException(
 51                "The condense_record_mode_cdc transformer requires data to be either"
 52                "in descending or ascending order, but no arguments for ordering"
 53                "were provided."
 54            )
 55
 56        def inner(df: DataFrame) -> DataFrame:
 57            if not df.isStreaming:
 58                partition_window = Window.partitionBy(
 59                    [col(c) for c in business_key]
 60                ).orderBy(
 61                    [
 62                        col(c).desc()
 63                        for c in (ranking_key_desc if ranking_key_desc else [])
 64                    ]  # type: ignore
 65                    + [
 66                        col(c).asc()
 67                        for c in (ranking_key_asc if ranking_key_asc else [])
 68                    ]  # type: ignore
 69                )
 70
 71                return (
 72                    df.withColumn("ranking", row_number().over(partition_window))
 73                    .filter(
 74                        col(record_mode_col).isNull()
 75                        | col(record_mode_col).isin(valid_record_modes)
 76                    )
 77                    .filter(col("ranking") == 1)
 78                    .drop("ranking")
 79                )
 80            else:
 81                raise UnsupportedStreamingTransformerException(
 82                    "Transformer condense_record_mode_cdc is not supported in "
 83                    "streaming mode."
 84                )
 85
 86        return inner
 87
 88    @classmethod
 89    def group_and_rank(
 90        cls, group_key: List[str], ranking_key: List[str], descending: bool = True
 91    ) -> Callable:
 92        """Condense data based on a simple group by + take latest mechanism.
 93
 94        Args:
 95            group_key: list of column names to use in the group by.
 96            ranking_key: the data needs to be in descending order using columns
 97                specified in this parameter.
 98            descending: if the ranking considers descending order or not. Defaults to
 99                True.
100
101        Returns:
102            A function to be executed in the .transform() spark function.
103        """
104
105        def inner(df: DataFrame) -> DataFrame:
106            if not df.isStreaming:
107                partition_window = Window.partitionBy(
108                    [col(c) for c in group_key]
109                ).orderBy(
110                    [
111                        col(c).desc() if descending else col(c).asc()
112                        for c in (ranking_key if ranking_key else [])
113                    ]  # type: ignore
114                )
115
116                return (
117                    df.withColumn("ranking", row_number().over(partition_window))
118                    .filter(col("ranking") == 1)
119                    .drop("ranking")
120                )
121            else:
122                raise UnsupportedStreamingTransformerException(
123                    "Transformer group_and_rank is not supported in streaming mode."
124                )
125
126        return inner
class Condensers:
 16class Condensers(object):
 17    """Class containing all the functions to condensate data for later merges."""
 18
 19    _logger = LoggingHandler(__name__).get_logger()
 20
 21    @classmethod
 22    def condense_record_mode_cdc(
 23        cls,
 24        business_key: List[str],
 25        record_mode_col: str,
 26        valid_record_modes: List[str],
 27        ranking_key_desc: Optional[List[str]] = None,
 28        ranking_key_asc: Optional[List[str]] = None,
 29    ) -> Callable:
 30        """Condense Change Data Capture (CDC) based on record_mode strategy.
 31
 32        This CDC data is particularly seen in some CDC enabled systems. Other systems
 33        may have different CDC strategies.
 34
 35        Args:
 36            business_key: The business key (logical primary key) of the data.
 37            ranking_key_desc: In this type of CDC condensation the data needs to be
 38                in descending order in a certain way, using columns specified in this
 39                parameter.
 40            ranking_key_asc: In this type of CDC condensation the data needs to be
 41                in ascending order in a certain way, using columns specified in
 42                this parameter.
 43            record_mode_col: Name of the record mode input_col.
 44            valid_record_modes: Depending on the context, not all record modes may be
 45                considered for condensation. Use this parameter to skip those.
 46
 47        Returns:
 48            A function to be executed in the .transform() spark function.
 49        """
 50        if not ranking_key_desc and not ranking_key_asc:
 51            raise WrongArgumentsException(
 52                "The condense_record_mode_cdc transformer requires data to be either"
 53                "in descending or ascending order, but no arguments for ordering"
 54                "were provided."
 55            )
 56
 57        def inner(df: DataFrame) -> DataFrame:
 58            if not df.isStreaming:
 59                partition_window = Window.partitionBy(
 60                    [col(c) for c in business_key]
 61                ).orderBy(
 62                    [
 63                        col(c).desc()
 64                        for c in (ranking_key_desc if ranking_key_desc else [])
 65                    ]  # type: ignore
 66                    + [
 67                        col(c).asc()
 68                        for c in (ranking_key_asc if ranking_key_asc else [])
 69                    ]  # type: ignore
 70                )
 71
 72                return (
 73                    df.withColumn("ranking", row_number().over(partition_window))
 74                    .filter(
 75                        col(record_mode_col).isNull()
 76                        | col(record_mode_col).isin(valid_record_modes)
 77                    )
 78                    .filter(col("ranking") == 1)
 79                    .drop("ranking")
 80                )
 81            else:
 82                raise UnsupportedStreamingTransformerException(
 83                    "Transformer condense_record_mode_cdc is not supported in "
 84                    "streaming mode."
 85                )
 86
 87        return inner
 88
 89    @classmethod
 90    def group_and_rank(
 91        cls, group_key: List[str], ranking_key: List[str], descending: bool = True
 92    ) -> Callable:
 93        """Condense data based on a simple group by + take latest mechanism.
 94
 95        Args:
 96            group_key: list of column names to use in the group by.
 97            ranking_key: the data needs to be in descending order using columns
 98                specified in this parameter.
 99            descending: if the ranking considers descending order or not. Defaults to
100                True.
101
102        Returns:
103            A function to be executed in the .transform() spark function.
104        """
105
106        def inner(df: DataFrame) -> DataFrame:
107            if not df.isStreaming:
108                partition_window = Window.partitionBy(
109                    [col(c) for c in group_key]
110                ).orderBy(
111                    [
112                        col(c).desc() if descending else col(c).asc()
113                        for c in (ranking_key if ranking_key else [])
114                    ]  # type: ignore
115                )
116
117                return (
118                    df.withColumn("ranking", row_number().over(partition_window))
119                    .filter(col("ranking") == 1)
120                    .drop("ranking")
121                )
122            else:
123                raise UnsupportedStreamingTransformerException(
124                    "Transformer group_and_rank is not supported in streaming mode."
125                )
126
127        return inner

Class containing all the functions to condense data for later merges.

@classmethod
def condense_record_mode_cdc( cls, business_key: List[str], record_mode_col: str, valid_record_modes: List[str], ranking_key_desc: Optional[List[str]] = None, ranking_key_asc: Optional[List[str]] = None) -> Callable:
21    @classmethod
22    def condense_record_mode_cdc(
23        cls,
24        business_key: List[str],
25        record_mode_col: str,
26        valid_record_modes: List[str],
27        ranking_key_desc: Optional[List[str]] = None,
28        ranking_key_asc: Optional[List[str]] = None,
29    ) -> Callable:
30        """Condense Change Data Capture (CDC) based on record_mode strategy.
31
32        This CDC data is particularly seen in some CDC enabled systems. Other systems
33        may have different CDC strategies.
34
35        Args:
36            business_key: The business key (logical primary key) of the data.
37            ranking_key_desc: In this type of CDC condensation the data needs to be
38                in descending order in a certain way, using columns specified in this
39                parameter.
40            ranking_key_asc: In this type of CDC condensation the data needs to be
41                in ascending order in a certain way, using columns specified in
42                this parameter.
43            record_mode_col: Name of the record mode input_col.
44            valid_record_modes: Depending on the context, not all record modes may be
45                considered for condensation. Use this parameter to skip those.
46
47        Returns:
48            A function to be executed in the .transform() spark function.
49        """
50        if not ranking_key_desc and not ranking_key_asc:
51            raise WrongArgumentsException(
52                "The condense_record_mode_cdc transformer requires data to be either"
53                "in descending or ascending order, but no arguments for ordering"
54                "were provided."
55            )
56
57        def inner(df: DataFrame) -> DataFrame:
58            if not df.isStreaming:
59                partition_window = Window.partitionBy(
60                    [col(c) for c in business_key]
61                ).orderBy(
62                    [
63                        col(c).desc()
64                        for c in (ranking_key_desc if ranking_key_desc else [])
65                    ]  # type: ignore
66                    + [
67                        col(c).asc()
68                        for c in (ranking_key_asc if ranking_key_asc else [])
69                    ]  # type: ignore
70                )
71
72                return (
73                    df.withColumn("ranking", row_number().over(partition_window))
74                    .filter(
75                        col(record_mode_col).isNull()
76                        | col(record_mode_col).isin(valid_record_modes)
77                    )
78                    .filter(col("ranking") == 1)
79                    .drop("ranking")
80                )
81            else:
82                raise UnsupportedStreamingTransformerException(
83                    "Transformer condense_record_mode_cdc is not supported in "
84                    "streaming mode."
85                )
86
87        return inner

Condense Change Data Capture (CDC) based on record_mode strategy.

This CDC data is particularly seen in some CDC enabled systems. Other systems may have different CDC strategies.

Arguments:
  • business_key: The business key (logical primary key) of the data.
  • ranking_key_desc: In this type of CDC condensation the data needs to be in descending order in a certain way, using columns specified in this parameter.
  • ranking_key_asc: In this type of CDC condensation the data needs to be in ascending order in a certain way, using columns specified in this parameter.
  • record_mode_col: Name of the record mode input_col.
  • valid_record_modes: Depending on the context, not all record modes may be considered for condensation. Use this parameter to skip those.
Returns:

A function to be executed in the .transform() spark function.

View Example
20{
21    "function": "condense_record_mode_cdc",
22    "args": {
23        "business_key": [
24            "salesorder",
25            "item"
26        ],
27        "ranking_key_desc": [
28            "extraction_timestamp",
29            "actrequest_timestamp",
30            "datapakid",
31            "partno",
32            "record"
33        ],
34        "record_mode_col": "recordmode",
35        "valid_record_modes": [
36            "",
37            "N",
38            "R",
39            "D",
40            "X"
41        ]
42    }
43}
View Full Acon


@classmethod
def group_and_rank( cls, group_key: List[str], ranking_key: List[str], descending: bool = True) -> Callable:
 89    @classmethod
 90    def group_and_rank(
 91        cls, group_key: List[str], ranking_key: List[str], descending: bool = True
 92    ) -> Callable:
 93        """Condense data based on a simple group by + take latest mechanism.
 94
 95        Args:
 96            group_key: list of column names to use in the group by.
 97            ranking_key: the data needs to be in descending order using columns
 98                specified in this parameter.
 99            descending: if the ranking considers descending order or not. Defaults to
100                True.
101
102        Returns:
103            A function to be executed in the .transform() spark function.
104        """
105
106        def inner(df: DataFrame) -> DataFrame:
107            if not df.isStreaming:
108                partition_window = Window.partitionBy(
109                    [col(c) for c in group_key]
110                ).orderBy(
111                    [
112                        col(c).desc() if descending else col(c).asc()
113                        for c in (ranking_key if ranking_key else [])
114                    ]  # type: ignore
115                )
116
117                return (
118                    df.withColumn("ranking", row_number().over(partition_window))
119                    .filter(col("ranking") == 1)
120                    .drop("ranking")
121                )
122            else:
123                raise UnsupportedStreamingTransformerException(
124                    "Transformer group_and_rank is not supported in streaming mode."
125                )
126
127        return inner

Condense data based on a simple group by + take latest mechanism.

Arguments:
  • group_key: list of column names to use in the group by.
  • ranking_key: the columns used to order the rows within each group; the sort direction is controlled by the descending parameter.
  • descending: if the ranking considers descending order or not. Defaults to True.
Returns:

A function to be executed in the .transform() spark function.

View Example
59{
60    "function": "group_and_rank",
61    "args": {
62        "group_key": [
63            "salesorder",
64            "item"
65        ],
66        "ranking_key": [
67            "extraction_date",
68            "changed_on",
69            "lhe_row_id"
70        ]
71    }
72}
View Full Acon