lakehouse_engine.algorithms.reconciliator
Module containing the Reconciliator class.
"""Module containing the Reconciliator class."""

from enum import Enum
from typing import List

import pyspark.sql.functions as spark_fns
from pyspark.sql import DataFrame
from pyspark.sql.functions import abs, coalesce, col, lit, when  # noqa: A004
from pyspark.sql.types import FloatType

from lakehouse_engine.algorithms.exceptions import ReconciliationFailedException
from lakehouse_engine.core.definitions import InputSpec, ReconciliatorSpec
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.core.executable import Executable
from lakehouse_engine.io.reader_factory import ReaderFactory
from lakehouse_engine.transformers.optimizers import Optimizers
from lakehouse_engine.utils.logging_handler import LoggingHandler


class ReconciliationType(Enum):
    """Type of Reconciliation."""

    PCT = "percentage"
    ABS = "absolute"


class ReconciliationTransformers(Enum):
    """Transformers Available for the Reconciliation Algorithm."""

    AVAILABLE_TRANSFORMERS: dict = {
        "cache": Optimizers.cache,
        "persist": Optimizers.persist,
    }


class Reconciliator(Executable):
    """Class to define the behavior of an algorithm that checks if data reconciles.

    Checking if data reconciles, using this algorithm, is a matter of reading the
    'truth' data and the 'current' data. You can use any input specification compatible
    with the lakehouse engine to read 'truth' or 'current' data. On top of that, you
    can pass a 'truth_preprocess_query' and a 'current_preprocess_query' so you can
    preprocess the data before it goes into the actual reconciliation process.
    Moreover, you can use the 'truth_preprocess_query_args' and
    'current_preprocess_query_args' to pass additional arguments to be used to apply
    additional operations on top of the dataframe, resulting from the previous steps.
    With these arguments you can apply additional operations like caching or persisting
    the Dataframe. The way to pass the additional arguments for the operations is
    similar to the TransformSpec, but only a few operations are allowed. Those are
    defined in ReconciliationTransformers.AVAILABLE_TRANSFORMERS.

    The reconciliation process is focused on joining 'truth' with 'current' by all
    provided columns except the ones passed as 'metrics'. After that it calculates the
    differences in the metrics attributes (either percentage or absolute difference).
    Finally, it aggregates the differences, using the supplied aggregation function
    (e.g., sum, avg, min, max, etc).

    All of these configurations are passed via the ACON to instantiate a
    ReconciliatorSpec object.

    .. note::
        It is crucial that both the current and truth datasets have exactly the same
        structure.
    .. note::
        You should not use 0 as yellow or red threshold, as the algorithm will verify
        if the difference between the truth and current values is greater than
        or equal to those thresholds.
    .. note::
        The reconciliation does not produce any negative values or percentages, as we
        use the absolute value of the differences. This means that the recon result
        will not indicate if it was the current values that were bigger or smaller
        than the truth values, or vice versa.
    """

    _logger = LoggingHandler(__name__).get_logger()

    def __init__(self, acon: dict):
        """Construct Algorithm instances.

        Args:
            acon: algorithm configuration.
        """
        self.spec: ReconciliatorSpec = ReconciliatorSpec(
            metrics=acon["metrics"],
            truth_input_spec=InputSpec(**acon["truth_input_spec"]),
            current_input_spec=InputSpec(**acon["current_input_spec"]),
            truth_preprocess_query=acon.get("truth_preprocess_query", None),
            truth_preprocess_query_args=acon.get("truth_preprocess_query_args", None),
            current_preprocess_query=acon.get("current_preprocess_query", None),
            current_preprocess_query_args=acon.get(
                "current_preprocess_query_args", None
            ),
            ignore_empty_df=acon.get("ignore_empty_df", False),
        )

    def get_source_of_truth(self) -> DataFrame:
        """Get the source of truth (expected result) for the reconciliation process.

        Returns:
            DataFrame containing the source of truth.
        """
        truth_df = ReaderFactory.get_data(self.spec.truth_input_spec)
        if self.spec.truth_preprocess_query:
            # the preprocess query can refer to the raw data through the 'truth' view
            truth_df.createOrReplaceTempView("truth")
            truth_df = ExecEnv.SESSION.sql(self.spec.truth_preprocess_query)

        return truth_df

    def get_current_results(self) -> DataFrame:
        """Get the current results from the table that we are checking if it reconciles.

        Returns:
            DataFrame containing the current results.
        """
        current_df = ReaderFactory.get_data(self.spec.current_input_spec)
        if self.spec.current_preprocess_query:
            # the preprocess query can refer to the raw data through the 'current' view
            current_df.createOrReplaceTempView("current")
            current_df = ExecEnv.SESSION.sql(self.spec.current_preprocess_query)

        return current_df

    def execute(self) -> None:
        """Reconcile the current results against the truth dataset.

        Raises:
            ReconciliationFailedException: if any metric reaches its yellow or red
                threshold, or if one of the datasets is empty and the run is not
                skipped through 'ignore_empty_df'.
        """
        # keep working on the DataFrame returned by the transformers instead of
        # relying on them having side effects on the input DataFrame.
        truth_df = self._apply_preprocess_query_args(
            self.get_source_of_truth(), self.spec.truth_preprocess_query_args
        )
        self._logger.info("Source of truth:")
        truth_df.show(1000, truncate=False)

        current_results_df = self._apply_preprocess_query_args(
            self.get_current_results(), self.spec.current_preprocess_query_args
        )
        self._logger.info("Current results:")
        current_results_df.show(1000, truncate=False)

        # if ignore_empty_df is true, run empty check on truth_df and current_results_df
        # if both the dataframes are empty then exit with green
        if (
            self.spec.ignore_empty_df
            and truth_df.isEmpty()
            and current_results_df.isEmpty()
        ):
            self._logger.info(
                f"ignore_empty_df is {self.spec.ignore_empty_df}, "
                "truth_df and current_results_df are empty, "
                "hence ignoring reconciliation"
            )
            self._logger.info("The Reconciliation process has succeeded.")
            return

        recon_results = self._get_recon_results(
            truth_df, current_results_df, self.spec.metrics
        )
        self._logger.info(f"Reconciliation result: {recon_results}")

        status = "green"
        for m in self.spec.metrics:
            result = recon_results[f"{m['metric']}_{m['type']}_diff_{m['aggregation']}"]
            if m["yellow"] <= result < m["red"]:
                if status == "green":
                    # only switch to yellow if it was green before, otherwise we want
                    # to preserve 'red' as the final status.
                    status = "yellow"
            elif m["red"] <= result:
                status = "red"

        if status != "green":
            raise ReconciliationFailedException(
                f"The Reconciliation process has failed with status: {status}."
            )

        self._logger.info("The Reconciliation process has succeeded.")

    @staticmethod
    def _apply_preprocess_query_args(
        df: DataFrame, preprocess_query_args: List[dict]
    ) -> DataFrame:
        """Apply transformers on top of the preprocessed query.

        Args:
            df: dataframe being transformed.
            preprocess_query_args: list of dicts having the functions/transformations
                to apply ('function') and the respective arguments ('args'). When
                None, the Optimizers.cache transformer is applied; when empty, the
                dataframe is returned untouched.

        Returns:
            The transformed Dataframe.

        Raises:
            KeyError: if a requested function is not part of
                ReconciliationTransformers.AVAILABLE_TRANSFORMERS.
        """
        if preprocess_query_args is None:
            return df.transform(Optimizers.cache())

        available = ReconciliationTransformers.AVAILABLE_TRANSFORMERS.value
        transformed_df = df
        for transformation in preprocess_query_args:
            rec_func = available[transformation["function"]](
                **transformation.get("args", {})
            )
            # chain on the running result so every configured transformer is
            # reflected in the returned DataFrame, not only the last one.
            transformed_df = transformed_df.transform(rec_func)

        return transformed_df

    def _get_recon_results(
        self, truth_df: DataFrame, current_results_df: DataFrame, metrics: List[dict]
    ) -> dict:
        """Get the reconciliation results by comparing truth_df with current_results_df.

        Args:
            truth_df: dataframe with the truth data to reconcile against. It is
                typically an aggregated dataset to use as baseline and then we match the
                current_results_df (Aggregated at the same level) against this truth.
            current_results_df: dataframe with the current results of the dataset we
                are trying to reconcile.
            metrics: list of dicts containing metric, type, aggregation, yellow
                threshold and red threshold.

        Returns:
            dictionary with the results (difference between truth and current results),
            keyed by '<metric>_<type>_diff_<aggregation>'.

        Raises:
            ReconciliationFailedException: if either dataset is empty.
            NotImplementedError: if a metric asks for an unknown reconciliation type.
        """
        if truth_df.isEmpty() or current_results_df.isEmpty():
            raise ReconciliationFailedException(
                "The reconciliation has failed because either the truth dataset or the "
                "current results dataset was empty."
            )

        # truth and current are joined on all columns except the metrics
        metric_names = {m["metric"] for m in metrics}
        join_keys = [c for c in current_results_df.columns if c not in metric_names]
        joined_df = truth_df.alias("truth").join(
            current_results_df.alias("current"),
            [truth_df[c] == current_results_df[c] for c in join_keys],
            how="full",
        )

        for m in metrics:
            diff_col = f"{m['metric']}_{m['type']}_diff"
            current_metric = col(f"current.{m['metric']}")
            truth_metric = col(f"truth.{m['metric']}")

            if m["type"] == ReconciliationType.PCT.value:
                diff = coalesce(
                    # we need to make sure we don't produce negative values
                    # because our thresholds only accept > or >= comparisons.
                    abs((current_metric - truth_metric) / abs(truth_metric)),
                    # if the formula above produces null, we need to consider where
                    # it came from: we check below if the values were the same,
                    # and if so the diff is 0, if not the diff is 1 (e.g., the null
                    # result might have come from a division by 0).
                    when(current_metric.eqNullSafe(truth_metric), lit(0)).otherwise(
                        lit(1)
                    ),
                )
            elif m["type"] == ReconciliationType.ABS.value:
                # a side missing from the full join counts as 0
                diff = abs(
                    coalesce(current_metric, lit(0)) - coalesce(truth_metric, lit(0))
                )
            else:
                raise NotImplementedError(
                    "The requested reconciliation type is not yet implemented."
                )

            joined_df = joined_df.withColumn(diff_col, diff.cast(FloatType()))

        results_df = joined_df.agg(
            *[
                getattr(spark_fns, m["aggregation"])(
                    f"{m['metric']}_{m['type']}_diff"
                ).alias(f"{m['metric']}_{m['type']}_diff_{m['aggregation']}")
                for m in metrics
            ]
        )

        return results_df.collect()[0].asDict()
