# lakehouse_engine.dq_processors.validator

Module containing the definition of a data quality validator.

  1"""Module containing the definition of a data quality validator."""
  2
  3from typing import Any, List
  4
  5from great_expectations.core.batch import RuntimeBatchRequest
  6from great_expectations.data_context import BaseDataContext
  7from pyspark.sql import DataFrame
  8from pyspark.sql.functions import col, collect_set, explode, first, lit, struct, when
  9
 10from lakehouse_engine.core.definitions import DQDefaults, DQFunctionSpec
 11from lakehouse_engine.core.exec_env import ExecEnv
 12from lakehouse_engine.utils.logging_handler import LoggingHandler
 13
 14
 15class Validator(object):
 16    """Class containing the data quality validator."""
 17
 18    _LOGGER = LoggingHandler(__name__).get_logger()
 19
 20    @classmethod
 21    def get_dq_validator(
 22        cls,
 23        context: BaseDataContext,
 24        batch_request: RuntimeBatchRequest,
 25        expectation_suite_name: str,
 26        dq_functions: List[DQFunctionSpec],
 27        critical_functions: List[DQFunctionSpec],
 28    ) -> Any:
 29        """Get a validator according to the specification.
 30
 31        We use getattr to dynamically execute any expectation available.
 32        getattr(validator, function) is similar to validator.function(). With this
 33        approach, we can execute any expectation supported.
 34
 35        Args:
 36            context: the BaseDataContext containing the configurations for the data
 37                source and store backend.
 38            batch_request: run time batch request to be able to query underlying data.
 39            expectation_suite_name: name of the expectation suite.
 40            dq_functions: a list of DQFunctionSpec to consider in the expectation suite.
 41            critical_functions: list of critical expectations in the expectation suite.
 42
 43        Returns:
 44            The validator with the expectation suite stored.
 45        """
 46        validator = context.get_validator(
 47            batch_request=batch_request, expectation_suite_name=expectation_suite_name
 48        )
 49        if dq_functions:
 50            for dq_function in dq_functions:
 51                getattr(validator, dq_function.function)(
 52                    **dq_function.args if dq_function.args else {}
 53                )
 54
 55        if critical_functions:
 56            for critical_function in critical_functions:
 57                meta_args = cls._add_critical_function_tag(critical_function.args)
 58
 59                getattr(validator, critical_function.function)(**meta_args)
 60
 61        return validator.save_expectation_suite(discard_failed_expectations=False)
 62
 63    @classmethod
 64    def tag_source_with_dq(
 65        cls, source_pk: List[str], source_df: DataFrame, results_df: DataFrame
 66    ) -> DataFrame:
 67        """Tags the source dataframe with a new column having the DQ results.
 68
 69        Args:
 70            source_pk: the primary key of the source data.
 71            source_df: the source dataframe to be tagged with DQ results.
 72            results_df: dq results dataframe.
 73
 74        Returns: a dataframe tagged with the DQ results.
 75        """
 76        run_success = True
 77        run_name = results_df.select("run_name").first()[0]
 78        raised_exceptions = (
 79            True
 80            if results_df.filter("exception_info.raised_exception == True").count() > 0
 81            else False
 82        )
 83
 84        failures_df = (
 85            results_df.filter(
 86                "expectation_success == False and size(unexpected_index_list) > 0"
 87            )
 88            if "unexpected_index_list" in results_df.schema.simpleString()
 89            else results_df.filter("expectation_success == False")
 90        )
 91
 92        if failures_df.isEmpty() is not True:
 93            run_success = False
 94
 95            source_df = cls._get_row_tagged_fail_df(
 96                failures_df, raised_exceptions, source_df, source_pk
 97            )
 98
 99        return cls._join_complementary_data(
100            run_name, run_success, raised_exceptions, source_df
101        )
102
103    @classmethod
104    def _add_critical_function_tag(cls, args: dict) -> dict:
105        if "meta" in args.keys():
106            meta = args["meta"]
107
108            if isinstance(meta["notes"], str):
109                meta["notes"] = meta["notes"] + " **Critical function**."
110            else:
111                meta["notes"]["content"] = (
112                    meta["notes"]["content"] + " **Critical function**."
113                )
114
115            args["meta"] = meta
116            return args
117
118        else:
119            args["meta"] = {
120                "notes": {
121                    "format": "markdown",
122                    "content": "**Critical function**.",
123                }
124            }
125            return args
126
127    @staticmethod
128    def _get_row_tagged_fail_df(
129        failures_df: DataFrame,
130        raised_exceptions: bool,
131        source_df: DataFrame,
132        source_pk: List[str],
133    ) -> DataFrame:
134        """Get the source_df DataFrame tagged with the row level failures.
135
136        Args:
137            failures_df: dataframe having all failed expectations from the DQ execution.
138            raised_exceptions: whether there was at least one expectation raising
139                exceptions (True) or not (False).
140            source_df: the source dataframe being tagged with DQ results.
141            source_pk: the primary key of the source data.
142
143        Returns: the source_df tagged with the row level failures.
144        """
145        if "unexpected_index_list" in failures_df.schema.simpleString():
146
147            row_failures_df = (
148                failures_df.alias("a")
149                .withColumn("exploded_list", explode(col("unexpected_index_list")))
150                .selectExpr("a.*", "exploded_list.*")
151                .groupBy(*source_pk)
152                .agg(
153                    struct(
154                        first(col("run_name")).alias("run_name"),
155                        first(col("success")).alias("run_success"),
156                        lit(raised_exceptions).alias("raised_exceptions"),
157                        first(col("expectation_success")).alias("run_row_success"),
158                        collect_set(
159                            struct(
160                                col("expectation_type"),
161                                col("kwargs"),
162                            )
163                        ).alias("dq_failure_details"),
164                    ).alias("dq_validations")
165                )
166            )
167
168            if all(item in row_failures_df.columns for item in source_pk):
169                join_cond = [
170                    col(f"a.{key}").eqNullSafe(col(f"b.{key}")) for key in source_pk
171                ]
172                source_df = (
173                    source_df.alias("a")
174                    .join(row_failures_df.alias("b"), join_cond, "left")
175                    .select("a.*", "b.dq_validations")
176                )
177
178        return source_df
179
180    @staticmethod
181    def _join_complementary_data(
182        run_name: str, run_success: bool, raised_exceptions: bool, source_df: DataFrame
183    ) -> DataFrame:
184        """Join the source_df DataFrame with complementary data.
185
186        The source_df was already tagged/joined with the row level DQ failures, in case
187        there were any. However, there might be cases for which we don't have any
188        failure (everything succeeded) or cases for which only not row level failures
189        happened (e.g. table level expectations or column level aggregations), and, for
190        those we need to join the source_df with complementary data.
191
192        Args:
193            run_name: the name of the DQ execution in great expectations.
194            run_success: whether the general execution of the DQ was succeeded (True)
195                or not (False).
196            raised_exceptions: whether there was at least one expectation raising
197                exceptions (True) or not (False).
198            source_df: the source dataframe being tagged with DQ results.
199
200        Returns: the source_df tagged with complementary data.
201        """
202        complementary_data = [
203            {
204                "dq_validations": {
205                    "run_name": run_name,
206                    "run_success": run_success,
207                    "raised_exceptions": raised_exceptions,
208                    "run_row_success": True,
209                }
210            }
211        ]
212        complementary_df = ExecEnv.SESSION.createDataFrame(
213            complementary_data, schema=DQDefaults.DQ_VALIDATIONS_SCHEMA.value
214        )
215
216        return (
217            source_df.crossJoin(
218                complementary_df.withColumnRenamed(
219                    "dq_validations", "tmp_dq_validations"
220                )
221            )
222            .withColumn(
223                "dq_validations",
224                (
225                    when(
226                        col("dq_validations").isNotNull(), col("dq_validations")
227                    ).otherwise(col("tmp_dq_validations"))
228                    if "dq_validations" in source_df.columns
229                    else col("tmp_dq_validations")
230                ),
231            )
232            .drop("tmp_dq_validations")
233        )
class Validator:
 16class Validator(object):
 17    """Class containing the data quality validator."""
 18
 19    _LOGGER = LoggingHandler(__name__).get_logger()
 20
 21    @classmethod
 22    def get_dq_validator(
 23        cls,
 24        context: BaseDataContext,
 25        batch_request: RuntimeBatchRequest,
 26        expectation_suite_name: str,
 27        dq_functions: List[DQFunctionSpec],
 28        critical_functions: List[DQFunctionSpec],
 29    ) -> Any:
 30        """Get a validator according to the specification.
 31
 32        We use getattr to dynamically execute any expectation available.
 33        getattr(validator, function) is similar to validator.function(). With this
 34        approach, we can execute any expectation supported.
 35
 36        Args:
 37            context: the BaseDataContext containing the configurations for the data
 38                source and store backend.
 39            batch_request: run time batch request to be able to query underlying data.
 40            expectation_suite_name: name of the expectation suite.
 41            dq_functions: a list of DQFunctionSpec to consider in the expectation suite.
 42            critical_functions: list of critical expectations in the expectation suite.
 43
 44        Returns:
 45            The validator with the expectation suite stored.
 46        """
 47        validator = context.get_validator(
 48            batch_request=batch_request, expectation_suite_name=expectation_suite_name
 49        )
 50        if dq_functions:
 51            for dq_function in dq_functions:
 52                getattr(validator, dq_function.function)(
 53                    **dq_function.args if dq_function.args else {}
 54                )
 55
 56        if critical_functions:
 57            for critical_function in critical_functions:
 58                meta_args = cls._add_critical_function_tag(critical_function.args)
 59
 60                getattr(validator, critical_function.function)(**meta_args)
 61
 62        return validator.save_expectation_suite(discard_failed_expectations=False)
 63
 64    @classmethod
 65    def tag_source_with_dq(
 66        cls, source_pk: List[str], source_df: DataFrame, results_df: DataFrame
 67    ) -> DataFrame:
 68        """Tags the source dataframe with a new column having the DQ results.
 69
 70        Args:
 71            source_pk: the primary key of the source data.
 72            source_df: the source dataframe to be tagged with DQ results.
 73            results_df: dq results dataframe.
 74
 75        Returns: a dataframe tagged with the DQ results.
 76        """
 77        run_success = True
 78        run_name = results_df.select("run_name").first()[0]
 79        raised_exceptions = (
 80            True
 81            if results_df.filter("exception_info.raised_exception == True").count() > 0
 82            else False
 83        )
 84
 85        failures_df = (
 86            results_df.filter(
 87                "expectation_success == False and size(unexpected_index_list) > 0"
 88            )
 89            if "unexpected_index_list" in results_df.schema.simpleString()
 90            else results_df.filter("expectation_success == False")
 91        )
 92
 93        if failures_df.isEmpty() is not True:
 94            run_success = False
 95
 96            source_df = cls._get_row_tagged_fail_df(
 97                failures_df, raised_exceptions, source_df, source_pk
 98            )
 99
