lakehouse_engine.algorithms.reconciliator

Module containing the Reconciliator class.

  1"""Module containing the Reconciliator class."""
  2
  3from enum import Enum
  4from typing import List
  5
  6import pyspark.sql.functions as spark_fns
  7from pyspark.sql import DataFrame
  8from pyspark.sql.functions import abs, coalesce, col, lit, when  # noqa: A004
  9from pyspark.sql.types import FloatType
 10
 11from lakehouse_engine.algorithms.exceptions import ReconciliationFailedException
 12from lakehouse_engine.core.definitions import InputSpec, ReconciliatorSpec
 13from lakehouse_engine.core.exec_env import ExecEnv
 14from lakehouse_engine.core.executable import Executable
 15from lakehouse_engine.io.reader_factory import ReaderFactory
 16from lakehouse_engine.transformers.optimizers import Optimizers
 17from lakehouse_engine.utils.logging_handler import LoggingHandler
 18
 19
 20class ReconciliationType(Enum):
 21    """Type of Reconciliation."""
 22
 23    PCT = "percentage"
 24    ABS = "absolute"
 25
 26
 27class ReconciliationTransformers(Enum):
 28    """Transformers Available for the Reconciliation Algorithm."""
 29
 30    AVAILABLE_TRANSFORMERS: dict = {
 31        "cache": Optimizers.cache,
 32        "persist": Optimizers.persist,
 33    }
 34
 35
 36class Reconciliator(Executable):
 37    """Class to define the behavior of an algorithm that checks if data reconciles.
 38
 39    Checking if data reconciles, using this algorithm, is a matter of reading the
 40    'truth' data and the 'current' data. You can use any input specification compatible
 41    with the lakehouse engine to read 'truth' or 'current' data. On top of that, you
 42    can pass a 'truth_preprocess_query' and a 'current_preprocess_query' so you can
 43    preprocess the data before it goes into the actual reconciliation process.
 44    Moreover, you can use the 'truth_preprocess_query_args' and
 45    'current_preprocess_query_args' to pass additional arguments to be used to apply
 46    additional operations on top of the dataframe, resulting from the previous steps.
 47    With these arguments you can apply additional operations like caching or persisting
 48    the Dataframe. The way to pass the additional arguments for the operations is
 49    similar to the TransformSpec, but only a few operations are allowed. Those are
 50    defined in ReconciliationTransformers.AVAILABLE_TRANSFORMERS.
 51
 52    The reconciliation process is focused on joining 'truth' with 'current' by all
 53    provided columns except the ones passed as 'metrics'. After that it calculates the
 54    differences in the metrics attributes (either percentage or absolute difference).
 55    Finally, it aggregates the differences, using the supplied aggregation function
 56    (e.g., sum, avg, min, max, etc).
 57
 58    All of these configurations are passed via the ACON to instantiate a
 59    ReconciliatorSpec object.
 60
 61    .. note::
 62        It is crucial that both the current and truth datasets have exactly the same
 63        structure.
 64    .. note::
 65        You should not use 0 as yellow or red threshold, as the algorithm will verify
 66        if the difference between the truth and current values is bigger
 67        or equal than those thresholds.
 68    .. note::
 69        The reconciliation does not produce any negative values or percentages, as we
 70        use the absolute value of the differences. This means that the recon result
 71        will not indicate if it was the current values that were bigger or smaller
 72        than the truth values, or vice versa.
 73    """
 74
 75    _logger = LoggingHandler(__name__).get_logger()
 76
 77    def __init__(self, acon: dict):
 78        """Construct Algorithm instances.
 79
 80        Args:
 81            acon: algorithm configuration.
 82        """
 83        self.spec: ReconciliatorSpec = ReconciliatorSpec(
 84            metrics=acon["metrics"],
 85            truth_input_spec=InputSpec(**acon["truth_input_spec"]),
 86            current_input_spec=InputSpec(**acon["current_input_spec"]),
 87            truth_preprocess_query=acon.get("truth_preprocess_query", None),
 88            truth_preprocess_query_args=acon.get("truth_preprocess_query_args", None),
 89            current_preprocess_query=acon.get("current_preprocess_query", None),
 90            current_preprocess_query_args=acon.get(
 91                "current_preprocess_query_args", None
 92            ),
 93            ignore_empty_df=acon.get("ignore_empty_df", False),
 94        )
 95
 96    def get_source_of_truth(self) -> DataFrame:
 97        """Get the source of truth (expected result) for the reconciliation process.
 98
 99        Returns:
100            DataFrame containing the source of truth.
101        """
102        truth_df = ReaderFactory.get_data(self.spec.truth_input_spec)
103        if self.spec.truth_preprocess_query:
104            truth_df.createOrReplaceTempView("truth")
105            truth_df = ExecEnv.SESSION.sql(self.spec.truth_preprocess_query)
106
107        return truth_df
108
109    def get_current_results(self) -> DataFrame:
110        """Get the current results from the table that we are checking if it reconciles.
111
112        Returns:
113            DataFrame containing the current results.
114        """
115        current_df = ReaderFactory.get_data(self.spec.current_input_spec)
116        if self.spec.current_preprocess_query:
117            current_df.createOrReplaceTempView("current")
118            current_df = ExecEnv.SESSION.sql(self.spec.current_preprocess_query)
119
120        return current_df
121
122    def execute(self) -> None:
123        """Reconcile the current results against the truth dataset."""
124        truth_df = self.get_source_of_truth()
125        self._apply_preprocess_query_args(
126            truth_df, self.spec.truth_preprocess_query_args
127        )
128        self._logger.info("Source of truth:")
129        truth_df.show(1000, truncate=False)
130
131        current_results_df = self.get_current_results()
132        self._apply_preprocess_query_args(
133            current_results_df, self.spec.current_preprocess_query_args
134        )
135        self._logger.info("Current results:")
136        current_results_df.show(1000, truncate=False)
137
138        status = "green"
139
140        # if ignore_empty_df is true, run empty check on truth_df and current_results_df
141        # if both the dataframes are empty then exit with green
142        if (
143            self.spec.ignore_empty_df
144            and truth_df.isEmpty()
145            and current_results_df.isEmpty()
146        ):
147            self._logger.info(
148                f"ignore_empty_df is {self.spec.ignore_empty_df}, "
149                f"truth_df and current_results_df are empty, "
150                f"hence ignoring reconciliation"
151            )
152            self._logger.info("The Reconciliation process has succeeded.")
153            return
154
155        recon_results = self._get_recon_results(
156            truth_df, current_results_df, self.spec.metrics
157        )
158        self._logger.info(f"Reconciliation result: {recon_results}")
159
160        for m in self.spec.metrics:
161            metric_name = f"{m['metric']}_{m['type']}_diff_{m['aggregation']}"
162            if m["yellow"] <= recon_results[metric_name] < m["red"]:
163                if status == "green":
164                    # only switch to yellow if it was green before, otherwise we want
165                    # to preserve 'red' as the final status.
166                    status = "yellow"
167            elif m["red"] <= recon_results[metric_name]:
168                status = "red"
169
170        if status != "green":
171            raise ReconciliationFailedException(
172                f"The Reconciliation process has failed with status: {status}."
173            )
174        else:
175            self._logger.info("The Reconciliation process has succeeded.")
176
177    @staticmethod
178    def _apply_preprocess_query_args(
179        df: DataFrame, preprocess_query_args: List[dict]
180    ) -> DataFrame:
181        """Apply transformers on top of the preprocessed query.
182
183        Args:
184            df: dataframe being transformed.
185            preprocess_query_args: dict having the functions/transformations to
186                apply and respective arguments.
187
188        Returns: the transformed Dataframe.
189        """
190        transformed_df = df
191
192        if preprocess_query_args is None:
193            transformed_df = df.transform(Optimizers.cache())
194        elif len(preprocess_query_args) > 0:
195            for transformation in preprocess_query_args:
196                rec_func = ReconciliationTransformers.AVAILABLE_TRANSFORMERS.value[
197                    transformation["function"]
198                ](**transformation.get("args", {}))
199
200                transformed_df = df.transform(rec_func)
201        else:
202            transformed_df = df
203
204        return transformed_df
205
206    def _get_recon_results(
207        self, truth_df: DataFrame, current_results_df: DataFrame, metrics: List[dict]
208    ) -> dict:
209        """Get the reconciliation results by comparing truth_df with current_results_df.
210
211        Args:
212            truth_df: dataframe with the truth data to reconcile against. It is
213                typically an aggregated dataset to use as baseline and then we match the
214                current_results_df (Aggregated at the same level) against this truth.
215            current_results_df: dataframe with the current results of the dataset we
216                are trying to reconcile.
217            metrics: list of dicts containing metric, aggregation, yellow threshold and
218                red threshold.
219
220        Return:
221            dictionary with the results (difference between truth and current results)
222        """
223        if len(truth_df.head(1)) == 0 or len(current_results_df.head(1)) == 0:
224            raise ReconciliationFailedException(
225                "The reconciliation has failed because either the truth dataset or the "
226                "current results dataset was empty."
227            )
228
229        # truth and current are joined on all columns except the metrics
230        joined_df = truth_df.alias("truth").join(
231            current_results_df.alias("current"),
232            [
233                truth_df[c] == current_results_df[c]
234                for c in current_results_df.columns
235                if c not in [m["metric"] for m in metrics]
236            ],
237            how="full",
238        )
239
240        for m in metrics:
241            if m["type"] == ReconciliationType.PCT.value:
242                joined_df = joined_df.withColumn(
243                    f"{m['metric']}_{m['type']}_diff",
244                    coalesce(
245                        (
246                            # we need to make sure we don't produce negative values
247                            # because our thresholds only accept > or >= comparisons.
248                            abs(
249                                (
250                                    col(f"current.{m['metric']}")
251                                    - col(f"truth.{m['metric']}")
252                                )
253                                / abs(col(f"truth.{m['metric']}"))
254                            )
255                        ),
256                        # if the formula above produces null, we need to consider where
257                        # it came from: we check below if the values were the same,
258                        # and if so the diff is 0, if not the diff is 1 (e.g., the null
259                        # result might have come from a division by 0).
260                        when(
261                            col(f"current.{m['metric']}").eqNullSafe(
262                                col(f"truth.{m['metric']}")
263                            ),
264                            lit(0),
265                        ).otherwise(lit(1)),
266                    ),
267                )
268            elif m["type"] == ReconciliationType.ABS.value:
269                joined_df = joined_df.withColumn(
270                    f"{m['metric']}_{m['type']}_diff",
271                    abs(
272                        coalesce(col(f"current.{m['metric']}"), lit(0))
273                        - coalesce(col(f"truth.{m['metric']}"), lit(0))
274                    ),
275                )
276            else:
277                raise NotImplementedError(
278                    "The requested reconciliation type is not yet implemented."
279                )
280
281            joined_df = joined_df.withColumn(
282                f"{m['metric']}_{m['type']}_diff",
283                col(f"{m['metric']}_{m['type']}_diff").cast(FloatType()),
284            )
285
286        results_df = joined_df.agg(
287            *[
288                getattr(spark_fns, m["aggregation"])(
289                    f"{m['metric']}_{m['type']}_diff"
290                ).alias(f"{m['metric']}_{m['type']}_diff_{m['aggregation']}")
291                for m in metrics
292            ]
293        )
294
295        return results_df.collect()[0].asDict()
class ReconciliationType(enum.Enum):
21class ReconciliationType(Enum):
22    """Type of Reconciliation."""
23
24    PCT = "percentage"
25    ABS = "absolute"