class ReconciliationType(Enum):
    """Kinds of difference the reconciliation can compute for a metric."""

    # relative difference between current and truth
    PCT = "percentage"
    # absolute difference between current and truth
    ABS = "absolute"
Type of Reconciliation.
Inherited Members
- enum.Enum
- name
- value
class ReconciliationTransformers(Enum):
    """Transformers that may be requested through the preprocess query args."""

    # maps the 'function' name used in the configuration to the Optimizers
    # factory that is looked up for it.
    AVAILABLE_TRANSFORMERS: dict = dict(
        cache=Optimizers.cache,
        persist=Optimizers.persist,
    )
Transformers Available for the Reconciliation Algorithm.
Inherited Members
- enum.Enum
- name
- value
class Reconciliator(Executable):
    """Class to define the behavior of an algorithm that checks if data reconciles.

    Checking if data reconciles, using this algorithm, is a matter of reading the
    'truth' data and the 'current' data. You can use any input specification compatible
    with the lakehouse engine to read 'truth' or 'current' data. On top of that, you
    can pass a 'truth_preprocess_query' and a 'current_preprocess_query' so you can
    preprocess the data before it goes into the actual reconciliation process.
    Moreover, you can use the 'truth_preprocess_query_args' and
    'current_preprocess_query_args' to pass additional arguments to be used to apply
    additional operations on top of the dataframe, resulting from the previous steps.
    With these arguments you can apply additional operations like caching or persisting
    the Dataframe. The way to pass the additional arguments for the operations is
    similar to the TransformSpec, but only a few operations are allowed. Those are
    defined in ReconciliationTransformers.AVAILABLE_TRANSFORMERS.

    The reconciliation process is focused on joining 'truth' with 'current' by all
    provided columns except the ones passed as 'metrics'. After that it calculates the
    differences in the metrics attributes (either percentage or absolute difference).
    Finally, it aggregates the differences, using the supplied aggregation function
    (e.g., sum, avg, min, max, etc).

    All of these configurations are passed via the ACON to instantiate a
    ReconciliatorSpec object.

    .. note::
        It is crucial that both the current and truth datasets have exactly the same
        structure.
    .. note::
        You should not use 0 as yellow or red threshold, as the algorithm will verify
        if the difference between the truth and current values is greater than
        or equal to those thresholds.
    .. note::
        The reconciliation does not produce any negative values or percentages, as we
        use the absolute value of the differences. This means that the recon result
        will not indicate if it was the current values that were bigger or smaller
        than the truth values, or vice versa.
    """

    _logger = LoggingHandler(__name__).get_logger()

    def __init__(self, acon: dict):
        """Construct Algorithm instances.

        Args:
            acon: algorithm configuration.
        """
        self.spec: ReconciliatorSpec = ReconciliatorSpec(
            metrics=acon["metrics"],
            truth_input_spec=InputSpec(**acon["truth_input_spec"]),
            current_input_spec=InputSpec(**acon["current_input_spec"]),
            truth_preprocess_query=acon.get("truth_preprocess_query", None),
            truth_preprocess_query_args=acon.get("truth_preprocess_query_args", None),
            current_preprocess_query=acon.get("current_preprocess_query", None),
            current_preprocess_query_args=acon.get(
                "current_preprocess_query_args", None
            ),
            ignore_empty_df=acon.get("ignore_empty_df", False),
        )

    def get_source_of_truth(self) -> DataFrame:
        """Get the source of truth (expected result) for the reconciliation process.

        Returns:
            DataFrame containing the source of truth.
        """
        truth_df = ReaderFactory.get_data(self.spec.truth_input_spec)
        if self.spec.truth_preprocess_query:
            # the preprocess query can refer to the raw data through the 'truth' view
            truth_df.createOrReplaceTempView("truth")
            truth_df = ExecEnv.SESSION.sql(self.spec.truth_preprocess_query)

        return truth_df

    def get_current_results(self) -> DataFrame:
        """Get the current results from the table that we are checking if it reconciles.

        Returns:
            DataFrame containing the current results.
        """
        current_df = ReaderFactory.get_data(self.spec.current_input_spec)
        if self.spec.current_preprocess_query:
            # the preprocess query can refer to the raw data through the 'current' view
            current_df.createOrReplaceTempView("current")
            current_df = ExecEnv.SESSION.sql(self.spec.current_preprocess_query)

        return current_df

    def execute(self) -> None:
        """Reconcile the current results against the truth dataset.

        Raises:
            ReconciliationFailedException: if any metric reaches its yellow or red
                threshold, or if one of the datasets is empty and the run is not
                skipped through 'ignore_empty_df'.
        """
        # keep working on the DataFrame returned by the transformers instead of
        # relying on them having side effects on the input DataFrame.
        truth_df = self._apply_preprocess_query_args(
            self.get_source_of_truth(), self.spec.truth_preprocess_query_args
        )
        self._logger.info("Source of truth:")
        truth_df.show(1000, truncate=False)

        current_results_df = self._apply_preprocess_query_args(
            self.get_current_results(), self.spec.current_preprocess_query_args
        )
        self._logger.info("Current results:")
        current_results_df.show(1000, truncate=False)

        # if ignore_empty_df is true, run empty check on truth_df and current_results_df
        # if both the dataframes are empty then exit with green
        if (
            self.spec.ignore_empty_df
            and truth_df.isEmpty()
            and current_results_df.isEmpty()
        ):
            self._logger.info(
                f"ignore_empty_df is {self.spec.ignore_empty_df}, "
                "truth_df and current_results_df are empty, "
                "hence ignoring reconciliation"
            )
            self._logger.info("The Reconciliation process has succeeded.")
            return

        recon_results = self._get_recon_results(
            truth_df, current_results_df, self.spec.metrics
        )
        self._logger.info(f"Reconciliation result: {recon_results}")

        status = "green"
        for m in self.spec.metrics:
            result = recon_results[f"{m['metric']}_{m['type']}_diff_{m['aggregation']}"]
            if m["yellow"] <= result < m["red"]:
                if status == "green":
                    # only switch to yellow if it was green before, otherwise we want
                    # to preserve 'red' as the final status.
                    status = "yellow"
            elif m["red"] <= result:
                status = "red"

        if status != "green":
            raise ReconciliationFailedException(
                f"The Reconciliation process has failed with status: {status}."
            )

        self._logger.info("The Reconciliation process has succeeded.")

    @staticmethod
    def _apply_preprocess_query_args(
        df: DataFrame, preprocess_query_args: List[dict]
    ) -> DataFrame:
        """Apply transformers on top of the preprocessed query.

        Args:
            df: dataframe being transformed.
            preprocess_query_args: list of dicts having the functions/transformations
                to apply ('function') and the respective arguments ('args'). When
                None, the Optimizers.cache transformer is applied; when empty, the
                dataframe is returned untouched.

        Returns:
            The transformed Dataframe.

        Raises:
            KeyError: if a requested function is not part of
                ReconciliationTransformers.AVAILABLE_TRANSFORMERS.
        """
        if preprocess_query_args is None:
            return df.transform(Optimizers.cache())

        available = ReconciliationTransformers.AVAILABLE_TRANSFORMERS.value
        transformed_df = df
        for transformation in preprocess_query_args:
            rec_func = available[transformation["function"]](
                **transformation.get("args", {})
            )
            # chain on the running result so every configured transformer is
            # reflected in the returned DataFrame, not only the last one.
            transformed_df = transformed_df.transform(rec_func)

        return transformed_df

    def _get_recon_results(
        self, truth_df: DataFrame, current_results_df: DataFrame, metrics: List[dict]
    ) -> dict:
        """Get the reconciliation results by comparing truth_df with current_results_df.

        Args:
            truth_df: dataframe with the truth data to reconcile against. It is
                typically an aggregated dataset to use as baseline and then we match the
                current_results_df (Aggregated at the same level) against this truth.
            current_results_df: dataframe with the current results of the dataset we
                are trying to reconcile.
            metrics: list of dicts containing metric, type, aggregation, yellow
                threshold and red threshold.

        Returns:
            dictionary with the results (difference between truth and current results),
            keyed by '<metric>_<type>_diff_<aggregation>'.

        Raises:
            ReconciliationFailedException: if either dataset is empty.
            NotImplementedError: if a metric asks for an unknown reconciliation type.
        """
        if truth_df.isEmpty() or current_results_df.isEmpty():
            raise ReconciliationFailedException(
                "The reconciliation has failed because either the truth dataset or the "
                "current results dataset was empty."
            )

        # truth and current are joined on all columns except the metrics
        metric_names = {m["metric"] for m in metrics}
        join_keys = [c for c in current_results_df.columns if c not in metric_names]
        joined_df = truth_df.alias("truth").join(
            current_results_df.alias("current"),
            [truth_df[c] == current_results_df[c] for c in join_keys],
            how="full",
        )

        for m in metrics:
            diff_col = f"{m['metric']}_{m['type']}_diff"
            current_metric = col(f"current.{m['metric']}")
            truth_metric = col(f"truth.{m['metric']}")

            if m["type"] == ReconciliationType.PCT.value:
                diff = coalesce(
                    # we need to make sure we don't produce negative values
                    # because our thresholds only accept > or >= comparisons.
                    abs((current_metric - truth_metric) / abs(truth_metric)),
                    # if the formula above produces null, we need to consider where
                    # it came from: we check below if the values were the same,
                    # and if so the diff is 0, if not the diff is 1 (e.g., the null
                    # result might have come from a division by 0).
                    when(current_metric.eqNullSafe(truth_metric), lit(0)).otherwise(
                        lit(1)
                    ),
                )
            elif m["type"] == ReconciliationType.ABS.value:
                # a side missing from the full join counts as 0
                diff = abs(
                    coalesce(current_metric, lit(0)) - coalesce(truth_metric, lit(0))
                )
            else:
                raise NotImplementedError(
                    "The requested reconciliation type is not yet implemented."
                )

            joined_df = joined_df.withColumn(diff_col, diff.cast(FloatType()))

        results_df = joined_df.agg(
            *[
                getattr(spark_fns, m["aggregation"])(
                    f"{m['metric']}_{m['type']}_diff"
                ).alias(f"{m['metric']}_{m['type']}_diff_{m['aggregation']}")
                for m in metrics
            ]
        )

        return results_df.collect()[0].asDict()
