lakehouse_engine.transformers.filters

Module containing the filters transformers.

  1"""Module containing the filters transformers."""
  2
  3from typing import Any, Callable, List, Optional
  4
  5from pyspark.sql import DataFrame
  6from pyspark.sql.functions import col
  7
  8from lakehouse_engine.transformers.watermarker import Watermarker
  9from lakehouse_engine.utils.logging_handler import LoggingHandler
 10
 11
 12class Filters(object):
 13    """Class containing the filters transformers."""
 14
 15    _logger = LoggingHandler(__name__).get_logger()
 16
 17    @classmethod
 18    def incremental_filter(
 19        cls,
 20        input_col: str,
 21        increment_value: Optional[Any] = None,
 22        increment_df: Optional[DataFrame] = None,
 23        increment_col: str = "latest",
 24        greater_or_equal: bool = False,
 25    ) -> Callable:
 26        """Incrementally Filter a certain dataframe given an increment logic.
 27
 28        This logic can either be an increment value or an increment dataframe from
 29        which the get the latest value from. By default, the operator for the
 30        filtering process is greater or equal to cover cases where we receive late
 31        arriving data not cover in a previous load. You can change greater_or_equal
 32        to false to use greater, when you trust the source will never output more data
 33        with the increment after you have load the data (e.g., you will never load
 34        data until the source is still dumping data, which may cause you to get an
 35        incomplete picture of the last arrived data).
 36
 37        Args:
 38            input_col: input column name
 39            increment_value: value to which to filter the data, considering the
 40                provided input_Col.
 41            increment_df: a dataframe to get the increment value from.
 42                you either specify this or the increment_value (this takes precedence).
 43                This is a good approach to get the latest value from a given dataframe
 44                that was read and apply that value as filter here. In this way you can
 45                perform incremental loads based on the last value of a given dataframe
 46                (e.g., table or file based). Can be used together with the
 47                get_max_value transformer to accomplish these incremental based loads.
 48                See our append load feature tests  to see how to provide an acon for
 49                incremental loads, taking advantage of the scenario explained here.
 50            increment_col: name of the column from which to get the increment
 51                value from (when using increment_df approach). This assumes there's
 52                only one row in the increment_df, reason why is a good idea to use
 53                together with the get_max_value transformer. Defaults to "latest"
 54                because that's the default output column name provided by the
 55                get_max_value transformer.
 56            greater_or_equal: if filtering should be done by also including the
 57                increment value or not (useful for scenarios where you are performing
 58                increment loads but still want to include data considering the increment
 59                value, and not only values greater than that increment... examples may
 60                include scenarios where you already loaded data including those values,
 61                but the source produced more data containing those values).
 62                Defaults to false.
 63
 64        Returns:
 65            A function to be called in .transform() spark function.
 66        """
 67
 68        def inner(df: DataFrame) -> DataFrame:
 69            if increment_df:
 70                if greater_or_equal:
 71                    return df.filter(  # type: ignore
 72                        col(input_col) >= increment_df.collect()[0][increment_col]
 73                    )
 74                else:
 75                    return df.filter(  # type: ignore
 76                        col(input_col) > increment_df.collect()[0][increment_col]
 77                    )
 78            else:
 79                if greater_or_equal:
 80                    return df.filter(col(input_col) >= increment_value)  # type: ignore
 81                else:
 82                    return df.filter(col(input_col) > increment_value)  # type: ignore
 83
 84        return inner
 85
 86    @staticmethod
 87    def expression_filter(exp: str) -> Callable:
 88        """Filter a dataframe based on an expression.
 89
 90        Args:
 91            exp: filter expression.
 92
 93        Returns:
 94            A function to be called in .transform() spark function.
 95        """
 96
 97        def inner(df: DataFrame) -> DataFrame:
 98            return df.filter(exp)  # type: ignore
 99
100        return inner
101
102    @staticmethod
103    def column_filter_exp(exp: List[str]) -> Callable:
104        """Filter a dataframe's columns based on a list of SQL expressions.
105
106        Args:
107            exp: column filter expressions.
108
109        Returns:
110            A function to be called in .transform() spark function.
111        """
112
113        def inner(df: DataFrame) -> DataFrame:
114            return df.selectExpr(*exp)  # type: ignore
115
116        return inner
117
118    @staticmethod
119    def drop_duplicate_rows(
120        cols: List[str] = None, watermarker: dict = None
121    ) -> Callable:
122        """Drop duplicate rows using spark function dropDuplicates().
123
124        This transformer can be used with or without arguments.
125        The provided argument needs to be a list of columns.
126        For example: [“Name”,”VAT”] will drop duplicate records within
127        "Name" and "VAT" columns.
128        If the transformer is used without providing any columns list or providing
129        an empty list, such as [] the result will be the same as using
130        the distinct() pyspark function. If the watermark dict is present it will
131        ensure that the drop operation will apply to rows within the watermark timeline
132        window.
133
134
135        Args:
136            cols: column names.
137            watermarker: properties to apply watermarker to the transformer.
138
139        Returns:
140            A function to be called in .transform() spark function.
141        """
142
143        def inner(df: DataFrame) -> DataFrame:
144            if watermarker:
145                df = Watermarker.with_watermark(
146                    watermarker["col"], watermarker["watermarking_time"]
147                )(df)
148            if not cols:
149                return df.dropDuplicates()
150            else:
151                return df.dropDuplicates(cols)
152
153        return inner
class Filters:

Class containing the filters transformers.

@classmethod
def incremental_filter( cls, input_col: str, increment_value: Optional[Any] = None, increment_df: Optional[pyspark.sql.dataframe.DataFrame] = None, increment_col: str = 'latest', greater_or_equal: bool = False) -> Callable:

Incrementally filter a certain dataframe given an increment logic.

This logic can either be an increment value or an increment dataframe from which to get the latest value. By default, the operator for the filtering process is greater or equal, to cover cases where we receive late arriving data not covered in a previous load. You can change greater_or_equal to false to use greater, when you trust the source will never output more data with that increment after you have loaded the data (e.g., you will never load data while the source is still dumping data, which could give you an incomplete picture of the latest arrived data).

Arguments:
  • input_col: input column name.
  • increment_value: value used to filter the data, considering the provided input_col.
  • increment_df: a dataframe to get the increment value from. You either specify this or the increment_value (this one takes precedence). This is a good approach to get the latest value from a given dataframe that was read and apply that value as a filter here. In this way you can perform incremental loads based on the last value of a given dataframe (e.g., table or file based). Can be used together with the get_max_value transformer to accomplish these incremental based loads. See our append load feature tests to see how to provide an acon for incremental loads, taking advantage of the scenario explained here.
  • increment_col: name of the column from which to get the increment value (when using the increment_df approach). This assumes there's only one row in the increment_df, which is why it is a good idea to use it together with the get_max_value transformer. Defaults to "latest" because that's the default output column name provided by the get_max_value transformer.
  • greater_or_equal: whether the filtering should also include the increment value or not (useful for scenarios where you are performing incremental loads but still want to include data matching the increment value, and not only values greater than that increment; examples may include scenarios where you already loaded data including those values, but the source produced more data containing those values). Defaults to False.
Returns:

A function to be called in the .transform() Spark function.

Example:

{
    "function": "incremental_filter",
    "args": {
        "input_col": "actrequest_timestamp",
        "increment_df": "max_sales_bronze_timestamp"
    }
}
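Beyond the acon-driven usage above, the returned function can also be applied directly through Spark's DataFrame.transform. A minimal sketch, assuming a hypothetical sales dataframe (the column names and values below are illustrative, not taken from the source):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.filters import Filters

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: one row before and one row after the increment value.
sales = spark.createDataFrame(
    [("2024-01-01", 10.0), ("2024-02-01", 20.0)],
    ["actrequest_timestamp", "amount"],
)

# Keep only rows strictly greater than the last loaded timestamp
# (greater_or_equal defaults to False).
new_sales = sales.transform(
    Filters.incremental_filter(
        input_col="actrequest_timestamp", increment_value="2024-01-01"
    )
)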


@staticmethod
def expression_filter(exp: str) -> Callable:

Filter a dataframe based on an expression.

Arguments:
  • exp: filter expression.
Returns:

A function to be called in the .transform() Spark function.

Example:

{
    "function": "expression_filter",
    "args": {
        "exp": "date like '2016%'"
    }
}
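A minimal direct-usage sketch, assuming a hypothetical dataframe with a string date column (illustrative names only, not from the source):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.filters import Filters

spark = SparkSession.builder.getOrCreate()

# Hypothetical data with a string date column.
sales = spark.createDataFrame(
    [("2016-03-01", "PT"), ("2017-05-01", "DE")], ["date", "country"]
)

# Keep only 2016 records, using the SQL-like filter expression from the example above.
sales_2016 = sales.transform(Filters.expression_filter("date like '2016%'"))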


@staticmethod
def column_filter_exp(exp: List[str]) -> Callable:

Filter a dataframe's columns based on a list of SQL expressions.

Arguments:
  • exp: column filter expressions.
Returns:

A function to be called in the .transform() Spark function.

Example:

{
    "function": "column_filter_exp",
    "args": {
        "exp": [
            "date",
            "country",
            "customer_number"
        ]
    }
}
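A minimal direct-usage sketch, assuming a hypothetical dataframe with more columns than we want to keep (illustrative names only, not from the source):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.filters import Filters

spark = SparkSession.builder.getOrCreate()

# Hypothetical data with four columns.
sales = spark.createDataFrame(
    [("2016-03-01", "PT", "C1", 10.0)],
    ["date", "country", "customer_number", "amount"],
)

# Keep only the projected columns; each list entry is a SQL expression
# passed to selectExpr, so computed columns like "amount * 2 as doubled"
# would also work.
projected = sales.transform(
    Filters.column_filter_exp(["date", "country", "customer_number"])
)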


@staticmethod
def drop_duplicate_rows(cols: List[str] = None, watermarker: dict = None) -> Callable:

Drop duplicate rows using spark function dropDuplicates().

This transformer can be used with or without arguments. The provided argument needs to be a list of columns. For example, ["Name", "VAT"] will drop duplicate records considering the "Name" and "VAT" columns. If the transformer is used without providing any columns, or providing an empty list such as [], the result will be the same as using the distinct() pyspark function. If the watermarker dict is present, the drop operation will only apply to rows within the watermark time window.

Arguments:
  • cols: column names.
  • watermarker: properties to apply the watermarker to the transformer.
Returns:

A function to be called in the .transform() Spark function.
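
A minimal direct-usage sketch, assuming hypothetical customer data (illustrative names only); the watermarker keys mentioned in the comment, "col" and "watermarking_time", are the ones read by the source:

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.filters import Filters

spark = SparkSession.builder.getOrCreate()

# Hypothetical data with a duplicated Name/VAT pair.
customers = spark.createDataFrame(
    [("Jane", "123", "PT"), ("Jane", "123", "DE"), ("John", "456", "PT")],
    ["Name", "VAT", "country"],
)

# Drop duplicates considering only the Name and VAT columns.
deduped = customers.transform(Filters.drop_duplicate_rows(cols=["Name", "VAT"]))

# Without arguments it behaves like distinct(). For streaming dataframes, a
# watermarker dict such as {"col": "event_time", "watermarking_time": "10 minutes"}
# applies the watermark before dropping duplicates.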