Type of Reconciliation.

PCT = <ReconciliationType.PCT: 'percentage'>
ABS = <ReconciliationType.ABS: 'absolute'>
Inherited Members
enum.Enum
name
value
class ReconciliationTransformers(enum.Enum):
28class ReconciliationTransformers(Enum):
29    """Transformers Available for the Reconciliation Algorithm."""
30
31    AVAILABLE_TRANSFORMERS: dict = {
32        "cache": Optimizers.cache,
33        "persist": Optimizers.persist,
34    }

Transformers Available for the Reconciliation Algorithm.

AVAILABLE_TRANSFORMERS: dict = <ReconciliationTransformers.AVAILABLE_TRANSFORMERS: {'cache': <bound method Optimizers.cache of <class 'lakehouse_engine.transformers.optimizers.Optimizers'>>, 'persist': <bound method Optimizers.persist of <class 'lakehouse_engine.transformers.optimizers.Optimizers'>>}>
Inherited Members
enum.Enum
name
value
class Reconciliator(lakehouse_engine.core.executable.Executable):
 37class Reconciliator(Executable):
 38    """Class to define the behavior of an algorithm that checks if data reconciles.
 39
 40    Checking if data reconciles, using this algorithm, is a matter of reading the
 41    'truth' data and the 'current' data. You can use any input specification compatible
 42    with the lakehouse engine to read 'truth' or 'current' data. On top of that, you
 43    can pass a 'truth_preprocess_query' and a 'current_preprocess_query' so you can
 44    preprocess the data before it goes into the actual reconciliation process.
 45    Moreover, you can use the 'truth_preprocess_query_args' and
 46    'current_preprocess_query_args' to pass additional arguments to be used to apply
 47    additional operations on top of the dataframe, resulting from the previous steps.
 48    With these arguments you can apply additional operations like caching or persisting
 49    the Dataframe. The way to pass the additional arguments for the operations is
 50    similar to the TransformSpec, but only a few operations are allowed. Those are
 51    defined in ReconciliationTransformers.AVAILABLE_TRANSFORMERS.
 52
 53    The reconciliation process is focused on joining 'truth' with 'current' by all
 54    provided columns except the ones passed as 'metrics'. After that it calculates the
 55    differences in the metrics attributes (either percentage or absolute difference).
 56    Finally, it aggregates the differences, using the supplied aggregation function
 57    (e.g., sum, avg, min, max, etc).
 58
 59    All of these configurations are passed via the ACON to instantiate a
 60    ReconciliatorSpec object.
 61
 62    .. note::
 63        It is crucial that both the current and truth datasets have exactly the same
 64        structure.
 65    .. note::
 66        You should not use 0 as yellow or red threshold, as the algorithm will verify
 67        if the difference between the truth and current values is bigger
 68        or equal than those thresholds.
 69    .. note::
 70        The reconciliation does not produce any negative values or percentages, as we
 71        use the absolute value of the differences. This means that the recon result
 72        will not indicate if it was the current values that were bigger or smaller
 73        than the truth values, or vice versa.
 74    """
 75
 76    _logger = LoggingHandler(__name__).get_logger()
 77
 78    def __init__(self, acon: dict):
 79        """Construct Algorithm instances.
 80
 81        Args:
 82            acon: algorithm configuration.
 83        """
 84        self.spec: ReconciliatorSpec = ReconciliatorSpec(
 85            metrics=acon["metrics"],
 86            truth_input_spec=InputSpec(**acon["truth_input_spec"]),
 87            current_input_spec=InputSpec(**acon["current_input_spec"]),
 88            truth_preprocess_query=acon.get("truth_preprocess_query", None),
 89            truth_preprocess_query_args=acon.get("truth_preprocess_query_args", None),
 90            current_preprocess_query=acon.get("current_preprocess_query", None),
 91            current_preprocess_query_args=acon.get(
 92                "current_preprocess_query_args", None
 93            ),
 94            ignore_empty_df=acon.get("ignore_empty_df", False),
 95        )
 96
 97    def get_source_of_truth(self) -> DataFrame:
 98        """Get the source of truth (expected result) for the reconciliation process.
 99
100        Returns:
101            DataFrame containing the source of truth.
102        """
103        truth_df = ReaderFactory.get_data(self.spec.truth_input_spec)
104        if self.spec.truth_preprocess_query:
105            truth_df.createOrReplaceTempView("truth")
106            truth_df = ExecEnv.SESSION.sql(self.spec.truth_preprocess_query)
107
108        return truth_df
109
110    def get_current_results(self) -> DataFrame:
111        """Get the current results from the table that we are checking if it reconciles.
112
113        Returns:
114            DataFrame containing the current results.
115        """
116        current_df = ReaderFactory.get_data(self.spec.current_input_spec)
117        if self.spec.current_preprocess_query:
118            current_df.createOrReplaceTempView("current")
119            current_df = ExecEnv.SESSION.sql(self.spec.current_preprocess_query)
120
121        return current_df
122
123    def execute(self) -> None:
124        """Reconcile the current results against the truth dataset."""
125        truth_df = self.get_source_of_truth()
126        self._apply_preprocess_query_args(
127            truth_df, self.spec.truth_preprocess_query_args
128        )
129        self._logger.info("Source of truth:")
130        truth_df.show(1000, truncate=False)
131
132        current_results_df = self.get_current_results()
133        self._apply_preprocess_query_args(
134            current_results_df, self.spec.current_preprocess_query_args
135        )
136        self._logger.info("Current results:")
137        current_results_df.show(1000, truncate=False)
138
139        status = "green"
140
141        # if ignore_empty_df is true, run empty check on truth_df and current_results_df
142        # if both the dataframes are empty then exit with green
143        if (
144            self.spec.ignore_empty_df
145            and truth_df.isEmpty()
146            and current_results_df.isEmpty()
147        ):
148            self._logger.info(
149                f"ignore_empty_df is {self.spec.ignore_empty_df}, "
150                f"truth_df and current_results_df are empty, "
151                f"hence ignoring reconciliation"
152            )
153            self._logger.info("The Reconciliation process has succeeded.")
154            return
155
156        recon_results = self._get_recon_results(
157            truth_df, current_results_df, self.spec.metrics
158        )
159        self._logger.info(f"Reconciliation result: {recon_results}")
160
161        for m in self.spec.metrics:
162            metric_name = f"{m['metric']}_{m['type']}_diff_{m['aggregation']}"
163            if m["yellow"] <= recon_results[metric_name] < m["red"]:
164                if status == "green":
165                    # only switch to yellow if it was green before, otherwise we want
166                    # to preserve 'red' as the final status.
167                    status = "yellow"
168            elif m["red"] <= recon_results[metric_name]:
169                status = "red"
170
171        if status != "green":
172            raise ReconciliationFailedException(
173                f"The Reconciliation process has failed with status: {status}."
174            )
175        else:
176            self._logger.info("The Reconciliation process has succeeded.")
177
178    @staticmethod
179    def _apply_preprocess_query_args(
180        df: DataFrame, preprocess_query_args: List[dict]
181    ) -> DataFrame:
182        """Apply transformers on top of the preprocessed query.
183
184        Args:
185            df: dataframe being transformed.
186            preprocess_query_args: dict having the functions/transformations to
187                apply and respective arguments.
188
189        Returns: the transformed Dataframe.
190        """
191        transformed_df = df
192
193        if preprocess_query_args is None:
194            transformed_df = df.transform(Optimizers.cache())
195        elif len(preprocess_query_args) > 0:
196            for transformation in preprocess_query_args:
197                rec_func = ReconciliationTransformers.AVAILABLE_TRANSFORMERS.value[
198                    transformation["function"]
199                ](**transformation.get("args", {}))
200
201                transformed_df = df.transform(rec_func)
202        else:
203            transformed_df = df
204
205        return transformed_df
206
207    def _get_recon_results(
208        self, truth_df: DataFrame, current_results_df: DataFrame, metrics: List[dict]
209    ) -> dict:
210        """Get the reconciliation results by comparing truth_df with current_results_df.
211
212        Args:
213            truth_df: dataframe with the truth data to reconcile against. It is
214                typically an aggregated dataset to use as baseline and then we match the
215                current_results_df (Aggregated at the same level) against this truth.
216            current_results_df: dataframe with the current results of the dataset we
217                are trying to reconcile.
218            metrics: list of dicts containing metric, aggregation, yellow threshold and
219                red threshold.
220
221        Return:
222            dictionary with the results (difference between truth and current results)
223        """
224        if len(truth_df.head(1)) == 0 or len(current_results_df.head(1)) == 0:
225            raise ReconciliationFailedException(
226                "The reconciliation has failed because either the truth dataset or the "
227                "current results dataset was empty."
228            )
229
230        # truth and current are joined on all columns except the metrics
231        joined_df = truth_df.alias("truth").join(
232            current_results_df.alias("current"),
233            [
234                truth_df[c] == current_results_df[c]
235                for c in current_results_df.columns
236                if c not in [m["metric"] for m in metrics]
237            ],
238            how="full",
239        )
240
241        for m in metrics:
242            if m["type"] == ReconciliationType.PCT.value:
243                joined_df = joined_df.withColumn(
244                    f"{m['metric']}_{m['type']}_diff",
245                    coalesce(
246                        (
247                            # we need to make sure we don't produce negative values
248                            # because our thresholds only accept > or >= comparisons.
249                            abs(
250                                (
251                                    col(f"current.{m['metric']}")
252                                    - col(f"truth.{m['metric']}")
253                                )
254                                / abs(col(f"truth.{m['metric']}"))
255                            )
256                        ),
257                        # if the formula above produces null, we need to consider where
258                        # it came from: we check below if the values were the same,
259                        # and if so the diff is 0, if not the diff is 1 (e.g., the null
260                        # result might have come from a division by 0).
261                        when(
262                            col(f"current.{m['metric']}").eqNullSafe(
263                                col(f"truth.{m['metric']}")
264                            ),
265                            lit(0),
266                        ).otherwise(lit(1)),
267                    ),
268                )
269            elif m["type"] == ReconciliationType.ABS.value:
270                joined_df = joined_df.withColumn(
271                    f"{m['metric']}_{m['type']}_diff",
272                    abs(
273                        coalesce(col(f"current.{m['metric']}"), lit(0))
274                        - coalesce(col(f"truth.{m['metric']}"), lit(0))
275                    ),
276                )
277            else:
278                raise NotImplementedError(
279                    "The requested reconciliation type is not yet implemented."
280                )
281
282            joined_df = joined_df.withColumn(
283                f"{m['metric']}_{m['type']}_diff",
284                col(f"{m['metric']}_{m['type']}_diff").cast(FloatType()),
285            )
286
287        results_df = joined_df.agg(
288            *[
289                getattr(spark_fns, m["aggregation"])(
290                    f"{m['metric']}_{m['type']}_diff"
291                ).alias(f"{m['metric']}_{m['type']}_diff_{m['aggregation']}")
292                for m in metrics
293            ]
294        )
295
296        return results_df.collect()[0].asDict()