Class to define the behavior of an algorithm that checks if data reconciles.
Checking if data reconciles, using this algorithm, is a matter of reading the 'truth' data and the 'current' data. You can use any input specification compatible with the lakehouse engine to read 'truth' or 'current' data. On top of that, you can pass a 'truth_preprocess_query' and a 'current_preprocess_query' so you can preprocess the data before it goes into the actual reconciliation process. Moreover, you can use the 'truth_preprocess_query_args' and 'current_preprocess_query_args' to pass additional arguments to be used to apply additional operations on top of the dataframe, resulting from the previous steps. With these arguments you can apply additional operations like caching or persisting the Dataframe. The way to pass the additional arguments for the operations is similar to the TransformSpec, but only a few operations are allowed. Those are defined in ReconciliationTransformers.AVAILABLE_TRANSFORMERS.
The reconciliation process joins 'truth' with 'current' on all provided columns except the ones passed as 'metrics'. After that, it calculates the differences in the metric attributes (either percentage or absolute difference). Finally, it aggregates the differences using the supplied aggregation function (e.g., sum, avg, min, max).
All of these configurations are passed via the ACON to instantiate a ReconciliatorSpec object.
It is crucial that both the current and truth datasets have exactly the same structure.
You should not use 0 as the yellow or red threshold, as the algorithm verifies whether the difference between the truth and current values is greater than or equal to those thresholds.
The reconciliation does not produce any negative values or percentages, as we use the absolute value of the differences. This means that the recon result will not indicate if it was the current values that were bigger or smaller than the truth values, or vice versa.
def __init__(self, acon: dict):
    """Build the reconciliation specification out of the provided ACON.

    Args:
        acon: algorithm configuration. 'metrics', 'truth_input_spec' and
            'current_input_spec' are read as mandatory keys; the preprocess
            queries and their args default to None and 'ignore_empty_df'
            defaults to False when absent.
    """
    spec_kwargs = {
        "metrics": acon["metrics"],
        "truth_input_spec": InputSpec(**acon["truth_input_spec"]),
        "current_input_spec": InputSpec(**acon["current_input_spec"]),
    }

    # optional settings, all of them falling back to None when not provided
    for optional_key in (
        "truth_preprocess_query",
        "truth_preprocess_query_args",
        "current_preprocess_query",
        "current_preprocess_query_args",
    ):
        spec_kwargs[optional_key] = acon.get(optional_key)

    spec_kwargs["ignore_empty_df"] = acon.get("ignore_empty_df", False)

    self.spec: ReconciliatorSpec = ReconciliatorSpec(**spec_kwargs)
