Condensers

Condensers module.

Condensers

Bases: object

Class containing all the functions to condense data for later merges.

Source code in mkdocs/lakehouse_engine/packages/transformers/condensers.py
class Condensers(object):
    """Class containing all the functions to condense data for later merges."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def condense_record_mode_cdc(
        cls,
        business_key: List[str],
        record_mode_col: str,
        valid_record_modes: List[str],
        ranking_key_desc: Optional[List[str]] = None,
        ranking_key_asc: Optional[List[str]] = None,
    ) -> Callable:
        """Condense Change Data Capture (CDC) based on record_mode strategy.

        This type of CDC data is typically produced by certain CDC-enabled
        systems; other systems may use different CDC strategies.

        Args:
            business_key: The business key (logical primary key) of the data.
            record_mode_col: Name of the column containing the record mode.
            valid_record_modes: Depending on the context, not all record modes may
                be considered for condensation. Use this parameter to skip those.
            ranking_key_desc: Columns by which to rank the records in descending
                order during condensation.
            ranking_key_asc: Columns by which to rank the records in ascending
                order during condensation.

        Returns:
            A function to be used in the Spark DataFrame .transform() method.

        {{get_example(method_name='condense_record_mode_cdc')}}
        """
        if not ranking_key_desc and not ranking_key_asc:
            raise WrongArgumentsException(
                "The condense_record_mode_cdc transformer requires data to be "
                "either in descending or ascending order, but no arguments for "
                "ordering were provided."
            )

        def inner(df: DataFrame) -> DataFrame:
            if not df.isStreaming:
                partition_window = Window.partitionBy(
                    [col(c) for c in business_key]
                ).orderBy(
                    [
                        col(c).desc()
                        for c in (ranking_key_desc if ranking_key_desc else [])
                    ]  # type: ignore
                    + [
                        col(c).asc()
                        for c in (ranking_key_asc if ranking_key_asc else [])
                    ]  # type: ignore
                )

                return (
                    df.withColumn("ranking", row_number().over(partition_window))
                    .filter(
                        col(record_mode_col).isNull()
                        | col(record_mode_col).isin(valid_record_modes)
                    )
                    .filter(col("ranking") == 1)
                    .drop("ranking")
                )
            else:
                raise UnsupportedStreamingTransformerException(
                    "Transformer condense_record_mode_cdc is not supported in "
                    "streaming mode."
                )

        return inner

    @classmethod
    def group_and_rank(
        cls, group_key: List[str], ranking_key: List[str], descending: bool = True
    ) -> Callable:
        """Condense data based on a simple group by + take latest mechanism.

        Args:
            group_key: List of column names to use in the group by.
            ranking_key: Columns by which to rank the records within each group.
            descending: Whether the ranking uses descending order. Defaults to
                True.

        Returns:
            A function to be used in the Spark DataFrame .transform() method.

        {{get_example(method_name='group_and_rank')}}
        """

        def inner(df: DataFrame) -> DataFrame:
            if not df.isStreaming:
                partition_window = Window.partitionBy(
                    [col(c) for c in group_key]
                ).orderBy(
                    [
                        col(c).desc() if descending else col(c).asc()
                        for c in (ranking_key if ranking_key else [])
                    ]  # type: ignore
                )

                return (
                    df.withColumn("ranking", row_number().over(partition_window))
                    .filter(col("ranking") == 1)
                    .drop("ranking")
                )
            else:
                raise UnsupportedStreamingTransformerException(
                    "Transformer group_and_rank is not supported in streaming mode."
                )

        return inner
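
As a rough illustration of how these condensers are used: both classmethods return a function that can be passed directly to the Spark DataFrame .transform() method. The following is a minimal sketch, not part of the library's documentation; the SparkSession setup, the sample data, and the import path are assumptions.

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.condensers import Condensers  # import path assumed

spark = SparkSession.builder.getOrCreate()

# Two versions of the same sales order item: only the latest should survive.
df = spark.createDataFrame(
    [
        ("so1", "item1", "2024-01-01", 10),
        ("so1", "item1", "2024-01-02", 20),
        ("so2", "item1", "2024-01-01", 5),
    ],
    ["salesorder", "item", "changed_on", "quantity"],
)

condensed = df.transform(
    Condensers.group_and_rank(
        group_key=["salesorder", "item"],
        ranking_key=["changed_on"],
    )
)
condensed.show()  # one row per (salesorder, item), keeping the highest changed_on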

condense_record_mode_cdc(business_key, record_mode_col, valid_record_modes, ranking_key_desc=None, ranking_key_asc=None) classmethod

Condense Change Data Capture (CDC) based on record_mode strategy.

This type of CDC data is typically produced by certain CDC-enabled systems; other systems may use different CDC strategies.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| business_key | List[str] | The business key (logical primary key) of the data. | required |
| record_mode_col | str | Name of the column containing the record mode. | required |
| valid_record_modes | List[str] | Depending on the context, not all record modes may be considered for condensation. Use this parameter to skip those. | required |
| ranking_key_desc | Optional[List[str]] | Columns by which to rank the records in descending order during condensation. | None |
| ranking_key_asc | Optional[List[str]] | Columns by which to rank the records in ascending order during condensation. | None |

Returns:

Type Description
Callable

A function to be used in the Spark DataFrame .transform() method.
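
Outside of configuration-driven usage, the returned function can also be applied directly in code. The sketch below is an illustration mirroring the arguments of the example that follows; the DataFrame df and the import path are assumptions, not part of this page.

from lakehouse_engine.transformers.condensers import Condensers  # import path assumed

# df: an existing batch DataFrame holding the raw CDC extractions (assumed).
condensed_df = df.transform(
    Condensers.condense_record_mode_cdc(
        business_key=["salesorder", "item"],
        record_mode_col="recordmode",
        valid_record_modes=["", "N", "R", "D", "X"],
        ranking_key_desc=["extraction_timestamp", "actrequest_timestamp"],
    )
)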

View Example of condense_record_mode_cdc (See full example here)
{
    "function": "condense_record_mode_cdc",
    "args": {
        "business_key": [
            "salesorder",
            "item"
        ],
        "ranking_key_desc": [
            "extraction_timestamp",
            "actrequest_timestamp",
            "datapakid",
            "partno",
            "record"
        ],
        "record_mode_col": "recordmode",
        "valid_record_modes": [
            "",
            "N",
            "R",
            "D",
            "X"
        ]
    }
}
Source code in mkdocs/lakehouse_engine/packages/transformers/condensers.py
@classmethod
def condense_record_mode_cdc(
    cls,
    business_key: List[str],
    record_mode_col: str,
    valid_record_modes: List[str],
    ranking_key_desc: Optional[List[str]] = None,
    ranking_key_asc: Optional[List[str]] = None,
) -> Callable:
    """Condense Change Data Capture (CDC) based on record_mode strategy.

    This type of CDC data is typically produced by certain CDC-enabled
    systems; other systems may use different CDC strategies.

    Args:
        business_key: The business key (logical primary key) of the data.
        record_mode_col: Name of the column containing the record mode.
        valid_record_modes: Depending on the context, not all record modes may
            be considered for condensation. Use this parameter to skip those.
        ranking_key_desc: Columns by which to rank the records in descending
            order during condensation.
        ranking_key_asc: Columns by which to rank the records in ascending
            order during condensation.

    Returns:
        A function to be used in the Spark DataFrame .transform() method.

    {{get_example(method_name='condense_record_mode_cdc')}}
    """
    if not ranking_key_desc and not ranking_key_asc:
        raise WrongArgumentsException(
            "The condense_record_mode_cdc transformer requires data to be "
            "either in descending or ascending order, but no arguments for "
            "ordering were provided."
        )

    def inner(df: DataFrame) -> DataFrame:
        if not df.isStreaming:
            partition_window = Window.partitionBy(
                [col(c) for c in business_key]
            ).orderBy(
                [
                    col(c).desc()
                    for c in (ranking_key_desc if ranking_key_desc else [])
                ]  # type: ignore
                + [
                    col(c).asc()
                    for c in (ranking_key_asc if ranking_key_asc else [])
                ]  # type: ignore
            )

            return (
                df.withColumn("ranking", row_number().over(partition_window))
                .filter(
                    col(record_mode_col).isNull()
                    | col(record_mode_col).isin(valid_record_modes)
                )
                .filter(col("ranking") == 1)
                .drop("ranking")
            )
        else:
            raise UnsupportedStreamingTransformerException(
                "Transformer condense_record_mode_cdc is not supported in "
                "streaming mode."
            )

    return inner

group_and_rank(group_key, ranking_key, descending=True) classmethod

Condense data based on a simple group by + take latest mechanism.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| group_key | List[str] | List of column names to use in the group by. | required |
| ranking_key | List[str] | Columns by which to rank the records within each group. | required |
| descending | bool | Whether the ranking uses descending order. Defaults to True. | True |

Returns:

Type Description
Callable

A function to be used in the Spark DataFrame .transform() method.
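
To keep the earliest record per group instead of the latest, the descending flag can be flipped. A minimal sketch, assuming df is an existing batch DataFrame and Condensers has been imported as shown earlier:

earliest_df = df.transform(
    Condensers.group_and_rank(
        group_key=["salesorder", "item"],
        ranking_key=["changed_on"],
        descending=False,  # ascending ranking, so ranking == 1 is the earliest row
    )
)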

View Example of group_and_rank (See full example here)
{
    "function": "group_and_rank",
    "args": {
        "group_key": [
            "salesorder",
            "item"
        ],
        "ranking_key": [
            "extraction_date",
            "changed_on",
            "lhe_row_id"
        ]
    }
}
Source code in mkdocs/lakehouse_engine/packages/transformers/condensers.py
@classmethod
def group_and_rank(
    cls, group_key: List[str], ranking_key: List[str], descending: bool = True
) -> Callable:
    """Condense data based on a simple group by + take latest mechanism.

    Args:
        group_key: List of column names to use in the group by.
        ranking_key: Columns by which to rank the records within each group.
        descending: Whether the ranking uses descending order. Defaults to
            True.

    Returns:
        A function to be used in the Spark DataFrame .transform() method.

    {{get_example(method_name='group_and_rank')}}
    """

    def inner(df: DataFrame) -> DataFrame:
        if not df.isStreaming:
            partition_window = Window.partitionBy(
                [col(c) for c in group_key]
            ).orderBy(
                [
                    col(c).desc() if descending else col(c).asc()
                    for c in (ranking_key if ranking_key else [])
                ]  # type: ignore
            )

            return (
                df.withColumn("ranking", row_number().over(partition_window))
                .filter(col("ranking") == 1)
                .drop("ranking")
            )
        else:
            raise UnsupportedStreamingTransformerException(
                "Transformer group_and_rank is not supported in streaming mode."
            )

    return inner