Class to define the behavior of an algorithm that checks if data reconciles.

Checking if data reconciles, using this algorithm, is a matter of reading the 'truth' data and the 'current' data. You can use any input specification compatible with the lakehouse engine to read 'truth' or 'current' data. On top of that, you can pass a 'truth_preprocess_query' and a 'current_preprocess_query' so you can preprocess the data before it goes into the actual reconciliation process. Moreover, you can use the 'truth_preprocess_query_args' and 'current_preprocess_query_args' to pass additional arguments to be used to apply additional operations on top of the dataframe, resulting from the previous steps. With these arguments you can apply additional operations like caching or persisting the Dataframe. The way to pass the additional arguments for the operations is similar to the TransformSpec, but only a few operations are allowed. Those are defined in ReconciliationTransformers.AVAILABLE_TRANSFORMERS.

The reconciliation process is focused on joining 'truth' with 'current' by all provided columns except the ones passed as 'metrics'. After that it calculates the differences in the metrics attributes (either percentage or absolute difference). Finally, it aggregates the differences, using the supplied aggregation function (e.g., sum, avg, min, max, etc).

All of these configurations are passed via the ACON to instantiate a ReconciliatorSpec object.

It is crucial that both the current and truth datasets have exactly the same structure.

You should not use 0 as yellow or red threshold, as the algorithm will verify if the difference between the truth and current values is bigger or equal than those thresholds.