100        return cls._join_complementary_data(
101            run_name, run_success, raised_exceptions, source_df
102        )
103
104    @classmethod
105    def _add_critical_function_tag(cls, args: dict) -> dict:
106        if "meta" in args.keys():
107            meta = args["meta"]
108
109            if isinstance(meta["notes"], str):
110                meta["notes"] = meta["notes"] + " **Critical function**."
111            else:
112                meta["notes"]["content"] = (
113                    meta["notes"]["content"] + " **Critical function**."
114                )
115
116            args["meta"] = meta
117            return args
118
119        else:
120            args["meta"] = {
121                "notes": {
122                    "format": "markdown",
123                    "content": "**Critical function**.",
124                }
125            }
126            return args
127
128    @staticmethod
129    def _get_row_tagged_fail_df(
130        failures_df: DataFrame,
131        raised_exceptions: bool,
132        source_df: DataFrame,
133        source_pk: List[str],
134    ) -> DataFrame:
135        """Get the source_df DataFrame tagged with the row level failures.
136
137        Args:
138            failures_df: dataframe having all failed expectations from the DQ execution.
139            raised_exceptions: whether there was at least one expectation raising
140                exceptions (True) or not (False).
141            source_df: the source dataframe being tagged with DQ results.
142            source_pk: the primary key of the source data.
143
144        Returns: the source_df tagged with the row level failures.
145        """
146        if "unexpected_index_list" in failures_df.schema.simpleString():
147
148            row_failures_df = (
149                failures_df.alias("a")
150                .withColumn("exploded_list", explode(col("unexpected_index_list")))
151                .selectExpr("a.*", "exploded_list.*")
152                .groupBy(*source_pk)
153                .agg(
154                    struct(
155                        first(col("run_name")).alias("run_name"),
156                        first(col("success")).alias("run_success"),
157                        lit(raised_exceptions).alias("raised_exceptions"),
158                        first(col("expectation_success")).alias("run_row_success"),
159                        collect_set(
160                            struct(
161                                col("expectation_type"),
162                                col("kwargs"),
163                            )
164                        ).alias("dq_failure_details"),
165                    ).alias("dq_validations")
166                )
167            )
168
169            if all(item in row_failures_df.columns for item in source_pk):
170                join_cond = [
171                    col(f"a.{key}").eqNullSafe(col(f"b.{key}")) for key in source_pk
172                ]
173                source_df = (
174                    source_df.alias("a")
175                    .join(row_failures_df.alias("b"), join_cond, "left")
176                    .select("a.*", "b.dq_validations")
177                )
178
179        return source_df
180
181    @staticmethod
182    def _join_complementary_data(
183        run_name: str, run_success: bool, raised_exceptions: bool, source_df: DataFrame
184    ) -> DataFrame:
185        """Join the source_df DataFrame with complementary data.
186
187        The source_df was already tagged/joined with the row level DQ failures, in case
188        there were any. However, there might be cases for which we don't have any
189        failure (everything succeeded) or cases for which only not row level failures
190        happened (e.g. table level expectations or column level aggregations), and, for
191        those we need to join the source_df with complementary data.
192
193        Args:
194            run_name: the name of the DQ execution in great expectations.
195            run_success: whether the general execution of the DQ was succeeded (True)
196                or not (False).
197            raised_exceptions: whether there was at least one expectation raising
198                exceptions (True) or not (False).
199            source_df: the source dataframe being tagged with DQ results.
200
201        Returns: the source_df tagged with complementary data.
202        """
203        complementary_data = [
204            {
205                "dq_validations": {
206                    "run_name": run_name,
207                    "run_success": run_success,
208                    "raised_exceptions": raised_exceptions,
209                    "run_row_success": True,
210                }
211            }
212        ]
213        complementary_df = ExecEnv.SESSION.createDataFrame(
214            complementary_data, schema=DQDefaults.DQ_VALIDATIONS_SCHEMA.value
215        )
216
217        return (
218            source_df.crossJoin(
219                complementary_df.withColumnRenamed(
220                    "dq_validations", "tmp_dq_validations"
221                )
222            )
223            .withColumn(
224                "dq_validations",
225                (
226                    when(
227                        col("dq_validations").isNotNull(), col("dq_validations")
228                    ).otherwise(col("tmp_dq_validations"))
229                    if "dq_validations" in source_df.columns
230                    else col("tmp_dq_validations")
231                ),
232            )
233            .drop("tmp_dq_validations")
234        )

