lakehouse_engine.dq_processors.dq_factory
Module containing the class definition of the Data Quality Factory.
1"""Module containing the class definition of the Data Quality Factory.""" 2 3import importlib.util 4import json 5from datetime import datetime, timezone 6from json import dumps, loads 7from typing import Any, Dict, List, Optional, OrderedDict, Tuple, Union 8 9from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult 10from great_expectations.core.batch import RuntimeBatchRequest 11from great_expectations.data_context import EphemeralDataContext 12from great_expectations.data_context.data_context.context_factory import get_context 13from great_expectations.data_context.types.base import ( 14 AnonymizedUsageStatisticsConfig, 15 DataContextConfig, 16 FilesystemStoreBackendDefaults, 17 S3StoreBackendDefaults, 18) 19from pyspark.sql import DataFrame 20from pyspark.sql.functions import ( 21 array, 22 coalesce, 23 col, 24 collect_list, 25 concat_ws, 26 dayofmonth, 27 explode, 28 from_json, 29 lit, 30 month, 31 schema_of_json, 32 sort_array, 33 struct, 34 to_json, 35 to_timestamp, 36 transform, 37 year, 38) 39from pyspark.sql.types import StringType 40 41from lakehouse_engine.core.definitions import ( 42 DQDefaults, 43 DQSpec, 44 DQType, 45 OutputSpec, 46 WriteType, 47) 48from lakehouse_engine.core.exec_env import ExecEnv 49from lakehouse_engine.core.table_manager import TableManager 50from lakehouse_engine.dq_processors.exceptions import ( 51 DQCheckpointsResultsException, 52 DQValidationsFailedException, 53) 54from lakehouse_engine.dq_processors.validator import Validator 55from lakehouse_engine.io.writer_factory import WriterFactory 56from lakehouse_engine.utils.logging_handler import LoggingHandler 57 58 59class DQFactory(object): 60 """Class for the Data Quality Factory.""" 61 62 _LOGGER = LoggingHandler(__name__).get_logger() 63 _TIMESTAMP = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") 64 65 @classmethod 66 def run_dq_process(cls, dq_spec: DQSpec, data: DataFrame) -> DataFrame: 67 """Run the specified data quality process on a dataframe. 68 69 Based on the dq_specs we apply the defined expectations on top of the dataframe 70 in order to apply the necessary validations and then output the result of 71 the data quality process. 72 73 Args: 74 dq_spec: data quality specification. 75 data: input dataframe to run the dq process on. 76 77 Returns: 78 The DataFrame containing the results of the DQ process. 79 """ 80 # import custom expectations for them to be available to be used. 81 for expectation in DQDefaults.CUSTOM_EXPECTATION_LIST.value: 82 importlib.__import__( 83 "lakehouse_engine.dq_processors.custom_expectations." 
+ expectation 84 ) 85 86 context = get_context(project_config=cls._get_data_context_config(dq_spec)) 87 context.add_datasource(**cls._get_data_source_defaults(dq_spec)) 88 89 expectation_suite_name = ( 90 dq_spec.expectation_suite_name 91 if dq_spec.expectation_suite_name 92 else f"{dq_spec.spec_id}-{dq_spec.input_id}-{dq_spec.dq_type}" 93 ) 94 context.add_or_update_expectation_suite( 95 expectation_suite_name=expectation_suite_name 96 ) 97 98 batch_request = cls._get_batch_request(dq_spec, data) 99 100 if ( 101 dq_spec.dq_type == DQType.VALIDATOR.value 102 or dq_spec.dq_type == DQType.PRISMA.value 103 ): 104 Validator.get_dq_validator( 105 context, 106 batch_request, 107 expectation_suite_name, 108 dq_spec.dq_functions, 109 dq_spec.critical_functions, 110 ) 111 112 source_pk = cls._get_unexpected_rows_pk(dq_spec) 113 results, results_df = cls._configure_and_run_checkpoint( 114 dq_spec, context, batch_request, expectation_suite_name, source_pk 115 ) 116 117 if dq_spec.dq_type == DQType.PRISMA.value: 118 119 results_df = results_df.withColumn("source_primary_key", lit(source_pk)) 120 121 processed_keys_df = data.select( 122 concat_ws( 123 ", ", *[coalesce(col(c), lit("null")) for c in source_pk] 124 ).alias("combined_pk") 125 ) 126 comb_pk_expr = ( 127 sort_array(collect_list("combined_pk")) 128 if dq_spec.sort_processed_keys 129 else collect_list("combined_pk") 130 ) 131 processed_keys_df = processed_keys_df.agg( 132 concat_ws("||", comb_pk_expr).alias("processed_keys") 133 ) 134 135 results_df = results_df.join(processed_keys_df, lit(1) == lit(1)) 136 137 cls._write_to_result_sink(dq_spec, results_df) 138 139 cls._log_or_fail(results, dq_spec) 140 141 if ( 142 dq_spec.tag_source_data 143 and dq_spec.result_sink_explode 144 and dq_spec.fail_on_error is not True 145 ): 146 data = Validator.tag_source_with_dq(source_pk, data, results_df) 147 else: 148 raise TypeError( 149 f"Type of Data Quality '{dq_spec.dq_type}' is not supported." 150 ) 151 152 return data 153 154 @classmethod 155 def build_data_docs( 156 cls, 157 store_backend: str = DQDefaults.STORE_BACKEND.value, 158 local_fs_root_dir: str = None, 159 data_docs_local_fs: str = None, 160 data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value, 161 bucket: str = None, 162 data_docs_bucket: str = None, 163 expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value, 164 validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value, 165 checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value, 166 ) -> None: 167 """Build Data Docs for the project. 168 169 This function does a full build of data docs based on all the great expectations 170 checkpoints in the specified location, getting all history of run/validations 171 executed and results. 172 173 Args: 174 store_backend: which store_backend to use (e.g. s3 or file_system). 175 local_fs_root_dir: path of the root directory. Note: only applicable 176 for store_backend file_system 177 data_docs_local_fs: path of the root directory. Note: only applicable 178 for store_backend file_system. 179 data_docs_prefix: prefix where to store data_docs' data. 180 bucket: the bucket name to consider for the store_backend 181 (store DQ artefacts). Note: only applicable for store_backend s3. 182 data_docs_bucket: the bucket name for data docs only. When defined, 183 it will supersede bucket parameter. 184 Note: only applicable for store_backend s3. 185 expectations_store_prefix: prefix where to store expectations' data. 
186 Note: only applicable for store_backend s3. 187 validations_store_prefix: prefix where to store validations' data. 188 Note: only applicable for store_backend s3. 189 checkpoint_store_prefix: prefix where to store checkpoints' data. 190 Note: only applicable for store_backend s3. 191 """ 192 if store_backend == DQDefaults.STORE_BACKEND.value: 193 dq_spec = DQSpec( 194 spec_id="dq_validator", 195 input_id="dq", 196 dq_type=DQType.VALIDATOR.value, 197 store_backend=DQDefaults.STORE_BACKEND.value, 198 data_docs_prefix=data_docs_prefix, 199 bucket=bucket, 200 data_docs_bucket=data_docs_bucket, 201 expectations_store_prefix=expectations_store_prefix, 202 validations_store_prefix=validations_store_prefix, 203 checkpoint_store_prefix=checkpoint_store_prefix, 204 ) 205 elif store_backend == DQDefaults.FILE_SYSTEM_STORE.value: 206 dq_spec = DQSpec( 207 spec_id="dq_validator", 208 input_id="dq", 209 dq_type=DQType.VALIDATOR.value, 210 store_backend=DQDefaults.FILE_SYSTEM_STORE.value, 211 local_fs_root_dir=local_fs_root_dir, 212 data_docs_local_fs=data_docs_local_fs, 213 data_docs_prefix=data_docs_prefix, 214 ) 215 context = get_context(project_config=cls._get_data_context_config(dq_spec)) 216 cls._LOGGER.info("The data docs were rebuilt") 217 context.build_data_docs() 218 219 @classmethod 220 def _check_critical_functions_tags(cls, failed_expectations: List[Any]) -> list: 221 critical_failure = [] 222 223 for expectation in failed_expectations: 224 meta = expectation["meta"] 225 if meta and ( 226 ("notes" in meta.keys() and "Critical function" in meta["notes"]) 227 or ( 228 "content" in meta["notes"].keys() 229 and "Critical function" in meta["notes"]["content"] 230 ) 231 ): 232 critical_failure.append(expectation["expectation_type"]) 233 234 return critical_failure 235 236 @classmethod 237 def _configure_and_run_checkpoint( 238 cls, 239 dq_spec: DQSpec, 240 context: EphemeralDataContext, 241 batch_request: RuntimeBatchRequest, 242 expectation_suite_name: str, 243 source_pk: List[str], 244 ) -> Tuple[CheckpointResult, DataFrame]: 245 """Configure, run and return checkpoint results. 246 247 A checkpoint is what enables us to run the validations of the expectations' 248 suite on the batches of data. 249 250 Args: 251 dq_spec: data quality specification. 252 context: the EphemeralDataContext containing the configurations for the data 253 source and store backend. 254 batch_request: run time batch request to be able to query underlying data. 255 expectation_suite_name: name of the expectation suite. 256 source_pk: the primary key of the source data. 257 258 Returns: 259 The checkpoint results in two types: CheckpointResult and Dataframe. 
260 """ 261 checkpoint_name = f"{dq_spec.spec_id}-{dq_spec.input_id}-checkpoint" 262 context.add_or_update_checkpoint( 263 name=checkpoint_name, 264 class_name=DQDefaults.DATA_CHECKPOINTS_CLASS_NAME.value, 265 config_version=DQDefaults.DATA_CHECKPOINTS_CONFIG_VERSION.value, 266 run_name_template=f"%Y%m%d-%H%M%S-{checkpoint_name}", 267 ) 268 269 result_format: Dict[str, Any] = { 270 "result_format": dq_spec.gx_result_format, 271 } 272 if source_pk: 273 result_format = { 274 **result_format, 275 "unexpected_index_column_names": source_pk, 276 } 277 278 results = context.run_checkpoint( 279 checkpoint_name=checkpoint_name, 280 validations=[ 281 { 282 "batch_request": batch_request, 283 "expectation_suite_name": expectation_suite_name, 284 } 285 ], 286 result_format=result_format, 287 ) 288 289 return results, cls._transform_checkpoint_results( 290 results.to_json_dict(), dq_spec 291 ) 292 293 @classmethod 294 def _explode_results( 295 cls, 296 df: DataFrame, 297 dq_spec: DQSpec, 298 ) -> DataFrame: 299 """Transform dq results dataframe exploding a set of columns. 300 301 Args: 302 df: dataframe with dq results to be exploded. 303 dq_spec: data quality specification. 304 """ 305 df = df.withColumn( 306 "validation_results", explode("run_results.validation_result.results") 307 ).withColumn("source", lit(dq_spec.source)) 308 309 new_columns = [ 310 "validation_results.expectation_config.kwargs.*", 311 "run_results.validation_result.statistics.*", 312 "validation_results.expectation_config.expectation_type", 313 "validation_results.success as expectation_success", 314 "validation_results.exception_info", 315 ] + dq_spec.result_sink_extra_columns 316 317 df_exploded = df.selectExpr(*df.columns, *new_columns).drop( 318 *[c.replace(".*", "").split(" as")[0] for c in new_columns] 319 ) 320 321 schema = df_exploded.schema.simpleString() 322 if "unexpected_index_list" in schema: 323 df_exploded = ( 324 df_exploded.withColumn( 325 "unexpected_index_list", 326 array(struct(lit(True).alias("run_success"))), 327 ) 328 if df.select( 329 col("validation_results.result.unexpected_index_list") 330 ).dtypes[0][1] 331 == "array<string>" 332 else df_exploded.withColumn( 333 "unexpected_index_list", 334 transform( 335 col("validation_results.result.unexpected_index_list"), 336 lambda x: x.withField("run_success", lit(False)), 337 ), 338 ) 339 ) 340 341 if "observed_value" in schema: 342 df_exploded = df_exploded.withColumn( 343 "observed_value", col("validation_results.result.observed_value") 344 ) 345 346 return ( 347 df_exploded.withColumn("run_time_year", year(to_timestamp("run_time"))) 348 .withColumn("run_time_month", month(to_timestamp("run_time"))) 349 .withColumn("run_time_day", dayofmonth(to_timestamp("run_time"))) 350 .withColumn("checkpoint_config", to_json(col("checkpoint_config"))) 351 .withColumn("run_results", to_json(col("run_results"))) 352 .withColumn( 353 "kwargs", to_json(col("validation_results.expectation_config.kwargs")) 354 ) 355 .withColumn("validation_results", to_json(col("validation_results"))) 356 ) 357 358 @classmethod 359 def _get_batch_request( 360 cls, dq_spec: DQSpec, data: DataFrame 361 ) -> RuntimeBatchRequest: 362 """Get run time batch request to be able to query underlying data. 363 364 Args: 365 dq_spec: data quality process specification. 366 data: input dataframe to run the dq process on. 367 368 Returns: 369 The RuntimeBatchRequest object configuration. 
370 """ 371 return RuntimeBatchRequest( 372 datasource_name=f"{dq_spec.spec_id}-{dq_spec.input_id}-datasource", 373 data_connector_name=f"{dq_spec.spec_id}-{dq_spec.input_id}-data_connector", 374 data_asset_name=( 375 dq_spec.data_asset_name 376 if dq_spec.data_asset_name 377 else f"{dq_spec.spec_id}-{dq_spec.input_id}" 378 ), 379 batch_identifiers={ 380 "spec_id": dq_spec.spec_id, 381 "input_id": dq_spec.input_id, 382 "timestamp": cls._TIMESTAMP, 383 }, 384 runtime_parameters={"batch_data": data}, 385 ) 386 387 @classmethod 388 def _get_data_context_config(cls, dq_spec: DQSpec) -> DataContextConfig: 389 """Get the configuration of the data context. 390 391 Based on the configuration it is possible to define the backend to be 392 the file system (e.g. local file system) or S3, meaning that the DQ artefacts 393 will be stored according to this configuration. 394 395 Args: 396 dq_spec: data quality process specification. 397 398 Returns: 399 The DataContextConfig object configuration. 400 """ 401 store_backend: Union[FilesystemStoreBackendDefaults, S3StoreBackendDefaults] 402 data_docs_site = None 403 404 if dq_spec.store_backend == DQDefaults.FILE_SYSTEM_STORE.value: 405 store_backend = FilesystemStoreBackendDefaults( 406 root_directory=dq_spec.local_fs_root_dir 407 ) 408 data_docs_site = cls._get_data_docs_sites( 409 "local_site", store_backend.data_docs_sites, dq_spec 410 ) 411 elif dq_spec.store_backend == DQDefaults.FILE_SYSTEM_S3_STORE.value: 412 store_backend = S3StoreBackendDefaults( 413 default_bucket_name=dq_spec.bucket, 414 validations_store_prefix=dq_spec.validations_store_prefix, 415 checkpoint_store_prefix=dq_spec.checkpoint_store_prefix, 416 expectations_store_prefix=dq_spec.expectations_store_prefix, 417 data_docs_prefix=dq_spec.data_docs_prefix, 418 data_docs_bucket_name=( 419 dq_spec.data_docs_bucket 420 if dq_spec.data_docs_bucket 421 else dq_spec.bucket 422 ), 423 ) 424 data_docs_site = cls._get_data_docs_sites( 425 "s3_site", store_backend.data_docs_sites, dq_spec 426 ) 427 428 return DataContextConfig( 429 store_backend_defaults=store_backend, 430 data_docs_sites=data_docs_site, 431 anonymous_usage_statistics=AnonymizedUsageStatisticsConfig(enabled=False), 432 ) 433 434 @classmethod 435 def _get_data_docs_sites( 436 cls, site_name: str, data_docs_site: dict, dq_spec: DQSpec 437 ) -> dict: 438 """Get the custom configuration of the data_docs_sites. 439 440 Args: 441 site_name: the name to give to the site. 442 data_docs_site: the default configuration for the data_docs_site. 443 dq_spec: data quality specification. 444 445 Returns: 446 Modified data_docs_site. 447 """ 448 data_docs_site[site_name]["show_how_to_buttons"] = False 449 450 if site_name == "local_site": 451 data_docs_site[site_name]["store_backend"][ 452 "base_directory" 453 ] = dq_spec.data_docs_prefix 454 455 if dq_spec.data_docs_local_fs: 456 # Enable to write data_docs in a separated path 457 data_docs_site[site_name]["store_backend"][ 458 "root_directory" 459 ] = dq_spec.data_docs_local_fs 460 461 return data_docs_site 462 463 @classmethod 464 def _get_data_source_defaults(cls, dq_spec: DQSpec) -> dict: 465 """Get the configuration for a datasource. 466 467 Args: 468 dq_spec: data quality specification. 469 470 Returns: 471 The python dictionary with the datasource configuration. 
472 """ 473 return { 474 "name": f"{dq_spec.spec_id}-{dq_spec.input_id}-datasource", 475 "class_name": DQDefaults.DATASOURCE_CLASS_NAME.value, 476 "execution_engine": { 477 "class_name": DQDefaults.DATASOURCE_EXECUTION_ENGINE.value, 478 "persist": False, 479 }, 480 "data_connectors": { 481 f"{dq_spec.spec_id}-{dq_spec.input_id}-data_connector": { 482 "module_name": DQDefaults.DATA_CONNECTORS_MODULE_NAME.value, 483 "class_name": DQDefaults.DATA_CONNECTORS_CLASS_NAME.value, 484 "assets": { 485 ( 486 dq_spec.data_asset_name 487 if dq_spec.data_asset_name 488 else f"{dq_spec.spec_id}-{dq_spec.input_id}" 489 ): {"batch_identifiers": DQDefaults.DQ_BATCH_IDENTIFIERS.value} 490 }, 491 } 492 }, 493 } 494 495 @classmethod 496 def _get_failed_expectations( 497 cls, results: CheckpointResult, dq_spec: DQSpec 498 ) -> List[Any]: 499 """Get the failed expectations of a Checkpoint result. 500 501 Args: 502 results: the results of the DQ process. 503 dq_spec: data quality specification. 504 505 Returns: a list of failed expectations. 506 """ 507 failed_expectations = [] 508 for validation_result in results.list_validation_results(): 509 expectations_results = validation_result["results"] 510 for result in expectations_results: 511 if not result["success"]: 512 failed_expectations.append(result["expectation_config"]) 513 if result["exception_info"]["raised_exception"]: 514 cls._LOGGER.error( 515 f"""The expectation {str(result["expectation_config"])} 516 raised the following exception: 517 {result["exception_info"]["exception_message"]}""" 518 ) 519 cls._LOGGER.error( 520 f"{len(failed_expectations)} out of {len(expectations_results)} " 521 f"Data Quality Expectation(s) have failed! Failed Expectations: " 522 f"{failed_expectations}" 523 ) 524 525 percentage_failure = 1 - ( 526 validation_result["statistics"]["success_percent"] / 100 527 ) 528 529 if ( 530 dq_spec.max_percentage_failure is not None 531 and dq_spec.max_percentage_failure < percentage_failure 532 ): 533 raise DQValidationsFailedException( 534 f"Max error threshold is being surpassed! " 535 f"Expected: {dq_spec.max_percentage_failure} " 536 f"Got: {percentage_failure}" 537 ) 538 539 return failed_expectations 540 541 @classmethod 542 def _get_unexpected_rows_pk(cls, dq_spec: DQSpec) -> Optional[List[str]]: 543 """Get primary key for using on rows failing DQ validations. 544 545 Args: 546 dq_spec: data quality specification. 547 548 Returns: the list of columns that are part of the primary key. 549 """ 550 if dq_spec.unexpected_rows_pk: 551 return dq_spec.unexpected_rows_pk 552 elif dq_spec.tbl_to_derive_pk: 553 return TableManager( 554 {"function": "get_tbl_pk", "table_or_view": dq_spec.tbl_to_derive_pk} 555 ).get_tbl_pk() 556 elif dq_spec.tag_source_data: 557 raise ValueError( 558 "You need to provide either the argument " 559 "'unexpected_rows_pk' or 'tbl_to_derive_pk'." 560 ) 561 else: 562 return None 563 564 @classmethod 565 def _log_or_fail(cls, results: CheckpointResult, dq_spec: DQSpec) -> None: 566 """Log the execution of the Data Quality process. 567 568 Args: 569 results: the results of the DQ process. 570 dq_spec: data quality specification. 571 """ 572 if results["success"]: 573 cls._LOGGER.info( 574 "The data passed all the expectations defined. Everything looks good!" 
575 ) 576 else: 577 failed_expectations = cls._get_failed_expectations(results, dq_spec) 578 if dq_spec.critical_functions: 579 critical_failure = cls._check_critical_functions_tags( 580 failed_expectations 581 ) 582 583 if critical_failure: 584 raise DQValidationsFailedException( 585 f"Data Quality Validations Failed, the following critical " 586 f"expectations failed: {critical_failure}." 587 ) 588 elif dq_spec.fail_on_error: 589 raise DQValidationsFailedException("Data Quality Validations Failed!") 590 591 @classmethod 592 def _transform_checkpoint_results( 593 cls, checkpoint_results: dict, dq_spec: DQSpec 594 ) -> DataFrame: 595 """Transforms the checkpoint results and creates new entries. 596 597 All the items of the dictionary are cast to a json like format. 598 The validation_result_identifier is extracted from the run_results column 599 into a separated column. All columns are cast to json like format. 600 After that the dictionary is converted into a dataframe. 601 602 Args: 603 checkpoint_results: dict with results of the checkpoint run. 604 dq_spec: data quality specification. 605 606 Returns: 607 Transformed results dataframe. 608 """ 609 results_json_dict = loads(dumps(checkpoint_results)) 610 611 results_dict = {} 612 for key, value in results_json_dict.items(): 613 if key == "run_results": 614 checkpoint_result_identifier = list(value.keys())[0] 615 # check if the grabbed identifier is correct 616 if ( 617 str(checkpoint_result_identifier) 618 .lower() 619 .startswith(DQDefaults.VALIDATION_COLUMN_IDENTIFIER.value) 620 ): 621 results_dict["validation_result_identifier"] = ( 622 checkpoint_result_identifier 623 ) 624 results_dict["run_results"] = value[checkpoint_result_identifier] 625 else: 626 raise DQCheckpointsResultsException( 627 "The checkpoint result identifier format is not " 628 "in accordance to what is expected" 629 ) 630 else: 631 results_dict[key] = value 632 633 df = ExecEnv.SESSION.createDataFrame( 634 [json.dumps(results_dict)], 635 schema=StringType(), 636 ) 637 schema = schema_of_json(df.select("value").head()[0]) 638 df = df.withColumn("value", from_json("value", schema)).select("value.*") 639 640 cols_to_expand = ["run_id"] 641 df = ( 642 df.select( 643 [ 644 col(c) if c not in cols_to_expand else col(f"{c}.*") 645 for c in df.columns 646 ] 647 ) 648 .drop(*cols_to_expand) 649 .withColumn("spec_id", lit(dq_spec.spec_id)) 650 .withColumn("input_id", lit(dq_spec.input_id)) 651 ) 652 653 return ( 654 cls._explode_results(df, dq_spec) 655 if dq_spec.result_sink_explode 656 else df.withColumn( 657 "checkpoint_config", to_json(col("checkpoint_config")) 658 ).withColumn("run_results", to_json(col("run_results"))) 659 ) 660 661 @classmethod 662 def _write_to_result_sink( 663 cls, 664 dq_spec: DQSpec, 665 df: DataFrame, 666 data: OrderedDict = None, 667 ) -> None: 668 """Write dq results dataframe to a table or location. 669 670 It can be written: 671 - a raw output (having result_sink_explode set as False) 672 - an exploded output (having result_sink_explode set as True), which 673 is more prepared for analysis, with some columns exploded, flatten and 674 transformed. It can also be set result_sink_extra_columns with other 675 columns desired to have in the output table or location. 676 677 Args: 678 dq_spec: data quality specification. 679 df: dataframe with dq results to write. 680 data: list of all dfs generated on previous steps before writer. 
681 """ 682 if dq_spec.result_sink_db_table or dq_spec.result_sink_location: 683 options = {"mergeSchema": "true"} if dq_spec.result_sink_explode else {} 684 685 WriterFactory.get_writer( 686 spec=OutputSpec( 687 spec_id="dq_result_sink", 688 input_id="dq_result", 689 db_table=dq_spec.result_sink_db_table, 690 location=dq_spec.result_sink_location, 691 partitions=( 692 dq_spec.result_sink_partitions 693 if dq_spec.result_sink_partitions 694 else [] 695 ), 696 write_type=WriteType.APPEND.value, 697 data_format=dq_spec.result_sink_format, 698 options=( 699 options 700 if dq_spec.result_sink_options is None 701 else {**dq_spec.result_sink_options, **options} 702 ), 703 ), 704 df=df, 705 data=data, 706 ).write()
class DQFactory:
Class for the Data Quality Factory.
@classmethod
def run_dq_process(
    cls,
    dq_spec: lakehouse_engine.core.definitions.DQSpec,
    data: pyspark.sql.dataframe.DataFrame,
) -> pyspark.sql.dataframe.DataFrame:
Run the specified data quality process on a dataframe.
Based on the dq_specs, the defined expectations are applied on top of the dataframe to perform the necessary validations, and the result of the data quality process is then returned.
Arguments:
- dq_spec: data quality specification.
- data: input dataframe to run the dq process on.
Returns:
The DataFrame containing the results of the DQ process.
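As an illustration, here is a minimal sketch of how this entry point might be called from a Spark job. The DQSpec fields used (spec_id, input_id, dq_type, bucket, unexpected_rows_pk, result_sink_location, result_sink_explode, tag_source_data, fail_on_error, dq_functions) all appear in the source above; the DQFunctionSpec shape and the bucket/path values are assumptions for the example, and the lakehouse engine execution environment is assumed to be already initialized.

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import DQFunctionSpec, DQSpec, DQType
from lakehouse_engine.dq_processors.dq_factory import DQFactory

spark = SparkSession.builder.getOrCreate()

# Hypothetical input dataframe; path and format are placeholders.
orders_df = spark.read.format("delta").load("s3://my-bucket/orders")

dq_spec = DQSpec(
    spec_id="orders_dq",
    input_id="orders",
    dq_type=DQType.VALIDATOR.value,
    bucket="my-dq-bucket",  # assumption: S3 store backend with this bucket
    unexpected_rows_pk=["order_id"],  # primary key used to identify unexpected rows
    result_sink_location="s3://my-dq-bucket/dq_results/",  # where DQ results are appended
    result_sink_explode=True,
    tag_source_data=True,
    fail_on_error=False,
    dq_functions=[
        # assumption: dq_functions entries follow the DQFunctionSpec(function, args) shape
        DQFunctionSpec(
            function="expect_column_values_to_not_be_null",
            args={"column": "order_id"},
        ),
    ],
)

# Returns the input dataframe, tagged with the DQ results since tagging is enabled.
validated_df = DQFactory.run_dq_process(dq_spec, orders_df)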
@classmethod
def build_data_docs(
    cls,
    store_backend: str = 's3',
    local_fs_root_dir: str = None,
    data_docs_local_fs: str = None,
    data_docs_prefix: str = 'dq/data_docs/site/',
    bucket: str = None,
    data_docs_bucket: str = None,
    expectations_store_prefix: str = 'dq/expectations/',
    validations_store_prefix: str = 'dq/validations/',
    checkpoint_store_prefix: str = 'dq/checkpoints/',
) -> None:
Build Data Docs for the project.
This function does a full build of data docs based on all the Great Expectations checkpoints in the specified location, gathering the full history of runs/validations executed and their results.
Arguments:
- store_backend: which store_backend to use (e.g. s3 or file_system).
- local_fs_root_dir: path of the root directory where the DQ artefacts are stored. Note: only applicable for store_backend file_system.
- data_docs_local_fs: path of a separate root directory where to write the data docs. Note: only applicable for store_backend file_system.
- data_docs_prefix: prefix where to store data_docs' data.
- bucket: the bucket name to consider for the store_backend (store DQ artefacts). Note: only applicable for store_backend s3.
- data_docs_bucket: the bucket name for data docs only. When defined, it will supersede the bucket parameter. Note: only applicable for store_backend s3.
- expectations_store_prefix: prefix where to store expectations' data. Note: only applicable for store_backend s3.
- validations_store_prefix: prefix where to store validations' data. Note: only applicable for store_backend s3.
- checkpoint_store_prefix: prefix where to store checkpoints' data. Note: only applicable for store_backend s3.
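Below is a minimal sketch of rebuilding data docs for both supported backends; the bucket names and local paths are placeholders, and the remaining prefixes are left at their defaults.

from lakehouse_engine.core.definitions import DQDefaults
from lakehouse_engine.dq_processors.dq_factory import DQFactory

# Rebuild data docs from DQ artefacts stored in S3 (the default store_backend).
DQFactory.build_data_docs(
    bucket="my-dq-bucket",  # placeholder: bucket holding the DQ artefacts
    data_docs_bucket="my-dq-docs-bucket",  # optional: separate bucket just for data docs
)

# Rebuild data docs from a local file system store instead.
DQFactory.build_data_docs(
    store_backend=DQDefaults.FILE_SYSTEM_STORE.value,
    local_fs_root_dir="/tmp/dq/artefacts/",  # placeholder: root directory with DQ artefacts
    data_docs_local_fs="/tmp/dq/data_docs/",  # placeholder: separate path for the data docs
)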