The reconciliation does not produce any negative values or percentages, as we use the absolute value of the differences. This means that the recon result will not indicate if it was the current values that were bigger or smaller than the truth values, or vice versa.

Reconciliator(acon: dict)
78    def __init__(self, acon: dict):
79        """Construct Algorithm instances.
80
81        Args:
82            acon: algorithm configuration.
83        """
84        self.spec: ReconciliatorSpec = ReconciliatorSpec(
85            metrics=acon["metrics"],
86            truth_input_spec=InputSpec(**acon["truth_input_spec"]),
87            current_input_spec=InputSpec(**acon["current_input_spec"]),
88            truth_preprocess_query=acon.get("truth_preprocess_query", None),
89            truth_preprocess_query_args=acon.get("truth_preprocess_query_args", None),
90            current_preprocess_query=acon.get("current_preprocess_query", None),
91            current_preprocess_query_args=acon.get(
92                "current_preprocess_query_args", None
93            ),
94            ignore_empty_df=acon.get("ignore_empty_df", False),
95        )

Construct Algorithm instances.

Arguments:
  • acon: algorithm configuration.
def get_source_of_truth(self) -> pyspark.sql.dataframe.DataFrame:
 97    def get_source_of_truth(self) -> DataFrame:
 98        """Get the source of truth (expected result) for the reconciliation process.
 99
100        Returns:
101            DataFrame containing the source of truth.
102        """
103        truth_df = ReaderFactory.get_data(self.spec.truth_input_spec)
104        if self.spec.truth_preprocess_query:
105            truth_df.createOrReplaceTempView("truth")
106            truth_df = ExecEnv.SESSION.sql(self.spec.truth_preprocess_query)
107
108        return truth_df