Construct Algorithm instances.
Arguments:
- acon: algorithm configuration.
def get_source_of_truth(self) -> DataFrame:
    """Read the 'truth' dataset, i.e., the expected result of the reconciliation.

    When a truth preprocess query is configured, the data that was read is
    registered as the temporary view 'truth' and the query result is returned
    instead of the raw data.

    Returns:
        DataFrame containing the source of truth.
    """
    raw_truth_df = ReaderFactory.get_data(self.spec.truth_input_spec)

    preprocess_query = self.spec.truth_preprocess_query
    if not preprocess_query:
        return raw_truth_df

    raw_truth_df.createOrReplaceTempView("truth")
    return ExecEnv.SESSION.sql(preprocess_query)
Get the source of truth (expected result) for the reconciliation process.
Returns:
DataFrame containing the source of truth.
def get_current_results(self) -> DataFrame:
    """Read the 'current' dataset, i.e., the data being checked for reconciliation.

    When a current preprocess query is configured, the data that was read is
    registered as the temporary view 'current' and the query result is returned
    instead of the raw data.

    Returns:
        DataFrame containing the current results.
    """
    raw_current_df = ReaderFactory.get_data(self.spec.current_input_spec)

    preprocess_query = self.spec.current_preprocess_query
    if not preprocess_query:
        return raw_current_df

    raw_current_df.createOrReplaceTempView("current")
    return ExecEnv.SESSION.sql(preprocess_query)
