lakehouse_engine.dq_processors.validator
Module containing the definition of a data quality validator.
1"""Module containing the definition of a data quality validator.""" 2 3from typing import Any, List 4 5from great_expectations.core.batch import RuntimeBatchRequest 6from great_expectations.data_context import BaseDataContext 7from pyspark.sql import DataFrame 8from pyspark.sql.functions import col, collect_set, explode, first, lit, struct, when 9 10from lakehouse_engine.core.definitions import DQDefaults, DQFunctionSpec 11from lakehouse_engine.core.exec_env import ExecEnv 12from lakehouse_engine.utils.logging_handler import LoggingHandler 13 14 15class Validator(object): 16 """Class containing the data quality validator.""" 17 18 _LOGGER = LoggingHandler(__name__).get_logger() 19 20 @classmethod 21 def get_dq_validator( 22 cls, 23 context: BaseDataContext, 24 batch_request: RuntimeBatchRequest, 25 expectation_suite_name: str, 26 dq_functions: List[DQFunctionSpec], 27 critical_functions: List[DQFunctionSpec], 28 ) -> Any: 29 """Get a validator according to the specification. 30 31 We use getattr to dynamically execute any expectation available. 32 getattr(validator, function) is similar to validator.function(). With this 33 approach, we can execute any expectation supported. 34 35 Args: 36 context: the BaseDataContext containing the configurations for the data 37 source and store backend. 38 batch_request: run time batch request to be able to query underlying data. 39 expectation_suite_name: name of the expectation suite. 40 dq_functions: a list of DQFunctionSpec to consider in the expectation suite. 41 critical_functions: list of critical expectations in the expectation suite. 42 43 Returns: 44 The validator with the expectation suite stored. 45 """ 46 validator = context.get_validator( 47 batch_request=batch_request, expectation_suite_name=expectation_suite_name 48 ) 49 if dq_functions: 50 for dq_function in dq_functions: 51 getattr(validator, dq_function.function)( 52 **dq_function.args if dq_function.args else {} 53 ) 54 55 if critical_functions: 56 for critical_function in critical_functions: 57 meta_args = cls._add_critical_function_tag(critical_function.args) 58 59 getattr(validator, critical_function.function)(**meta_args) 60 61 return validator.save_expectation_suite(discard_failed_expectations=False) 62 63 @classmethod 64 def tag_source_with_dq( 65 cls, source_pk: List[str], source_df: DataFrame, results_df: DataFrame 66 ) -> DataFrame: 67 """Tags the source dataframe with a new column having the DQ results. 68 69 Args: 70 source_pk: the primary key of the source data. 71 source_df: the source dataframe to be tagged with DQ results. 72 results_df: dq results dataframe. 73 74 Returns: a dataframe tagged with the DQ results. 
75 """ 76 run_success = True 77 run_name = results_df.select("run_name").first()[0] 78 raised_exceptions = ( 79 True 80 if results_df.filter("exception_info.raised_exception == True").count() > 0 81 else False 82 ) 83 84 failures_df = ( 85 results_df.filter( 86 "expectation_success == False and size(unexpected_index_list) > 0" 87 ) 88 if "unexpected_index_list" in results_df.schema.simpleString() 89 else results_df.filter("expectation_success == False") 90 ) 91 92 if failures_df.isEmpty() is not True: 93 run_success = False 94 95 source_df = cls._get_row_tagged_fail_df( 96 failures_df, raised_exceptions, source_df, source_pk 97 ) 98 99 return cls._join_complementary_data( 100 run_name, run_success, raised_exceptions, source_df 101 ) 102 103 @classmethod 104 def _add_critical_function_tag(cls, args: dict) -> dict: 105 if "meta" in args.keys(): 106 meta = args["meta"] 107 108 if isinstance(meta["notes"], str): 109 meta["notes"] = meta["notes"] + " **Critical function**." 110 else: 111 meta["notes"]["content"] = ( 112 meta["notes"]["content"] + " **Critical function**." 113 ) 114 115 args["meta"] = meta 116 return args 117 118 else: 119 args["meta"] = { 120 "notes": { 121 "format": "markdown", 122 "content": "**Critical function**.", 123 } 124 } 125 return args 126 127 @staticmethod 128 def _get_row_tagged_fail_df( 129 failures_df: DataFrame, 130 raised_exceptions: bool, 131 source_df: DataFrame, 132 source_pk: List[str], 133 ) -> DataFrame: 134 """Get the source_df DataFrame tagged with the row level failures. 135 136 Args: 137 failures_df: dataframe having all failed expectations from the DQ execution. 138 raised_exceptions: whether there was at least one expectation raising 139 exceptions (True) or not (False). 140 source_df: the source dataframe being tagged with DQ results. 141 source_pk: the primary key of the source data. 142 143 Returns: the source_df tagged with the row level failures. 144 """ 145 if "unexpected_index_list" in failures_df.schema.simpleString(): 146 147 row_failures_df = ( 148 failures_df.alias("a") 149 .withColumn("exploded_list", explode(col("unexpected_index_list"))) 150 .selectExpr("a.*", "exploded_list.*") 151 .groupBy(*source_pk) 152 .agg( 153 struct( 154 first(col("run_name")).alias("run_name"), 155 first(col("success")).alias("run_success"), 156 lit(raised_exceptions).alias("raised_exceptions"), 157 first(col("expectation_success")).alias("run_row_success"), 158 collect_set( 159 struct( 160 col("expectation_type"), 161 col("kwargs"), 162 ) 163 ).alias("dq_failure_details"), 164 ).alias("dq_validations") 165 ) 166 ) 167 168 if all(item in row_failures_df.columns for item in source_pk): 169 join_cond = [ 170 col(f"a.{key}").eqNullSafe(col(f"b.{key}")) for key in source_pk 171 ] 172 source_df = ( 173 source_df.alias("a") 174 .join(row_failures_df.alias("b"), join_cond, "left") 175 .select("a.*", "b.dq_validations") 176 ) 177 178 return source_df 179 180 @staticmethod 181 def _join_complementary_data( 182 run_name: str, run_success: bool, raised_exceptions: bool, source_df: DataFrame 183 ) -> DataFrame: 184 """Join the source_df DataFrame with complementary data. 185 186 The source_df was already tagged/joined with the row level DQ failures, in case 187 there were any. However, there might be cases for which we don't have any 188 failure (everything succeeded) or cases for which only not row level failures 189 happened (e.g. table level expectations or column level aggregations), and, for 190 those we need to join the source_df with complementary data. 
191 192 Args: 193 run_name: the name of the DQ execution in great expectations. 194 run_success: whether the general execution of the DQ was succeeded (True) 195 or not (False). 196 raised_exceptions: whether there was at least one expectation raising 197 exceptions (True) or not (False). 198 source_df: the source dataframe being tagged with DQ results. 199 200 Returns: the source_df tagged with complementary data. 201 """ 202 complementary_data = [ 203 { 204 "dq_validations": { 205 "run_name": run_name, 206 "run_success": run_success, 207 "raised_exceptions": raised_exceptions, 208 "run_row_success": True, 209 } 210 } 211 ] 212 complementary_df = ExecEnv.SESSION.createDataFrame( 213 complementary_data, schema=DQDefaults.DQ_VALIDATIONS_SCHEMA.value 214 ) 215 216 return ( 217 source_df.crossJoin( 218 complementary_df.withColumnRenamed( 219 "dq_validations", "tmp_dq_validations" 220 ) 221 ) 222 .withColumn( 223 "dq_validations", 224 ( 225 when( 226 col("dq_validations").isNotNull(), col("dq_validations") 227 ).otherwise(col("tmp_dq_validations")) 228 if "dq_validations" in source_df.columns 229 else col("tmp_dq_validations") 230 ), 231 ) 232 .drop("tmp_dq_validations") 233 )
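The `_add_critical_function_tag` helper above either appends a marker to an existing `meta.notes` entry or creates a markdown notes block from scratch. A minimal sketch of both cases, calling the private helper purely for illustration (the `column` values and note text are made up):

```python
from lakehouse_engine.dq_processors.validator import Validator

# Case 1: args already carry a meta block with string notes
# ("order_id" and the note text are made up for this example).
args_with_meta = {"column": "order_id", "meta": {"notes": "Checks order ids."}}
tagged = Validator._add_critical_function_tag(args_with_meta)
print(tagged["meta"]["notes"])
# -> Checks order ids. **Critical function**.

# Case 2: args without meta get a markdown notes block created for them.
args_without_meta = {"column": "order_id"}
tagged = Validator._add_critical_function_tag(args_without_meta)
print(tagged["meta"])
# -> {'notes': {'format': 'markdown', 'content': '**Critical function**.'}}
```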
class Validator:
Class containing the data quality validator.
```python
@classmethod
def get_dq_validator(
    cls,
    context: great_expectations.data_context.BaseDataContext,
    batch_request: great_expectations.core.batch.RuntimeBatchRequest,
    expectation_suite_name: str,
    dq_functions: List[lakehouse_engine.core.definitions.DQFunctionSpec],
    critical_functions: List[lakehouse_engine.core.definitions.DQFunctionSpec],
) -> Any:
```
Get a validator according to the specification.
We use getattr to dynamically execute any available expectation: getattr(validator, function) returns the expectation method, so calling it is equivalent to validator.function(). With this approach, we can execute any supported expectation by name (see the sketch after this section).
Arguments:
- context: the BaseDataContext containing the configurations for the data source and store backend.
- batch_request: runtime batch request to be able to query the underlying data.
- expectation_suite_name: name of the expectation suite.
- dq_functions: a list of DQFunctionSpec to consider in the expectation suite.
- critical_functions: list of critical expectations in the expectation suite.
Returns:
The validator with the expectation suite stored.
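A minimal sketch of this dispatch, assuming a `validator` already obtained via `context.get_validator(...)` and that `DQFunctionSpec` accepts `function` and `args` as keyword fields (the expectation names are standard Great Expectations expectations; the column and bound are made up):

```python
from lakehouse_engine.core.definitions import DQFunctionSpec

# Illustrative specs: each names an expectation and its keyword arguments.
dq_functions = [
    DQFunctionSpec(
        function="expect_column_values_to_not_be_null",
        args={"column": "order_id"},
    ),
    DQFunctionSpec(
        function="expect_table_row_count_to_be_between",
        args={"min_value": 1},
    ),
]

for spec in dq_functions:
    # Look the expectation method up by name and call it with its args;
    # equivalent to validator.expect_column_values_to_not_be_null(column="order_id").
    getattr(validator, spec.function)(**(spec.args or {}))
```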
```python
@classmethod
def tag_source_with_dq(
    cls,
    source_pk: List[str],
    source_df: pyspark.sql.dataframe.DataFrame,
    results_df: pyspark.sql.dataframe.DataFrame,
) -> pyspark.sql.dataframe.DataFrame:
```
Tags the source dataframe with a new column containing the DQ results.
Arguments:
- source_pk: the primary key of the source data.
- source_df: the source dataframe to be tagged with DQ results.
- results_df: the DQ results dataframe.
Returns: a dataframe tagged with the DQ results.
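A usage sketch, assuming `source_df` (the data being validated) and `results_df` (the flattened Great Expectations results) already exist; `order_id` is a made-up primary key column:

```python
from lakehouse_engine.dq_processors.validator import Validator

tagged_df = Validator.tag_source_with_dq(
    source_pk=["order_id"],
    source_df=source_df,
    results_df=results_df,
)

# Every source column is kept, plus a dq_validations struct with run_name,
# run_success, raised_exceptions, run_row_success and, for rows that failed
# row-level expectations, the dq_failure_details set.
tagged_df.select("order_id", "dq_validations").show(truncate=False)
```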