# Filters

Module containing the filters transformers.
## Filters

Bases: `object`

Class containing the filters transformers.

Source code in `mkdocs/lakehouse_engine/packages/transformers/filters.py`
### `column_filter_exp(exp)` (staticmethod)

Filter a dataframe's columns based on a list of SQL expressions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `exp` | `List[str]` | Column filter expressions. | *required* |

Returns:

| Type | Description |
|---|---|
| `Callable` | A function to be called in the `.transform()` Spark function. |
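Below is a minimal usage sketch of `column_filter_exp`, assuming the import path `lakehouse_engine.transformers.filters`; the DataFrame `df` and the column expressions are illustrative.

```python
from lakehouse_engine.transformers.filters import Filters  # assumed import path

# Select/derive columns through Spark SQL expressions; the transformer returns
# a callable meant to be passed to DataFrame.transform().
projected_df = df.transform(
    Filters.column_filter_exp(exp=["customer_id", "amount * 1.19 as amount_gross"])
)
```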
### `drop_duplicate_rows(cols=None, watermarker=None)` (staticmethod)

Drop duplicate rows using the Spark function dropDuplicates().

This transformer can be used with or without arguments. The provided argument needs to be a list of columns. For example, ["Name", "VAT"] will drop duplicate records based on the "Name" and "VAT" columns. If the transformer is used without a columns list, or with an empty list such as [], the result is the same as using the distinct() pyspark function. If the watermarker dict is present, the drop operation will only apply to rows within the watermark time window.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `cols` | `List[str]` | Column names. | `None` |
| `watermarker` | `dict` | Properties to apply the watermarker to the transformer. | `None` |

Returns:

| Type | Description |
|---|---|
| `Callable` | A function to be called in the `.transform()` Spark function. |
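A minimal sketch of `drop_duplicate_rows`, assuming the same import path; `df` and the "Name"/"VAT" columns are illustrative.

```python
from lakehouse_engine.transformers.filters import Filters  # assumed import path

# Drop rows that share the same "Name" and "VAT" values.
deduplicated_df = df.transform(Filters.drop_duplicate_rows(cols=["Name", "VAT"]))

# Without a columns list the behaviour is equivalent to distinct().
distinct_df = df.transform(Filters.drop_duplicate_rows())
```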
### `expression_filter(exp)` (staticmethod)

Filter a dataframe based on an expression.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `exp` | `str` | Filter expression. | *required* |

Returns:

| Type | Description |
|---|---|
| `Callable` | A function to be called in the `.transform()` Spark function. |
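A minimal sketch of `expression_filter`, assuming the same import path; `df` and the column names in the expression are illustrative.

```python
from lakehouse_engine.transformers.filters import Filters  # assumed import path

# Keep only the rows matching the SQL filter expression.
active_df = df.transform(
    Filters.expression_filter(exp="status = 'active' and amount > 0")
)
```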
### `incremental_filter(input_col, increment_value=None, increment_df=None, increment_col='latest', greater_or_equal=False)` (classmethod)

Incrementally filter a certain dataframe given an increment logic.

This logic can either be an increment value or an increment dataframe from which to get the latest value. By default, the operator for the filtering process is greater or equal, to cover cases where we receive late arriving data not covered in a previous load. You can set greater_or_equal to false to use greater instead, when you trust that the source will never output more data for that increment after you have loaded it (e.g., you never load data while the source is still dumping data, which could give you an incomplete picture of the most recently arrived data).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `input_col` | `str` | Input column name. | *required* |
| `increment_value` | `Optional[Any]` | Value to which to filter the data, considering the provided input_col. | `None` |
| `increment_df` | `Optional[DataFrame]` | A dataframe to get the increment value from. You either specify this or the increment_value (this one takes precedence). This is a good approach to get the latest value from a given dataframe that was read, and then apply that value as a filter here. In this way you can perform incremental loads based on the last value of a given dataframe (e.g., table or file based). It can be used together with the get_max_value transformer to accomplish these incremental loads. See our append load feature tests for how to provide an acon for incremental loads that takes advantage of this scenario. | `None` |
| `increment_col` | `str` | Name of the column from which to get the increment value (when using the increment_df approach). This assumes there is only one row in the increment_df, which is why it is a good idea to use it together with the get_max_value transformer. Defaults to "latest", because that is the default output column name provided by the get_max_value transformer. | `'latest'` |
| `greater_or_equal` | `bool` | Whether the filter should also include the increment value itself or not (useful for scenarios where you are performing incremental loads but still want to include data matching the increment value, and not only values greater than it; for example, when you have already loaded data containing those values, but the source produced more data with those same values). Defaults to false. | `False` |

Returns:

| Type | Description |
|---|---|
| `Callable` | A function to be called in the `.transform()` Spark function. |
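A minimal sketch of `incremental_filter`, assuming the same import path; `df`, `control_df`, and the column names are illustrative, with `control_df` standing for a single-row dataframe (e.g., produced by the get_max_value transformer) holding the latest loaded value in a "latest" column.

```python
from lakehouse_engine.transformers.filters import Filters  # assumed import path

# 1) Filter by an explicit increment value, also keeping rows equal to it
#    (greater_or_equal=True).
incremental_df = df.transform(
    Filters.incremental_filter(
        input_col="load_date",
        increment_value="2024-01-01",
        greater_or_equal=True,
    )
)

# 2) Filter by the latest value held in another dataframe, read from its
#    "latest" column (the default increment_col).
incremental_df = df.transform(
    Filters.incremental_filter(
        input_col="load_date",
        increment_df=control_df,
        increment_col="latest",
    )
)
```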