Get the source of truth (expected result) for the reconciliation process.

Returns:

DataFrame containing the source of truth.

def get_current_results(self) -> pyspark.sql.dataframe.DataFrame:
110    def get_current_results(self) -> DataFrame:
111        """Get the current results from the table that we are checking if it reconciles.
112
113        Returns:
114            DataFrame containing the current results.
115        """
116        current_df = ReaderFactory.get_data(self.spec.current_input_spec)
117        if self.spec.current_preprocess_query:
118            current_df.createOrReplaceTempView("current")
119            current_df = ExecEnv.SESSION.sql(self.spec.current_preprocess_query)
120
121        return current_df

Get the current results from the table that we are checking if it reconciles.

Returns:

DataFrame containing the current results.

def execute(self) -> None:
123    def execute(self) -> None:
124        """Reconcile the current results against the truth dataset."""
125        truth_df = self.get_source_of_truth()
126        self._apply_preprocess_query_args(
127            truth_df, self.spec.truth_preprocess_query_args
128        )
129        self._logger.info("Source of truth:")
130        truth_df.show(1000, truncate=False)
131
132        current_results_df = self.get_current_results()
133        self._apply_preprocess_query_args(
134            current_results_df, self.spec.current_preprocess_query_args
135        )
136        self._logger.info("Current results:")
137        current_results_df.show(1000, truncate=False)
138
139        status = "green"
140
141        # if ignore_empty_df is true, run empty check on truth_df and current_results_df
142        # if both the dataframes are empty then exit with green
143        if (
144            self.spec.ignore_empty_df
145            and truth_df.isEmpty()
146            and current_results_df.isEmpty()
147        ):
148            self._logger.info(
149                f"ignore_empty_df is {self.spec.ignore_empty_df}, "
150                f"truth_df and current_results_df are empty, "
151                f"hence ignoring reconciliation"
152            )
153            self._logger.info("The Reconciliation process has succeeded.")
154            return
155
156        recon_results = self._get_recon_results(
157            truth_df, current_results_df, self.spec.metrics
158        )
159        self._logger.info(f"Reconciliation result: {recon_results}")
160
161        for m in self.spec.metrics:
162            metric_name = f"{m['metric']}_{m['type']}_diff_{m['aggregation']}"
163            if m["yellow"] <= recon_results[metric_name] < m["red"]:
164                if status == "green":
165                    # only switch to yellow if it was green before, otherwise we want
166                    # to preserve 'red' as the final status.
167                    status = "yellow"
168            elif m["red"] <= recon_results[metric_name]:
169                status = "red"
170
171        if status != "green":
172            raise ReconciliationFailedException(
173                f"The Reconciliation process has failed with status: {status}."
174            )
175        else:
176            self._logger.info("The Reconciliation process has succeeded.")

Reconcile the current results against the truth dataset.