Class containing the data quality validator.

@classmethod
def get_dq_validator( cls, context: great_expectations.data_context.BaseDataContext, batch_request: great_expectations.core.batch.RuntimeBatchRequest, expectation_suite_name: str, dq_functions: List[lakehouse_engine.core.definitions.DQFunctionSpec], critical_functions: List[lakehouse_engine.core.definitions.DQFunctionSpec]) -> Any:
21    @classmethod
22    def get_dq_validator(
23        cls,
24        context: BaseDataContext,
25        batch_request: RuntimeBatchRequest,
26        expectation_suite_name: str,
27        dq_functions: List[DQFunctionSpec],
28        critical_functions: List[DQFunctionSpec],
29    ) -> Any:
30        """Get a validator according to the specification.
31
32        We use getattr to dynamically execute any expectation available.
33        getattr(validator, function) is similar to validator.function(). With this
34        approach, we can execute any expectation supported.
35
36        Args:
37            context: the BaseDataContext containing the configurations for the data
38                source and store backend.
39            batch_request: run time batch request to be able to query underlying data.
40            expectation_suite_name: name of the expectation suite.
41            dq_functions: a list of DQFunctionSpec to consider in the expectation suite.
42            critical_functions: list of critical expectations in the expectation suite.
43
44        Returns:
45            The validator with the expectation suite stored.
46        """
47        validator = context.get_validator(
48            batch_request=batch_request, expectation_suite_name=expectation_suite_name
49        )
50        if dq_functions:
51            for dq_function in dq_functions:
52                getattr(validator, dq_function.function)(
53                    **dq_function.args if dq_function.args else {}
54                )
55
56        if critical_functions:
57            for critical_function in critical_functions:
58                meta_args = cls._add_critical_function_tag(critical_function.args)
59
60                getattr(validator, critical_function.function)(**meta_args)
61
62        return validator.save_expectation_suite(discard_failed_expectations=False)

