lakehouse_engine.transformers.filters
Module containing the filters transformers.
1"""Module containing the filters transformers.""" 2 3from typing import Any, Callable, List, Optional 4 5from pyspark.sql import DataFrame 6from pyspark.sql.functions import col 7 8from lakehouse_engine.transformers.watermarker import Watermarker 9from lakehouse_engine.utils.logging_handler import LoggingHandler 10 11 12class Filters(object): 13 """Class containing the filters transformers.""" 14 15 _logger = LoggingHandler(__name__).get_logger() 16 17 @classmethod 18 def incremental_filter( 19 cls, 20 input_col: str, 21 increment_value: Optional[Any] = None, 22 increment_df: Optional[DataFrame] = None, 23 increment_col: str = "latest", 24 greater_or_equal: bool = False, 25 ) -> Callable: 26 """Incrementally Filter a certain dataframe given an increment logic. 27 28 This logic can either be an increment value or an increment dataframe from 29 which the get the latest value from. By default, the operator for the 30 filtering process is greater or equal to cover cases where we receive late 31 arriving data not cover in a previous load. You can change greater_or_equal 32 to false to use greater, when you trust the source will never output more data 33 with the increment after you have load the data (e.g., you will never load 34 data until the source is still dumping data, which may cause you to get an 35 incomplete picture of the last arrived data). 36 37 Args: 38 input_col: input column name 39 increment_value: value to which to filter the data, considering the 40 provided input_Col. 41 increment_df: a dataframe to get the increment value from. 42 you either specify this or the increment_value (this takes precedence). 43 This is a good approach to get the latest value from a given dataframe 44 that was read and apply that value as filter here. In this way you can 45 perform incremental loads based on the last value of a given dataframe 46 (e.g., table or file based). Can be used together with the 47 get_max_value transformer to accomplish these incremental based loads. 48 See our append load feature tests to see how to provide an acon for 49 incremental loads, taking advantage of the scenario explained here. 50 increment_col: name of the column from which to get the increment 51 value from (when using increment_df approach). This assumes there's 52 only one row in the increment_df, reason why is a good idea to use 53 together with the get_max_value transformer. Defaults to "latest" 54 because that's the default output column name provided by the 55 get_max_value transformer. 56 greater_or_equal: if filtering should be done by also including the 57 increment value or not (useful for scenarios where you are performing 58 increment loads but still want to include data considering the increment 59 value, and not only values greater than that increment... examples may 60 include scenarios where you already loaded data including those values, 61 but the source produced more data containing those values). 62 Defaults to false. 63 64 Returns: 65 A function to be called in .transform() spark function. 
66 """ 67 68 def inner(df: DataFrame) -> DataFrame: 69 if increment_df: 70 if greater_or_equal: 71 return df.filter( # type: ignore 72 col(input_col) >= increment_df.collect()[0][increment_col] 73 ) 74 else: 75 return df.filter( # type: ignore 76 col(input_col) > increment_df.collect()[0][increment_col] 77 ) 78 else: 79 if greater_or_equal: 80 return df.filter(col(input_col) >= increment_value) # type: ignore 81 else: 82 return df.filter(col(input_col) > increment_value) # type: ignore 83 84 return inner 85 86 @staticmethod 87 def expression_filter(exp: str) -> Callable: 88 """Filter a dataframe based on an expression. 89 90 Args: 91 exp: filter expression. 92 93 Returns: 94 A function to be called in .transform() spark function. 95 """ 96 97 def inner(df: DataFrame) -> DataFrame: 98 return df.filter(exp) # type: ignore 99 100 return inner 101 102 @staticmethod 103 def column_filter_exp(exp: List[str]) -> Callable: 104 """Filter a dataframe's columns based on a list of SQL expressions. 105 106 Args: 107 exp: column filter expressions. 108 109 Returns: 110 A function to be called in .transform() spark function. 111 """ 112 113 def inner(df: DataFrame) -> DataFrame: 114 return df.selectExpr(*exp) # type: ignore 115 116 return inner 117 118 @staticmethod 119 def drop_duplicate_rows( 120 cols: List[str] = None, watermarker: dict = None 121 ) -> Callable: 122 """Drop duplicate rows using spark function dropDuplicates(). 123 124 This transformer can be used with or without arguments. 125 The provided argument needs to be a list of columns. 126 For example: [“Name”,”VAT”] will drop duplicate records within 127 "Name" and "VAT" columns. 128 If the transformer is used without providing any columns list or providing 129 an empty list, such as [] the result will be the same as using 130 the distinct() pyspark function. If the watermark dict is present it will 131 ensure that the drop operation will apply to rows within the watermark timeline 132 window. 133 134 135 Args: 136 cols: column names. 137 watermarker: properties to apply watermarker to the transformer. 138 139 Returns: 140 A function to be called in .transform() spark function. 141 """ 142 143 def inner(df: DataFrame) -> DataFrame: 144 if watermarker: 145 df = Watermarker.with_watermark( 146 watermarker["col"], watermarker["watermarking_time"] 147 )(df) 148 if not cols: 149 return df.dropDuplicates() 150 else: 151 return df.dropDuplicates(cols) 152 153 return inner
class Filters(object):
Class containing the filters transformers.
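Each transformer in this class returns a function meant to be chained through Spark's DataFrame.transform(). A minimal sketch of that pattern (the raw_df dataframe, the column names, and the filter expression are illustrative assumptions):

from lakehouse_engine.transformers.filters import Filters

# Chain two filters transformers on an existing dataframe (illustrative names).
cleaned_df = (
    raw_df
    .transform(Filters.expression_filter("amount > 0"))
    .transform(Filters.drop_duplicate_rows(cols=["id"]))
)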
@classmethod
def incremental_filter(
    cls,
    input_col: str,
    increment_value: Optional[Any] = None,
    increment_df: Optional[DataFrame] = None,
    increment_col: str = "latest",
    greater_or_equal: bool = False,
) -> Callable:
Incrementally filter a dataframe given an increment logic.
This logic can either be an increment value or an increment dataframe from which to get the latest value. By default, the operator for the filtering process is greater or equal, to cover cases where we receive late arriving data not covered in a previous load. You can change greater_or_equal to false to use a strictly greater comparison, when you trust the source will never output more data with that increment after you have loaded the data (e.g., you will never load data while the source is still dumping data, which could give you an incomplete picture of the last arrived data).
Arguments:
- input_col: input column name.
- increment_value: value used to filter the data, considering the provided input_col.
- increment_df: a dataframe to get the increment value from. You either specify this or the increment_value (increment_df takes precedence). This is a good approach to get the latest value from a given dataframe that was read and apply it as a filter here. In this way you can perform incremental loads based on the last value of a given dataframe (e.g., table or file based). It can be used together with the get_max_value transformer to accomplish these incremental loads. See our append load feature tests to see how to provide an acon for incremental loads, taking advantage of the scenario explained here.
- increment_col: name of the column from which to get the increment value (when using the increment_df approach). This assumes there's only one row in the increment_df, which is why it is a good idea to use it together with the get_max_value transformer. Defaults to "latest" because that's the default output column name provided by the get_max_value transformer.
- greater_or_equal: whether the filtering should also include the increment value or not (useful for scenarios where you are performing incremental loads but still want to include data matching the increment value, and not only values greater than that increment; examples include scenarios where you already loaded data including those values, but the source produced more data containing those values). Defaults to False.
Returns:
A function to be called in the .transform() Spark function.
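A minimal usage sketch for an incremental load (the spark session, the control_table and sales tables, and the date column are illustrative assumptions):

# Single-row dataframe holding the latest value already loaded; the column is
# named "latest" to match the default increment_col.
latest_df = spark.table("control_table").selectExpr("max(date) as latest")

new_data_df = spark.table("sales").transform(
    Filters.incremental_filter(
        input_col="date",
        increment_df=latest_df,
        greater_or_equal=True,
    )
)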
@staticmethod
def expression_filter(exp: str) -> Callable:
Filter a dataframe based on an expression.
Arguments:
- exp: filter expression.
Returns:
A function to be called in the .transform() Spark function.
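A minimal usage sketch (the df dataframe and the column names are illustrative assumptions):

# Keep only rows matching a Spark SQL filter expression.
filtered_df = df.transform(
    Filters.expression_filter("amount > 0 and customer is not null")
)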
@staticmethod
def column_filter_exp(exp: List[str]) -> Callable:
Filter a dataframe's columns based on a list of SQL expressions.
Arguments:
- exp: column filter expressions.
Returns:
A function to be called in the .transform() Spark function.
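A minimal usage sketch (the df dataframe and the column names are illustrative assumptions):

# Select and derive columns through SQL expressions.
projected_df = df.transform(
    Filters.column_filter_exp(
        ["customer", "upper(country) as country", "amount * 0.19 as vat_amount"]
    )
)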
@staticmethod
def drop_duplicate_rows(
    cols: List[str] = None, watermarker: dict = None
) -> Callable:
Drop duplicate rows using the Spark dropDuplicates() function.
This transformer can be used with or without arguments. The provided argument needs to be a list of columns. For example, ["Name", "VAT"] will drop records that are duplicated within the "Name" and "VAT" columns. If the transformer is used without a columns list, or with an empty list such as [], the result is the same as using the distinct() PySpark function. If the watermarker dict is present, it ensures the drop operation only applies to rows within the watermark time window.
Arguments:
- cols: column names.
- watermarker: properties used to apply a watermark to the transformer (the "col" and "watermarking_time" keys).
Returns:
A function to be called in the .transform() Spark function.
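A minimal usage sketch (the streaming_df dataframe, the event_time column, and the 10 minutes delay are illustrative assumptions; the "col" and "watermarking_time" keys are the ones read by this transformer):

# Drop duplicates within the Name/VAT columns, bounded by a watermark window.
deduped_df = streaming_df.transform(
    Filters.drop_duplicate_rows(
        cols=["Name", "VAT"],
        watermarker={"col": "event_time", "watermarking_time": "10 minutes"},
    )
)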