Get the current results from the table that we are checking if it reconciles.
Returns:
DataFrame containing the current results.
def execute(self) -> None:
    """Reconcile the current results against the truth dataset.

    Both datasets are read, passed through the configured preprocess query
    args and shown in the output. Each metric's aggregated difference is then
    compared against its thresholds: a value in [yellow, red) yields a 'yellow'
    status and a value >= red yields 'red'; 'red' is never downgraded.

    Raises:
        ReconciliationFailedException: if the final status is not 'green', or
            if one of the datasets is empty and the run is not skipped through
            'ignore_empty_df'.
    """
    # keep working on the DataFrame returned by the transformers instead of
    # relying on them having side effects on the input DataFrame.
    truth_df = self._apply_preprocess_query_args(
        self.get_source_of_truth(), self.spec.truth_preprocess_query_args
    )
    self._logger.info("Source of truth:")
    truth_df.show(1000, truncate=False)

    current_results_df = self._apply_preprocess_query_args(
        self.get_current_results(), self.spec.current_preprocess_query_args
    )
    self._logger.info("Current results:")
    current_results_df.show(1000, truncate=False)

    # if ignore_empty_df is true, run empty check on truth_df and current_results_df
    # if both the dataframes are empty then exit with green
    if (
        self.spec.ignore_empty_df
        and truth_df.isEmpty()
        and current_results_df.isEmpty()
    ):
        self._logger.info(
            f"ignore_empty_df is {self.spec.ignore_empty_df}, "
            "truth_df and current_results_df are empty, "
            "hence ignoring reconciliation"
        )
        self._logger.info("The Reconciliation process has succeeded.")
        return

    recon_results = self._get_recon_results(
        truth_df, current_results_df, self.spec.metrics
    )
    self._logger.info(f"Reconciliation result: {recon_results}")

    status = "green"
    for m in self.spec.metrics:
        result = recon_results[f"{m['metric']}_{m['type']}_diff_{m['aggregation']}"]
        if m["yellow"] <= result < m["red"]:
            if status == "green":
                # only switch to yellow if it was green before, otherwise we want
                # to preserve 'red' as the final status.
                status = "yellow"
        elif m["red"] <= result:
            status = "red"

    if status != "green":
        raise ReconciliationFailedException(
            f"The Reconciliation process has failed with status: {status}."
        )

    self._logger.info("The Reconciliation process has succeeded.")
Reconcile the current results against the truth dataset.