Get a validator according to the specification.

We use getattr to dynamically execute any expectation available. getattr(validator, function) is similar to validator.function(). With this approach, we can execute any expectation supported.

Arguments:
  • context: the BaseDataContext containing the configurations for the data source and store backend.
  • batch_request: run time batch request to be able to query underlying data.
  • expectation_suite_name: name of the expectation suite.
  • dq_functions: a list of DQFunctionSpec to consider in the expectation suite.
  • critical_functions: list of critical expectations in the expectation suite.
Returns:
  The validator with the expectation suite stored.

@classmethod
def tag_source_with_dq( cls, source_pk: List[str], source_df: pyspark.sql.dataframe.DataFrame, results_df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
 64    @classmethod
 65    def tag_source_with_dq(
 66        cls, source_pk: List[str], source_df: DataFrame, results_df: DataFrame
 67    ) -> DataFrame:
 68        """Tags the source dataframe with a new column having the DQ results.
 69
 70        Args:
 71            source_pk: the primary key of the source data.
 72            source_df: the source dataframe to be tagged with DQ results.
 73            results_df: dq results dataframe.
 74
 75        Returns: a dataframe tagged with the DQ results.
 76        """
 77        run_success = True
 78        run_name = results_df.select("run_name").first()[0]
 79        raised_exceptions = (
 80            True
 81            if results_df.filter("exception_info.raised_exception == True").count() > 0
 82            else False
 83        )
 84
 85        failures_df = (
 86            results_df.filter(
 87                "expectation_success == False and size(unexpected_index_list) > 0"
 88            )
 89            if "unexpected_index_list" in results_df.schema.simpleString()
 90            else results_df.filter("expectation_success == False")
 91        )
 92
 93        if failures_df.isEmpty() is not True:
 94            run_success = False
 95
 96            source_df = cls._get_row_tagged_fail_df(
 97                failures_df, raised_exceptions, source_df, source_pk
 98            )
 99
100        return cls._join_complementary_data(
101            run_name, run_success, raised_exceptions, source_df
102        )

Tags the source dataframe with a new column having the DQ results.

Arguments:
  • source_pk: the primary key of the source data.
  • source_df: the source dataframe to be tagged with DQ results.
  • results_df: dq results dataframe.

Returns: a dataframe tagged with the DQ results.