lakehouse_engine.dq_processors.dq_factory

Module containing the class definition of the Data Quality Factory.
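
For orientation, the sketch below (not part of the module source) shows how DQFactory.run_dq_process might be called with a DQSpec using the local file_system store backend. The spec field values, the path and the input_df dataframe are illustrative assumptions; a real spec would normally also define the expectations to run (dq_functions) and, optionally, a result sink.

    from lakehouse_engine.core.definitions import DQDefaults, DQSpec, DQType
    from lakehouse_engine.dq_processors.dq_factory import DQFactory

    # Illustrative validator spec storing the DQ artefacts on the local file system.
    # Values are assumptions; the expectations (dq_functions) and result sink
    # settings are omitted for brevity.
    dq_spec = DQSpec(
        spec_id="my_spec",
        input_id="my_input",
        dq_type=DQType.VALIDATOR.value,
        store_backend=DQDefaults.FILE_SYSTEM_STORE.value,
        local_fs_root_dir="/tmp/dq_artefacts",  # hypothetical artefacts root
    )

    # input_df is assumed to be an existing Spark DataFrame from previous steps.
    validated_df = DQFactory.run_dq_process(dq_spec, input_df)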

  1"""Module containing the class definition of the Data Quality Factory."""
  2
  3import importlib.util
  4import json
  5from datetime import datetime, timezone
  6from json import dumps, loads
  7from typing import Any, Dict, List, Optional, OrderedDict, Tuple, Union
  8
  9from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult
 10from great_expectations.core.batch import RuntimeBatchRequest
 11from great_expectations.data_context import EphemeralDataContext
 12from great_expectations.data_context.data_context.context_factory import get_context
 13from great_expectations.data_context.types.base import (
 14    AnonymizedUsageStatisticsConfig,
 15    DataContextConfig,
 16    FilesystemStoreBackendDefaults,
 17    S3StoreBackendDefaults,
 18)
 19from pyspark.sql import DataFrame
 20from pyspark.sql.functions import (
 21    array,
 22    coalesce,
 23    col,
 24    collect_list,
 25    concat_ws,
 26    dayofmonth,
 27    explode,
 28    from_json,
 29    lit,
 30    month,
 31    schema_of_json,
 32    sort_array,
 33    struct,
 34    to_json,
 35    to_timestamp,
 36    transform,
 37    year,
 38)
 39from pyspark.sql.types import StringType
 40
 41from lakehouse_engine.core.definitions import (
 42    DQDefaults,
 43    DQSpec,
 44    DQType,
 45    OutputSpec,
 46    WriteType,
 47)
 48from lakehouse_engine.core.exec_env import ExecEnv
 49from lakehouse_engine.core.table_manager import TableManager
 50from lakehouse_engine.dq_processors.exceptions import (
 51    DQCheckpointsResultsException,
 52    DQValidationsFailedException,
 53)
 54from lakehouse_engine.dq_processors.validator import Validator
 55from lakehouse_engine.io.writer_factory import WriterFactory
 56from lakehouse_engine.utils.logging_handler import LoggingHandler
 57
 58
 59class DQFactory(object):
 60    """Class for the Data Quality Factory."""
 61
 62    _LOGGER = LoggingHandler(__name__).get_logger()
 63    _TIMESTAMP = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
 64
 65    @classmethod
 66    def run_dq_process(cls, dq_spec: DQSpec, data: DataFrame) -> DataFrame:
 67        """Run the specified data quality process on a dataframe.
 68
 69        Based on the dq_specs, the defined expectations are applied to the dataframe
 70        in order to run the necessary validations, and the result of
 71        the data quality process is then output.
 72
 73        Args:
 74            dq_spec: data quality specification.
 75            data: input dataframe to run the dq process on.
 76
 77        Returns:
 78            The DataFrame containing the results of the DQ process.
 79        """
 80        # import custom expectations so that they are available to be used.
 81        for expectation in DQDefaults.CUSTOM_EXPECTATION_LIST.value:
 82            importlib.__import__(
 83                "lakehouse_engine.dq_processors.custom_expectations." + expectation
 84            )
 85
 86        context = get_context(project_config=cls._get_data_context_config(dq_spec))
 87        context.add_datasource(**cls._get_data_source_defaults(dq_spec))
 88
 89        expectation_suite_name = (
 90            dq_spec.expectation_suite_name
 91            if dq_spec.expectation_suite_name
 92            else f"{dq_spec.spec_id}-{dq_spec.input_id}-{dq_spec.dq_type}"
 93        )
 94        context.add_or_update_expectation_suite(
 95            expectation_suite_name=expectation_suite_name
 96        )
 97
 98        batch_request = cls._get_batch_request(dq_spec, data)
 99
100        if (
101            dq_spec.dq_type == DQType.VALIDATOR.value
102            or dq_spec.dq_type == DQType.PRISMA.value
103        ):
104            Validator.get_dq_validator(
105                context,
106                batch_request,
107                expectation_suite_name,
108                dq_spec.dq_functions,
109                dq_spec.critical_functions,
110            )
111
112            source_pk = cls._get_unexpected_rows_pk(dq_spec)
113            results, results_df = cls._configure_and_run_checkpoint(
114                dq_spec, context, batch_request, expectation_suite_name, source_pk
115            )
116
117            if dq_spec.dq_type == DQType.PRISMA.value:
118
119                results_df = results_df.withColumn("source_primary_key", lit(source_pk))
120
121                processed_keys_df = data.select(
122                    concat_ws(
123                        ", ", *[coalesce(col(c), lit("null")) for c in source_pk]
124                    ).alias("combined_pk")
125                )
126                comb_pk_expr = (
127                    sort_array(collect_list("combined_pk"))
128                    if dq_spec.sort_processed_keys
129                    else collect_list("combined_pk")
130                )
131                processed_keys_df = processed_keys_df.agg(
132                    concat_ws("||", comb_pk_expr).alias("processed_keys")
133                )
134
135                results_df = results_df.join(processed_keys_df, lit(1) == lit(1))
136
137            cls._write_to_result_sink(dq_spec, results_df)
138
139            cls._log_or_fail(results, dq_spec)
140
141            if (
142                dq_spec.tag_source_data
143                and dq_spec.result_sink_explode
144                and dq_spec.fail_on_error is not True
145            ):
146                data = Validator.tag_source_with_dq(source_pk, data, results_df)
147        else:
148            raise TypeError(
149                f"Type of Data Quality '{dq_spec.dq_type}' is not supported."
150            )
151
152        return data
153
154    @classmethod
155    def build_data_docs(
156        cls,
157        store_backend: str = DQDefaults.STORE_BACKEND.value,
158        local_fs_root_dir: str = None,
159        data_docs_local_fs: str = None,
160        data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value,
161        bucket: str = None,
162        data_docs_bucket: str = None,
163        expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
164        validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value,
165        checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value,
166    ) -> None:
167        """Build Data Docs for the project.
168
169        This function does a full build of data docs based on all the Great Expectations
170        checkpoints in the specified location, covering the whole history of runs/validations
171        executed and their results.
172
173        Args:
174            store_backend: which store_backend to use (e.g. s3 or file_system).
175            local_fs_root_dir: path of the root directory. Note: only applicable
176                for store_backend file_system.
177            data_docs_local_fs: path where to store the data docs. Note: only applicable
178                for store_backend file_system.
179            data_docs_prefix: prefix where to store data_docs' data.
180            bucket: the bucket name to consider for the store_backend
181                (store DQ artefacts). Note: only applicable for store_backend s3.
182            data_docs_bucket: the bucket name for data docs only. When defined,
183                it will supersede bucket parameter.
184                Note: only applicable for store_backend s3.
185            expectations_store_prefix: prefix where to store expectations' data.
186                Note: only applicable for store_backend s3.
187            validations_store_prefix: prefix where to store validations' data.
188                Note: only applicable for store_backend s3.
189            checkpoint_store_prefix: prefix where to store checkpoints' data.
190                Note: only applicable for store_backend s3.
191        """
192        if store_backend == DQDefaults.STORE_BACKEND.value:
193            dq_spec = DQSpec(
194                spec_id="dq_validator",
195                input_id="dq",
196                dq_type=DQType.VALIDATOR.value,
197                store_backend=DQDefaults.STORE_BACKEND.value,
198                data_docs_prefix=data_docs_prefix,
199                bucket=bucket,
200                data_docs_bucket=data_docs_bucket,
201                expectations_store_prefix=expectations_store_prefix,
202                validations_store_prefix=validations_store_prefix,
203                checkpoint_store_prefix=checkpoint_store_prefix,
204            )
205        elif store_backend == DQDefaults.FILE_SYSTEM_STORE.value:
206            dq_spec = DQSpec(
207                spec_id="dq_validator",
208                input_id="dq",
209                dq_type=DQType.VALIDATOR.value,
210                store_backend=DQDefaults.FILE_SYSTEM_STORE.value,
211                local_fs_root_dir=local_fs_root_dir,
212                data_docs_local_fs=data_docs_local_fs,
213                data_docs_prefix=data_docs_prefix,
214            )
215        context = get_context(project_config=cls._get_data_context_config(dq_spec))
216        cls._LOGGER.info("The data docs were rebuilt")
217        context.build_data_docs()
218
219    @classmethod
220    def _check_critical_functions_tags(cls, failed_expectations: List[Any]) -> list:
221        critical_failure = []
222
223        for expectation in failed_expectations:
224            meta = expectation["meta"]
225            if meta and (
226                ("notes" in meta.keys() and "Critical function" in meta["notes"])
227                or (
228                    "content" in meta["notes"].keys()
229                    and "Critical function" in meta["notes"]["content"]
230                )
231            ):
232                critical_failure.append(expectation["expectation_type"])
233
234        return critical_failure
235
236    @classmethod
237    def _configure_and_run_checkpoint(
238        cls,
239        dq_spec: DQSpec,
240        context: EphemeralDataContext,
241        batch_request: RuntimeBatchRequest,
242        expectation_suite_name: str,
243        source_pk: List[str],
244    ) -> Tuple[CheckpointResult, DataFrame]:
245        """Configure, run and return checkpoint results.
246
247        A checkpoint is what enables us to run the validations of the expectations'
248        suite on the batches of data.
249
250        Args:
251            dq_spec: data quality specification.
252            context: the EphemeralDataContext containing the configurations for the data
253                source and store backend.
254            batch_request: run time batch request to be able to query underlying data.
255            expectation_suite_name: name of the expectation suite.
256            source_pk: the primary key of the source data.
257
258        Returns:
259            The checkpoint results in two types: CheckpointResult and Dataframe.
260        """
261        checkpoint_name = f"{dq_spec.spec_id}-{dq_spec.input_id}-checkpoint"
262        context.add_or_update_checkpoint(
263            name=checkpoint_name,
264            class_name=DQDefaults.DATA_CHECKPOINTS_CLASS_NAME.value,
265            config_version=DQDefaults.DATA_CHECKPOINTS_CONFIG_VERSION.value,
266            run_name_template=f"%Y%m%d-%H%M%S-{checkpoint_name}",
267        )
268
269        result_format: Dict[str, Any] = {
270            "result_format": dq_spec.gx_result_format,
271        }
272        if source_pk:
273            result_format = {
274                **result_format,
275                "unexpected_index_column_names": source_pk,
276            }
277
278        results = context.run_checkpoint(
279            checkpoint_name=checkpoint_name,
280            validations=[
281                {
282                    "batch_request": batch_request,
283                    "expectation_suite_name": expectation_suite_name,
284                }
285            ],
286            result_format=result_format,
287        )
288
289        return results, cls._transform_checkpoint_results(
290            results.to_json_dict(), dq_spec
291        )
292
293    @classmethod
294    def _explode_results(
295        cls,
296        df: DataFrame,
297        dq_spec: DQSpec,
298    ) -> DataFrame:
299        """Transform the dq results dataframe by exploding a set of columns.
300
301        Args:
302            df: dataframe with dq results to be exploded.
303            dq_spec: data quality specification.
304        """
305        df = df.withColumn(
306            "validation_results", explode("run_results.validation_result.results")
307        ).withColumn("source", lit(dq_spec.source))
308
309        new_columns = [
310            "validation_results.expectation_config.kwargs.*",
311            "run_results.validation_result.statistics.*",
312            "validation_results.expectation_config.expectation_type",
313            "validation_results.success as expectation_success",
314            "validation_results.exception_info",
315        ] + dq_spec.result_sink_extra_columns
316
317        df_exploded = df.selectExpr(*df.columns, *new_columns).drop(
318            *[c.replace(".*", "").split(" as")[0] for c in new_columns]
319        )
320
321        schema = df_exploded.schema.simpleString()
322        if "unexpected_index_list" in schema:
323            df_exploded = (
324                df_exploded.withColumn(
325                    "unexpected_index_list",
326                    array(struct(lit(True).alias("run_success"))),
327                )
328                if df.select(
329                    col("validation_results.result.unexpected_index_list")
330                ).dtypes[0][1]
331                == "array<string>"
332                else df_exploded.withColumn(
333                    "unexpected_index_list",
334                    transform(
335                        col("validation_results.result.unexpected_index_list"),
336                        lambda x: x.withField("run_success", lit(False)),
337                    ),
338                )
339            )
340
341        if "observed_value" in schema:
342            df_exploded = df_exploded.withColumn(
343                "observed_value", col("validation_results.result.observed_value")
344            )
345
346        return (
347            df_exploded.withColumn("run_time_year", year(to_timestamp("run_time")))
348            .withColumn("run_time_month", month(to_timestamp("run_time")))
349            .withColumn("run_time_day", dayofmonth(to_timestamp("run_time")))
350            .withColumn("checkpoint_config", to_json(col("checkpoint_config")))
351            .withColumn("run_results", to_json(col("run_results")))
352            .withColumn(
353                "kwargs", to_json(col("validation_results.expectation_config.kwargs"))
354            )
355            .withColumn("validation_results", to_json(col("validation_results")))
356        )
357
358    @classmethod
359    def _get_batch_request(
360        cls, dq_spec: DQSpec, data: DataFrame
361    ) -> RuntimeBatchRequest:
362        """Get run time batch request to be able to query underlying data.
363
364        Args:
365            dq_spec: data quality process specification.
366            data: input dataframe to run the dq process on.
367
368        Returns:
369            The RuntimeBatchRequest object configuration.
370        """
371        return RuntimeBatchRequest(
372            datasource_name=f"{dq_spec.spec_id}-{dq_spec.input_id}-datasource",
373            data_connector_name=f"{dq_spec.spec_id}-{dq_spec.input_id}-data_connector",
374            data_asset_name=(
375                dq_spec.data_asset_name
376                if dq_spec.data_asset_name
377                else f"{dq_spec.spec_id}-{dq_spec.input_id}"
378            ),
379            batch_identifiers={
380                "spec_id": dq_spec.spec_id,
381                "input_id": dq_spec.input_id,
382                "timestamp": cls._TIMESTAMP,
383            },
384            runtime_parameters={"batch_data": data},
385        )
386
387    @classmethod
388    def _get_data_context_config(cls, dq_spec: DQSpec) -> DataContextConfig:
389        """Get the configuration of the data context.
390
391        Based on the configuration it is possible to define the backend to be
392        the file system (e.g. local file system) or S3, meaning that the DQ artefacts
393        will be stored according to this configuration.
394
395        Args:
396            dq_spec: data quality process specification.
397
398        Returns:
399            The DataContextConfig object configuration.
400        """
401        store_backend: Union[FilesystemStoreBackendDefaults, S3StoreBackendDefaults]
402        data_docs_site = None
403
404        if dq_spec.store_backend == DQDefaults.FILE_SYSTEM_STORE.value:
405            store_backend = FilesystemStoreBackendDefaults(
406                root_directory=dq_spec.local_fs_root_dir
407            )
408            data_docs_site = cls._get_data_docs_sites(
409                "local_site", store_backend.data_docs_sites, dq_spec
410            )
411        elif dq_spec.store_backend == DQDefaults.FILE_SYSTEM_S3_STORE.value:
412            store_backend = S3StoreBackendDefaults(
413                default_bucket_name=dq_spec.bucket,
414                validations_store_prefix=dq_spec.validations_store_prefix,
415                checkpoint_store_prefix=dq_spec.checkpoint_store_prefix,
416                expectations_store_prefix=dq_spec.expectations_store_prefix,
417                data_docs_prefix=dq_spec.data_docs_prefix,
418                data_docs_bucket_name=(
419                    dq_spec.data_docs_bucket
420                    if dq_spec.data_docs_bucket
421                    else dq_spec.bucket
422                ),
423            )
424            data_docs_site = cls._get_data_docs_sites(
425                "s3_site", store_backend.data_docs_sites, dq_spec
426            )
427
428        return DataContextConfig(
429            store_backend_defaults=store_backend,
430            data_docs_sites=data_docs_site,
431            anonymous_usage_statistics=AnonymizedUsageStatisticsConfig(enabled=False),
432        )
433
434    @classmethod
435    def _get_data_docs_sites(
436        cls, site_name: str, data_docs_site: dict, dq_spec: DQSpec
437    ) -> dict:
438        """Get the custom configuration of the data_docs_sites.
439
440        Args:
441            site_name: the name to give to the site.
442            data_docs_site: the default configuration for the data_docs_site.
443            dq_spec: data quality specification.
444
445        Returns:
446            Modified data_docs_site.
447        """
448        data_docs_site[site_name]["show_how_to_buttons"] = False
449
450        if site_name == "local_site":
451            data_docs_site[site_name]["store_backend"][
452                "base_directory"
453            ] = dq_spec.data_docs_prefix
454
455            if dq_spec.data_docs_local_fs:
456                # Enables writing data_docs to a separate path
457                data_docs_site[site_name]["store_backend"][
458                    "root_directory"
459                ] = dq_spec.data_docs_local_fs
460
461        return data_docs_site
462
463    @classmethod
464    def _get_data_source_defaults(cls, dq_spec: DQSpec) -> dict:
465        """Get the configuration for a datasource.
466
467        Args:
468            dq_spec: data quality specification.
469
470        Returns:
471            The python dictionary with the datasource configuration.
472        """
473        return {
474            "name": f"{dq_spec.spec_id}-{dq_spec.input_id}-datasource",
475            "class_name": DQDefaults.DATASOURCE_CLASS_NAME.value,
476            "execution_engine": {
477                "class_name": DQDefaults.DATASOURCE_EXECUTION_ENGINE.value,
478                "persist": False,
479            },
480            "data_connectors": {
481                f"{dq_spec.spec_id}-{dq_spec.input_id}-data_connector": {
482                    "module_name": DQDefaults.DATA_CONNECTORS_MODULE_NAME.value,
483                    "class_name": DQDefaults.DATA_CONNECTORS_CLASS_NAME.value,
484                    "assets": {
485                        (
486                            dq_spec.data_asset_name
487                            if dq_spec.data_asset_name
488                            else f"{dq_spec.spec_id}-{dq_spec.input_id}"
489                        ): {"batch_identifiers": DQDefaults.DQ_BATCH_IDENTIFIERS.value}
490                    },
491                }
492            },
493        }
494
495    @classmethod
496    def _get_failed_expectations(
497        cls, results: CheckpointResult, dq_spec: DQSpec
498    ) -> List[Any]:
499        """Get the failed expectations of a Checkpoint result.
500
501        Args:
502            results: the results of the DQ process.
503            dq_spec: data quality specification.
504
505        Returns: a list of failed expectations.
506        """
507        failed_expectations = []
508        for validation_result in results.list_validation_results():
509            expectations_results = validation_result["results"]
510            for result in expectations_results:
511                if not result["success"]:
512                    failed_expectations.append(result["expectation_config"])
513                    if result["exception_info"]["raised_exception"]:
514                        cls._LOGGER.error(
515                            f"""The expectation {str(result["expectation_config"])}
516                            raised the following exception:
517                            {result["exception_info"]["exception_message"]}"""
518                        )
519            cls._LOGGER.error(
520                f"{len(failed_expectations)} out of {len(expectations_results)} "
521                f"Data Quality Expectation(s) have failed! Failed Expectations: "
522                f"{failed_expectations}"
523            )
524
525            percentage_failure = 1 - (
526                validation_result["statistics"]["success_percent"] / 100
527            )
528
529            if (
530                dq_spec.max_percentage_failure is not None
531                and dq_spec.max_percentage_failure < percentage_failure
532            ):
533                raise DQValidationsFailedException(
534                    f"Max error threshold is being surpassed! "
535                    f"Expected: {dq_spec.max_percentage_failure} "
536                    f"Got: {percentage_failure}"
537                )
538
539        return failed_expectations
540
541    @classmethod
542    def _get_unexpected_rows_pk(cls, dq_spec: DQSpec) -> Optional[List[str]]:
543        """Get the primary key to use on rows failing DQ validations.
544
545        Args:
546            dq_spec: data quality specification.
547
548        Returns: the list of columns that are part of the primary key.
549        """
550        if dq_spec.unexpected_rows_pk:
551            return dq_spec.unexpected_rows_pk
552        elif dq_spec.tbl_to_derive_pk:
553            return TableManager(
554                {"function": "get_tbl_pk", "table_or_view": dq_spec.tbl_to_derive_pk}
555            ).get_tbl_pk()
556        elif dq_spec.tag_source_data:
557            raise ValueError(
558                "You need to provide either the argument "
559                "'unexpected_rows_pk' or 'tbl_to_derive_pk'."
560            )
561        else:
562            return None
563
564    @classmethod
565    def _log_or_fail(cls, results: CheckpointResult, dq_spec: DQSpec) -> None:
566        """Log the execution of the Data Quality process.
567
568        Args:
569            results: the results of the DQ process.
570            dq_spec: data quality specification.
571        """
572        if results["success"]:
573            cls._LOGGER.info(
574                "The data passed all the expectations defined. Everything looks good!"
575            )
576        else:
577            failed_expectations = cls._get_failed_expectations(results, dq_spec)
578            if dq_spec.critical_functions:
579                critical_failure = cls._check_critical_functions_tags(
580                    failed_expectations
581                )
582
583                if critical_failure:
584                    raise DQValidationsFailedException(
585                        f"Data Quality Validations Failed, the following critical "
586                        f"expectations failed: {critical_failure}."
587                    )
588            elif dq_spec.fail_on_error:
589                raise DQValidationsFailedException("Data Quality Validations Failed!")
590
591    @classmethod
592    def _transform_checkpoint_results(
593        cls, checkpoint_results: dict, dq_spec: DQSpec
594    ) -> DataFrame:
595        """Transform the checkpoint results and create new entries.
596
597        All the items of the dictionary are cast to a JSON-like format.
598        The validation_result_identifier is extracted from the run_results column
599        into a separate column, and all columns are cast to a JSON-like format.
600        After that, the dictionary is converted into a dataframe.
601
602        Args:
603            checkpoint_results: dict with results of the checkpoint run.
604            dq_spec: data quality specification.
605
606        Returns:
607            Transformed results dataframe.
608        """
609        results_json_dict = loads(dumps(checkpoint_results))
610
611        results_dict = {}
612        for key, value in results_json_dict.items():
613            if key == "run_results":
614                checkpoint_result_identifier = list(value.keys())[0]
615                # check if the grabbed identifier is correct
616                if (
617                    str(checkpoint_result_identifier)
618                    .lower()
619                    .startswith(DQDefaults.VALIDATION_COLUMN_IDENTIFIER.value)
620                ):
621                    results_dict["validation_result_identifier"] = (
622                        checkpoint_result_identifier
623                    )
624                    results_dict["run_results"] = value[checkpoint_result_identifier]
625                else:
626                    raise DQCheckpointsResultsException(
627                        "The checkpoint result identifier format is not "
628                        "in accordance with what is expected"
629                    )
630            else:
631                results_dict[key] = value
632
633        df = ExecEnv.SESSION.createDataFrame(
634            [json.dumps(results_dict)],
635            schema=StringType(),
636        )
637        schema = schema_of_json(df.select("value").head()[0])
638        df = df.withColumn("value", from_json("value", schema)).select("value.*")
639
640        cols_to_expand = ["run_id"]
641        df = (
642            df.select(
643                [
644                    col(c) if c not in cols_to_expand else col(f"{c}.*")
645                    for c in df.columns
646                ]
647            )
648            .drop(*cols_to_expand)
649            .withColumn("spec_id", lit(dq_spec.spec_id))
650            .withColumn("input_id", lit(dq_spec.input_id))
651        )
652
653        return (
654            cls._explode_results(df, dq_spec)
655            if dq_spec.result_sink_explode
656            else df.withColumn(
657                "checkpoint_config", to_json(col("checkpoint_config"))
658            ).withColumn("run_results", to_json(col("run_results")))
659        )
660
661    @classmethod
662    def _write_to_result_sink(
663        cls,
664        dq_spec: DQSpec,
665        df: DataFrame,
666        data: OrderedDict = None,
667    ) -> None:
668        """Write dq results dataframe to a table or location.
669
670        The results can be written as:
671        - a raw output (having result_sink_explode set as False)
672        - an exploded output (having result_sink_explode set as True), which
673        is more prepared for analysis, with some columns exploded, flattened and
674        transformed. Additionally, result_sink_extra_columns can be set with other
675        columns desired in the output table or location.
676
677        Args:
678            dq_spec: data quality specification.
679            df: dataframe with dq results to write.
680            data: list of all dfs generated on previous steps before writer.
681        """
682        if dq_spec.result_sink_db_table or dq_spec.result_sink_location:
683            options = {"mergeSchema": "true"} if dq_spec.result_sink_explode else {}
684
685            WriterFactory.get_writer(
686                spec=OutputSpec(
687                    spec_id="dq_result_sink",
688                    input_id="dq_result",
689                    db_table=dq_spec.result_sink_db_table,
690                    location=dq_spec.result_sink_location,
691                    partitions=(
692                        dq_spec.result_sink_partitions
693                        if dq_spec.result_sink_partitions
694                        else []
695                    ),
696                    write_type=WriteType.APPEND.value,
697                    data_format=dq_spec.result_sink_format,
698                    options=(
699                        options
700                        if dq_spec.result_sink_options is None
701                        else {**dq_spec.result_sink_options, **options}
702                    ),
703                ),
704                df=df,
705                data=data,
706            ).write()
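
As a usage complement to the source above, the sketch below shows how build_data_docs might be invoked for the file_system store backend. Both paths are illustrative assumptions and must point to the location where the DQ artefacts were previously written.

    from lakehouse_engine.core.definitions import DQDefaults
    from lakehouse_engine.dq_processors.dq_factory import DQFactory

    # Rebuild data docs from all the checkpoints stored on the local file system.
    # The paths are hypothetical examples.
    DQFactory.build_data_docs(
        store_backend=DQDefaults.FILE_SYSTEM_STORE.value,
        local_fs_root_dir="/tmp/dq_artefacts",
        data_docs_local_fs="/tmp/dq_data_docs",
    )
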
class DQFactory:
 60class DQFactory(object):
 61    """Class for the Data Quality Factory."""
 62
 63    _LOGGER = LoggingHandler(__name__).get_logger()
 64    _TIMESTAMP = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
 65
 66    @classmethod
 67    def run_dq_process(cls, dq_spec: DQSpec, data: DataFrame) -> DataFrame:
 68        """Run the specified data quality process on a dataframe.
 69
 70        Based on the dq_specs we apply the defined expectations on top of the dataframe
 71        in order to apply the necessary validations and then output the result of
 72        the data quality process.
 73
 74        Args:
 75            dq_spec: data quality specification.
 76            data: input dataframe to run the dq process on.
 77
 78        Returns:
 79            The DataFrame containing the results of the DQ process.
 80        """
 81        # import custom expectations for them to be available to be used.
 82        for expectation in DQDefaults.CUSTOM_EXPECTATION_LIST.value:
 83            importlib.__import__(
 84                "lakehouse_engine.dq_processors.custom_expectations." + expectation
 85            )
 86
 87        context = get_context(project_config=cls._get_data_context_config(dq_spec))
 88        context.add_datasource(**cls._get_data_source_defaults(dq_spec))
 89
 90        expectation_suite_name = (
 91            dq_spec.expectation_suite_name
 92            if dq_spec.expectation_suite_name
 93            else f"{dq_spec.spec_id}-{dq_spec.input_id}-{dq_spec.dq_type}"
 94        )
 95        context.add_or_update_expectation_suite(
 96            expectation_suite_name=expectation_suite_name
 97        )
 98
 99        batch_request = cls._get_batch_request(dq_spec, data)
100
101        if (
102            dq_spec.dq_type == DQType.VALIDATOR.value
103            or dq_spec.dq_type == DQType.PRISMA.value
104        ):
105            Validator.get_dq_validator(
106                context,
107                batch_request,
108                expectation_suite_name,
109                dq_spec.dq_functions,
110                dq_spec.critical_functions,
111            )
112
113            source_pk = cls._get_unexpected_rows_pk(dq_spec)
114            results, results_df = cls._configure_and_run_checkpoint(
115                dq_spec, context, batch_request, expectation_suite_name, source_pk
116            )
117
118            if dq_spec.dq_type == DQType.PRISMA.value:
119
120                results_df = results_df.withColumn("source_primary_key", lit(source_pk))
121
122                processed_keys_df = data.select(
123                    concat_ws(
124                        ", ", *[coalesce(col(c), lit("null")) for c in source_pk]
125                    ).alias("combined_pk")
126                )
127                comb_pk_expr = (
128                    sort_array(collect_list("combined_pk"))
129                    if dq_spec.sort_processed_keys
130                    else collect_list("combined_pk")
131                )
132                processed_keys_df = processed_keys_df.agg(
133                    concat_ws("||", comb_pk_expr).alias("processed_keys")
134                )
135
136                results_df = results_df.join(processed_keys_df, lit(1) == lit(1))
137
138            cls._write_to_result_sink(dq_spec, results_df)
139
140            cls._log_or_fail(results, dq_spec)
141
142            if (
143                dq_spec.tag_source_data
144                and dq_spec.result_sink_explode
145                and dq_spec.fail_on_error is not True
146            ):
147                data = Validator.tag_source_with_dq(source_pk, data, results_df)
148        else:
149            raise TypeError(
150                f"Type of Data Quality '{dq_spec.dq_type}' is not supported."
151            )
152
153        return data
154
155    @classmethod
156    def build_data_docs(
157        cls,
158        store_backend: str = DQDefaults.STORE_BACKEND.value,
159        local_fs_root_dir: str = None,
160        data_docs_local_fs: str = None,
161        data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value,
162        bucket: str = None,
163        data_docs_bucket: str = None,
164        expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
165        validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value,
166        checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value,
167    ) -> None:
168        """Build Data Docs for the project.
169
170        This function does a full build of data docs based on all the great expectations
171        checkpoints in the specified location, getting all history of run/validations
172        executed and results.
173
174        Args:
175            store_backend: which store_backend to use (e.g. s3 or file_system).
176            local_fs_root_dir: path of the root directory. Note: only applicable
177                for store_backend file_system
178            data_docs_local_fs: path of the root directory. Note: only applicable
179                for store_backend file_system.
180            data_docs_prefix: prefix where to store data_docs' data.
181            bucket: the bucket name to consider for the store_backend
182                (store DQ artefacts). Note: only applicable for store_backend s3.
183            data_docs_bucket: the bucket name for data docs only. When defined,
184                it will supersede bucket parameter.
185                Note: only applicable for store_backend s3.
186            expectations_store_prefix: prefix where to store expectations' data.
187                Note: only applicable for store_backend s3.
188            validations_store_prefix: prefix where to store validations' data.
189                Note: only applicable for store_backend s3.
190            checkpoint_store_prefix: prefix where to store checkpoints' data.
191                Note: only applicable for store_backend s3.
192        """
193        if store_backend == DQDefaults.STORE_BACKEND.value:
194            dq_spec = DQSpec(
195                spec_id="dq_validator",
196                input_id="dq",
197                dq_type=DQType.VALIDATOR.value,
198                store_backend=DQDefaults.STORE_BACKEND.value,
199                data_docs_prefix=data_docs_prefix,
200                bucket=bucket,
201                data_docs_bucket=data_docs_bucket,
202                expectations_store_prefix=expectations_store_prefix,
203                validations_store_prefix=validations_store_prefix,
204                checkpoint_store_prefix=checkpoint_store_prefix,
205            )
206        elif store_backend == DQDefaults.FILE_SYSTEM_STORE.value:
207            dq_spec = DQSpec(
208                spec_id="dq_validator",
209                input_id="dq",
210                dq_type=DQType.VALIDATOR.value,
211                store_backend=DQDefaults.FILE_SYSTEM_STORE.value,
212                local_fs_root_dir=local_fs_root_dir,
213                data_docs_local_fs=data_docs_local_fs,
214                data_docs_prefix=data_docs_prefix,
215            )
216        context = get_context(project_config=cls._get_data_context_config(dq_spec))
217        cls._LOGGER.info("The data docs were rebuilt")
218        context.build_data_docs()
219
220    @classmethod
221    def _check_critical_functions_tags(cls, failed_expectations: List[Any]) -> list:
222        critical_failure = []
223
224        for expectation in failed_expectations:
225            meta = expectation["meta"]
226            if meta and (
227                ("notes" in meta.keys() and "Critical function" in meta["notes"])
228                or (
229                    "content" in meta["notes"].keys()
230                    and "Critical function" in meta["notes"]["content"]
231                )
232            ):
233                critical_failure.append(expectation["expectation_type"])
234
235        return critical_failure
236
237    @classmethod
238    def _configure_and_run_checkpoint(
239        cls,
240        dq_spec: DQSpec,
241        context: EphemeralDataContext,
242        batch_request: RuntimeBatchRequest,
243        expectation_suite_name: str,
244        source_pk: List[str],
245    ) -> Tuple[CheckpointResult, DataFrame]:
246        """Configure, run and return checkpoint results.
247
248        A checkpoint is what enables us to run the validations of the expectations'
249        suite on the batches of data.
250
251        Args:
252            dq_spec: data quality specification.
253            context: the EphemeralDataContext containing the configurations for the data
254                source and store backend.
255            batch_request: run time batch request to be able to query underlying data.
256            expectation_suite_name: name of the expectation suite.
257            source_pk: the primary key of the source data.
258
259        Returns:
260            The checkpoint results in two types: CheckpointResult and Dataframe.
261        """
262        checkpoint_name = f"{dq_spec.spec_id}-{dq_spec.input_id}-checkpoint"
263        context.add_or_update_checkpoint(
264            name=checkpoint_name,
265            class_name=DQDefaults.DATA_CHECKPOINTS_CLASS_NAME.value,
266            config_version=DQDefaults.DATA_CHECKPOINTS_CONFIG_VERSION.value,
267            run_name_template=f"%Y%m%d-%H%M%S-{checkpoint_name}",
268        )
269
270        result_format: Dict[str, Any] = {
271            "result_format": dq_spec.gx_result_format,
272        }
273        if source_pk:
274            result_format = {
275                **result_format,
276                "unexpected_index_column_names": source_pk,
277            }
278
279        results = context.run_checkpoint(
280            checkpoint_name=checkpoint_name,
281            validations=[
282                {
283                    "batch_request": batch_request,
284                    "expectation_suite_name": expectation_suite_name,
285                }
286            ],
287            result_format=result_format,
288        )
289
290        return results, cls._transform_checkpoint_results(
291            results.to_json_dict(), dq_spec
292        )
293
294    @classmethod
295    def _explode_results(
296        cls,
297        df: DataFrame,
298        dq_spec: DQSpec,
299    ) -> DataFrame:
300        """Transform dq results dataframe exploding a set of columns.
301
302        Args:
303            df: dataframe with dq results to be exploded.
304            dq_spec: data quality specification.
305        """
306        df = df.withColumn(
307            "validation_results", explode("run_results.validation_result.results")
308        ).withColumn("source", lit(dq_spec.source))
309
310        new_columns = [
311            "validation_results.expectation_config.kwargs.*",
312            "run_results.validation_result.statistics.*",
313            "validation_results.expectation_config.expectation_type",
314            "validation_results.success as expectation_success",
315            "validation_results.exception_info",
316        ] + dq_spec.result_sink_extra_columns
317
318        df_exploded = df.selectExpr(*df.columns, *new_columns).drop(
319            *[c.replace(".*", "").split(" as")[0] for c in new_columns]
320        )
321
322        schema = df_exploded.schema.simpleString()
323        if "unexpected_index_list" in schema:
324            df_exploded = (
325                df_exploded.withColumn(
326                    "unexpected_index_list",
327                    array(struct(lit(True).alias("run_success"))),
328                )
329                if df.select(
330                    col("validation_results.result.unexpected_index_list")
331                ).dtypes[0][1]
332                == "array<string>"
333                else df_exploded.withColumn(
334                    "unexpected_index_list",
335                    transform(
336                        col("validation_results.result.unexpected_index_list"),
337                        lambda x: x.withField("run_success", lit(False)),
338                    ),
339                )
340            )
341
342        if "observed_value" in schema:
343            df_exploded = df_exploded.withColumn(
344                "observed_value", col("validation_results.result.observed_value")
345            )
346
347        return (
348            df_exploded.withColumn("run_time_year", year(to_timestamp("run_time")))
349            .withColumn("run_time_month", month(to_timestamp("run_time")))
350            .withColumn("run_time_day", dayofmonth(to_timestamp("run_time")))
351            .withColumn("checkpoint_config", to_json(col("checkpoint_config")))
352            .withColumn("run_results", to_json(col("run_results")))
353            .withColumn(
354                "kwargs", to_json(col("validation_results.expectation_config.kwargs"))
355            )
356            .withColumn("validation_results", to_json(col("validation_results")))
357        )
358
359    @classmethod
360    def _get_batch_request(
361        cls, dq_spec: DQSpec, data: DataFrame
362    ) -> RuntimeBatchRequest:
363        """Get run time batch request to be able to query underlying data.
364
365        Args:
366            dq_spec: data quality process specification.
367            data: input dataframe to run the dq process on.
368
369        Returns:
370            The RuntimeBatchRequest object configuration.
371        """
372        return RuntimeBatchRequest(
373            datasource_name=f"{dq_spec.spec_id}-{dq_spec.input_id}-datasource",
374            data_connector_name=f"{dq_spec.spec_id}-{dq_spec.input_id}-data_connector",
375            data_asset_name=(
376                dq_spec.data_asset_name
377                if dq_spec.data_asset_name
378                else f"{dq_spec.spec_id}-{dq_spec.input_id}"
379            ),
380            batch_identifiers={
381                "spec_id": dq_spec.spec_id,
382                "input_id": dq_spec.input_id,
383                "timestamp": cls._TIMESTAMP,
384            },
385            runtime_parameters={"batch_data": data},
386        )
387
388    @classmethod
389    def _get_data_context_config(cls, dq_spec: DQSpec) -> DataContextConfig:
390        """Get the configuration of the data context.
391
392        Based on the configuration it is possible to define the backend to be
393        the file system (e.g. local file system) or S3, meaning that the DQ artefacts
394        will be stored according to this configuration.
395
396        Args:
397            dq_spec: data quality process specification.
398
399        Returns:
400            The DataContextConfig object configuration.
401        """
402        store_backend: Union[FilesystemStoreBackendDefaults, S3StoreBackendDefaults]
403        data_docs_site = None
404
405        if dq_spec.store_backend == DQDefaults.FILE_SYSTEM_STORE.value:
406            store_backend = FilesystemStoreBackendDefaults(
407                root_directory=dq_spec.local_fs_root_dir
408            )
409            data_docs_site = cls._get_data_docs_sites(
410                "local_site", store_backend.data_docs_sites, dq_spec
411            )
412        elif dq_spec.store_backend == DQDefaults.FILE_SYSTEM_S3_STORE.value:
413            store_backend = S3StoreBackendDefaults(
414                default_bucket_name=dq_spec.bucket,
415                validations_store_prefix=dq_spec.validations_store_prefix,
416                checkpoint_store_prefix=dq_spec.checkpoint_store_prefix,
417                expectations_store_prefix=dq_spec.expectations_store_prefix,
418                data_docs_prefix=dq_spec.data_docs_prefix,
419                data_docs_bucket_name=(
420                    dq_spec.data_docs_bucket
421                    if dq_spec.data_docs_bucket
422                    else dq_spec.bucket
423                ),
424            )
425            data_docs_site = cls._get_data_docs_sites(
426                "s3_site", store_backend.data_docs_sites, dq_spec
427            )
428
429        return DataContextConfig(
430            store_backend_defaults=store_backend,
431            data_docs_sites=data_docs_site,
432            anonymous_usage_statistics=AnonymizedUsageStatisticsConfig(enabled=False),
433        )
434
435    @classmethod
436    def _get_data_docs_sites(
437        cls, site_name: str, data_docs_site: dict, dq_spec: DQSpec
438    ) -> dict:
439        """Get the custom configuration of the data_docs_sites.
440
441        Args:
442            site_name: the name to give to the site.
443            data_docs_site: the default configuration for the data_docs_site.
444            dq_spec: data quality specification.
445
446        Returns:
447            Modified data_docs_site.
448        """
449        data_docs_site[site_name]["show_how_to_buttons"] = False
450
451        if site_name == "local_site":
452            data_docs_site[site_name]["store_backend"][
453                "base_directory"
454            ] = dq_spec.data_docs_prefix
455
456            if dq_spec.data_docs_local_fs:
457                # Enable to write data_docs in a separated path
458                data_docs_site[site_name]["store_backend"][
459                    "root_directory"
460                ] = dq_spec.data_docs_local_fs
461
462        return data_docs_site
463
464    @classmethod
465    def _get_data_source_defaults(cls, dq_spec: DQSpec) -> dict:
466        """Get the configuration for a datasource.
467
468        Args:
469            dq_spec: data quality specification.
470
471        Returns:
472            The python dictionary with the datasource configuration.
473        """
474        return {
475            "name": f"{dq_spec.spec_id}-{dq_spec.input_id}-datasource",
476            "class_name": DQDefaults.DATASOURCE_CLASS_NAME.value,
477            "execution_engine": {
478                "class_name": DQDefaults.DATASOURCE_EXECUTION_ENGINE.value,
479                "persist": False,
480            },
481            "data_connectors": {
482                f"{dq_spec.spec_id}-{dq_spec.input_id}-data_connector": {
483                    "module_name": DQDefaults.DATA_CONNECTORS_MODULE_NAME.value,
484                    "class_name": DQDefaults.DATA_CONNECTORS_CLASS_NAME.value,
485                    "assets": {
486                        (
487                            dq_spec.data_asset_name
488                            if dq_spec.data_asset_name
489                            else f"{dq_spec.spec_id}-{dq_spec.input_id}"
490                        ): {"batch_identifiers": DQDefaults.DQ_BATCH_IDENTIFIERS.value}
491                    },
492                }
493            },
494        }
495
496    @classmethod
497    def _get_failed_expectations(
498        cls, results: CheckpointResult, dq_spec: DQSpec
499    ) -> List[Any]:
500        """Get the failed expectations of a Checkpoint result.
501
502        Args:
503            results: the results of the DQ process.
504            dq_spec: data quality specification.
505
506        Returns: a list of failed expectations.
507        """
508        failed_expectations = []
509        for validation_result in results.list_validation_results():
510            expectations_results = validation_result["results"]
511            for result in expectations_results:
512                if not result["success"]:
513                    failed_expectations.append(result["expectation_config"])
514                    if result["exception_info"]["raised_exception"]:
515                        cls._LOGGER.error(
516                            f"""The expectation {str(result["expectation_config"])}
517                            raised the following exception:
518                            {result["exception_info"]["exception_message"]}"""
519                        )
520            cls._LOGGER.error(
521                f"{len(failed_expectations)} out of {len(expectations_results)} "
522                f"Data Quality Expectation(s) have failed! Failed Expectations: "
523                f"{failed_expectations}"
524            )
525
526            percentage_failure = 1 - (
527                validation_result["statistics"]["success_percent"] / 100
528            )
529
530            if (
531                dq_spec.max_percentage_failure is not None
532                and dq_spec.max_percentage_failure < percentage_failure
533            ):
534                raise DQValidationsFailedException(
535                    f"Max error threshold is being surpassed! "
536                    f"Expected: {dq_spec.max_percentage_failure} "
537                    f"Got: {percentage_failure}"
538                )
539
540        return failed_expectations
541
542    @classmethod
543    def _get_unexpected_rows_pk(cls, dq_spec: DQSpec) -> Optional[List[str]]:
544        """Get primary key for using on rows failing DQ validations.
545
546        Args:
547            dq_spec: data quality specification.
548
549        Returns: the list of columns that are part of the primary key.
550        """
551        if dq_spec.unexpected_rows_pk:
552            return dq_spec.unexpected_rows_pk
553        elif dq_spec.tbl_to_derive_pk:
554            return TableManager(
555                {"function": "get_tbl_pk", "table_or_view": dq_spec.tbl_to_derive_pk}
556            ).get_tbl_pk()
557        elif dq_spec.tag_source_data:
558            raise ValueError(
559                "You need to provide either the argument "
560                "'unexpected_rows_pk' or 'tbl_to_derive_pk'."
561            )
562        else:
563            return None
564
565    @classmethod
566    def _log_or_fail(cls, results: CheckpointResult, dq_spec: DQSpec) -> None:
567        """Log the execution of the Data Quality process.
568
569        Args:
570            results: the results of the DQ process.
571            dq_spec: data quality specification.
572        """
573        if results["success"]:
574            cls._LOGGER.info(
575                "The data passed all the expectations defined. Everything looks good!"
576            )
577        else:
578            failed_expectations = cls._get_failed_expectations(results, dq_spec)
579            if dq_spec.critical_functions:
580                critical_failure = cls._check_critical_functions_tags(
581                    failed_expectations
582                )
583
584                if critical_failure:
585                    raise DQValidationsFailedException(
586                        f"Data Quality Validations Failed, the following critical "
587                        f"expectations failed: {critical_failure}."
588                    )
589            elif dq_spec.fail_on_error:
590                raise DQValidationsFailedException("Data Quality Validations Failed!")
591
592    @classmethod
593    def _transform_checkpoint_results(
594        cls, checkpoint_results: dict, dq_spec: DQSpec
595    ) -> DataFrame:
596        """Transforms the checkpoint results and creates new entries.
597
598        All the items of the dictionary are cast to a json like format.
599        The validation_result_identifier is extracted from the run_results column
600        into a separated column. All columns are cast to json like format.
601        After that the dictionary is converted into a dataframe.
602
603        Args:
604            checkpoint_results: dict with results of the checkpoint run.
605            dq_spec: data quality specification.
606
607        Returns:
608            Transformed results dataframe.
609        """
610        results_json_dict = loads(dumps(checkpoint_results))
611
612        results_dict = {}
613        for key, value in results_json_dict.items():
614            if key == "run_results":
615                checkpoint_result_identifier = list(value.keys())[0]
616                # check if the grabbed identifier is correct
617                if (
618                    str(checkpoint_result_identifier)
619                    .lower()
620                    .startswith(DQDefaults.VALIDATION_COLUMN_IDENTIFIER.value)
621                ):
622                    results_dict["validation_result_identifier"] = (
623                        checkpoint_result_identifier
624                    )
625                    results_dict["run_results"] = value[checkpoint_result_identifier]
626                else:
627                    raise DQCheckpointsResultsException(
628                        "The checkpoint result identifier format is not "
629                        "in accordance to what is expected"
630                    )
631            else:
632                results_dict[key] = value
633
634        df = ExecEnv.SESSION.createDataFrame(
635            [json.dumps(results_dict)],
636            schema=StringType(),
637        )
638        schema = schema_of_json(df.select("value").head()[0])
639        df = df.withColumn("value", from_json("value", schema)).select("value.*")
640
641        cols_to_expand = ["run_id"]
642        df = (
643            df.select(
644                [
645                    col(c) if c not in cols_to_expand else col(f"{c}.*")
646                    for c in df.columns
647                ]
648            )
649            .drop(*cols_to_expand)
650            .withColumn("spec_id", lit(dq_spec.spec_id))
651            .withColumn("input_id", lit(dq_spec.input_id))
652        )
653
654        return (
655            cls._explode_results(df, dq_spec)
656            if dq_spec.result_sink_explode
657            else df.withColumn(
658                "checkpoint_config", to_json(col("checkpoint_config"))
659            ).withColumn("run_results", to_json(col("run_results")))
660        )
661
662    @classmethod
663    def _write_to_result_sink(
664        cls,
665        dq_spec: DQSpec,
666        df: DataFrame,
667        data: OrderedDict = None,
668    ) -> None:
669        """Write dq results dataframe to a table or location.
670
671        It can be written:
672        - a raw output (having result_sink_explode set as False)
673        - an exploded output (having result_sink_explode set as True), which
674        is more prepared for analysis, with some columns exploded, flatten and
675        transformed. It can also be set result_sink_extra_columns with other
676        columns desired to have in the output table or location.
677
678        Args:
679            dq_spec: data quality specification.
680            df: dataframe with dq results to write.
681            data: list of all dfs generated on previous steps before writer.
682        """
683        if dq_spec.result_sink_db_table or dq_spec.result_sink_location:
684            options = {"mergeSchema": "true"} if dq_spec.result_sink_explode else {}
685
686            WriterFactory.get_writer(
687                spec=OutputSpec(
688                    spec_id="dq_result_sink",
689                    input_id="dq_result",
690                    db_table=dq_spec.result_sink_db_table,
691                    location=dq_spec.result_sink_location,
692                    partitions=(
693                        dq_spec.result_sink_partitions
694                        if dq_spec.result_sink_partitions
695                        else []
696                    ),
697                    write_type=WriteType.APPEND.value,
698                    data_format=dq_spec.result_sink_format,
699                    options=(
700                        options
701                        if dq_spec.result_sink_options is None
702                        else {**dq_spec.result_sink_options, **options}
703                    ),
704                ),
705                df=df,
706                data=data,
707            ).write()
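For illustration, note that the option handling in _write_to_result_sink above merges the engine's own options on top of any user-provided result_sink_options, so the engine wins on a key clash. A minimal sketch of that resolution, with hypothetical user options:

    # Sketch only: mirrors the option merging performed in _write_to_result_sink.
    result_sink_explode = True
    user_options = {"mergeSchema": "false", "replaceWhere": "year = 2024"}  # hypothetical result_sink_options

    options = {"mergeSchema": "true"} if result_sink_explode else {}
    final_options = {**user_options, **options}
    # final_options == {"mergeSchema": "true", "replaceWhere": "year = 2024"}
    # i.e. the engine-enforced mergeSchema takes precedence over the user's value.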

Class for the Data Quality Factory.

@classmethod
def run_dq_process( cls, dq_spec: lakehouse_engine.core.definitions.DQSpec, data: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
 66    @classmethod
 67    def run_dq_process(cls, dq_spec: DQSpec, data: DataFrame) -> DataFrame:
 68        """Run the specified data quality process on a dataframe.
 69
 70        Based on the dq_specs we apply the defined expectations on top of the dataframe
 71        in order to apply the necessary validations and then output the result of
 72        the data quality process.
 73
 74        Args:
 75            dq_spec: data quality specification.
 76            data: input dataframe to run the dq process on.
 77
 78        Returns:
 79            The DataFrame containing the results of the DQ process.
 80        """
 81        # import custom expectations for them to be available to be used.
 82        for expectation in DQDefaults.CUSTOM_EXPECTATION_LIST.value:
 83            importlib.__import__(
 84                "lakehouse_engine.dq_processors.custom_expectations." + expectation
 85            )
 86
 87        context = get_context(project_config=cls._get_data_context_config(dq_spec))
 88        context.add_datasource(**cls._get_data_source_defaults(dq_spec))
 89
 90        expectation_suite_name = (
 91            dq_spec.expectation_suite_name
 92            if dq_spec.expectation_suite_name
 93            else f"{dq_spec.spec_id}-{dq_spec.input_id}-{dq_spec.dq_type}"
 94        )
 95        context.add_or_update_expectation_suite(
 96            expectation_suite_name=expectation_suite_name
 97        )
 98
 99        batch_request = cls._get_batch_request(dq_spec, data)
100
101        if (
102            dq_spec.dq_type == DQType.VALIDATOR.value
103            or dq_spec.dq_type == DQType.PRISMA.value
104        ):
105            Validator.get_dq_validator(
106                context,
107                batch_request,
108                expectation_suite_name,
109                dq_spec.dq_functions,
110                dq_spec.critical_functions,
111            )
112
113            source_pk = cls._get_unexpected_rows_pk(dq_spec)
114            results, results_df = cls._configure_and_run_checkpoint(
115                dq_spec, context, batch_request, expectation_suite_name, source_pk
116            )
117
118            if dq_spec.dq_type == DQType.PRISMA.value:
119
120                results_df = results_df.withColumn("source_primary_key", lit(source_pk))
121
122                processed_keys_df = data.select(
123                    concat_ws(
124                        ", ", *[coalesce(col(c), lit("null")) for c in source_pk]
125                    ).alias("combined_pk")
126                )
127                comb_pk_expr = (
128                    sort_array(collect_list("combined_pk"))
129                    if dq_spec.sort_processed_keys
130                    else collect_list("combined_pk")
131                )
132                processed_keys_df = processed_keys_df.agg(
133                    concat_ws("||", comb_pk_expr).alias("processed_keys")
134                )
135
136                results_df = results_df.join(processed_keys_df, lit(1) == lit(1))
137
138            cls._write_to_result_sink(dq_spec, results_df)
139
140            cls._log_or_fail(results, dq_spec)
141
142            if (
143                dq_spec.tag_source_data
144                and dq_spec.result_sink_explode
145                and dq_spec.fail_on_error is not True
146            ):
147                data = Validator.tag_source_with_dq(source_pk, data, results_df)
148        else:
149            raise TypeError(
150                f"Type of Data Quality '{dq_spec.dq_type}' is not supported."
151            )
152
153        return data

Run the specified data quality process on a dataframe.

Based on the dq_specs we apply the defined expectations on top of the dataframe in order to apply the necessary validations and then output the result of the data quality process.

Arguments:
  • dq_spec: data quality specification.
  • data: input dataframe to run the dq process on.
Returns:

The DataFrame containing the results of the DQ process.
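
A minimal usage sketch follows; the ids, expectation, column and sink table are illustrative, and DQFunctionSpec is assumed to be the expectation wrapper exposed by lakehouse_engine.core.definitions:

    from lakehouse_engine.core.definitions import DQFunctionSpec, DQSpec, DQType
    from lakehouse_engine.dq_processors.dq_factory import DQFactory

    dq_spec = DQSpec(
        spec_id="orders_dq_validator",                # illustrative ids
        input_id="orders_bronze",
        dq_type=DQType.VALIDATOR.value,
        dq_functions=[
            # assumed helper and arguments, for illustration only
            DQFunctionSpec(
                function="expect_column_values_to_not_be_null",
                args={"column": "order_id"},
            ),
        ],
        result_sink_db_table="dq_db.dq_result_sink",  # hypothetical sink table
        result_sink_explode=True,
        fail_on_error=False,                          # log validation failures instead of raising
        tag_source_data=True,                         # tag the returned dataframe with the DQ results
    )

    validated_df = DQFactory.run_dq_process(dq_spec, input_df)  # input_df: the dataframe to validate

Note that tagging the source data also requires a primary key to be resolvable for the unexpected rows (see _get_unexpected_rows_pk in the source above), so additional DQSpec fields may be needed depending on your configuration.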

@classmethod
def build_data_docs( cls, store_backend: str = 's3', local_fs_root_dir: str = None, data_docs_local_fs: str = None, data_docs_prefix: str = 'dq/data_docs/site/', bucket: str = None, data_docs_bucket: str = None, expectations_store_prefix: str = 'dq/expectations/', validations_store_prefix: str = 'dq/validations/', checkpoint_store_prefix: str = 'dq/checkpoints/') -> None:
155    @classmethod
156    def build_data_docs(
157        cls,
158        store_backend: str = DQDefaults.STORE_BACKEND.value,
159        local_fs_root_dir: str = None,
160        data_docs_local_fs: str = None,
161        data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value,
162        bucket: str = None,
163        data_docs_bucket: str = None,
164        expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
165        validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value,
166        checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value,
167    ) -> None:
168        """Build Data Docs for the project.
169
170        This function does a full build of data docs based on all the great expectations
171        checkpoints in the specified location, collecting the full history of
172        runs/validations executed and their results.
173
174        Args:
175            store_backend: which store_backend to use (e.g. s3 or file_system).
176            local_fs_root_dir: path of the root directory. Note: only applicable
177                for store_backend file_system.
178            data_docs_local_fs: path where to store data docs only. Note: only
179                applicable for store_backend file_system.
180            data_docs_prefix: prefix where to store data_docs' data.
181            bucket: the bucket name to consider for the store_backend (where
182                DQ artefacts are stored). Note: only applicable for store_backend s3.
183            data_docs_bucket: the bucket name for data docs only. When defined,
184                it will supersede the bucket parameter.
185                Note: only applicable for store_backend s3.
186            expectations_store_prefix: prefix where to store expectations' data.
187                Note: only applicable for store_backend s3.
188            validations_store_prefix: prefix where to store validations' data.
189                Note: only applicable for store_backend s3.
190            checkpoint_store_prefix: prefix where to store checkpoints' data.
191                Note: only applicable for store_backend s3.
192        """
193        if store_backend == DQDefaults.STORE_BACKEND.value:
194            dq_spec = DQSpec(
195                spec_id="dq_validator",
196                input_id="dq",
197                dq_type=DQType.VALIDATOR.value,
198                store_backend=DQDefaults.STORE_BACKEND.value,
199                data_docs_prefix=data_docs_prefix,
200                bucket=bucket,
201                data_docs_bucket=data_docs_bucket,
202                expectations_store_prefix=expectations_store_prefix,
203                validations_store_prefix=validations_store_prefix,
204                checkpoint_store_prefix=checkpoint_store_prefix,
205            )
206        elif store_backend == DQDefaults.FILE_SYSTEM_STORE.value:
207            dq_spec = DQSpec(
208                spec_id="dq_validator",
209                input_id="dq",
210                dq_type=DQType.VALIDATOR.value,
211                store_backend=DQDefaults.FILE_SYSTEM_STORE.value,
212                local_fs_root_dir=local_fs_root_dir,
213                data_docs_local_fs=data_docs_local_fs,
214                data_docs_prefix=data_docs_prefix,
215            )
216        context = get_context(project_config=cls._get_data_context_config(dq_spec))
217        context.build_data_docs()
218        cls._LOGGER.info("The data docs were rebuilt")

Build Data Docs for the project.

This function does a full build of data docs based on all the great expectations checkpoints in the specified location, collecting the full history of runs/validations executed and their results.

Arguments:
  • store_backend: which store_backend to use (e.g. s3 or file_system).
  • local_fs_root_dir: path of the root directory. Note: only applicable for store_backend file_system.
  • data_docs_local_fs: path where to store data docs only. Note: only applicable for store_backend file_system.
  • data_docs_prefix: prefix where to store data_docs' data.
  • bucket: the bucket name to consider for the store_backend (where DQ artefacts are stored). Note: only applicable for store_backend s3.
  • data_docs_bucket: the bucket name for data docs only. When defined, it will supersede the bucket parameter. Note: only applicable for store_backend s3.
  • expectations_store_prefix: prefix where to store expectations' data. Note: only applicable for store_backend s3.
  • validations_store_prefix: prefix where to store validations' data. Note: only applicable for store_backend s3.
  • checkpoint_store_prefix: prefix where to store checkpoints' data. Note: only applicable for store_backend s3.
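
For example, data docs can be rebuilt for S3-backed stores using the defaults, or for file-system-backed stores by pointing at the local directories (bucket name and paths below are illustrative):

    from lakehouse_engine.core.definitions import DQDefaults
    from lakehouse_engine.dq_processors.dq_factory import DQFactory

    # S3-backed stores (the default store_backend); bucket name is illustrative.
    DQFactory.build_data_docs(bucket="my-dq-artefacts-bucket")

    # File-system-backed stores; paths are illustrative.
    DQFactory.build_data_docs(
        store_backend=DQDefaults.FILE_SYSTEM_STORE.value,
        local_fs_root_dir="/dbfs/dq/",
        data_docs_local_fs="/dbfs/dq/data_docs/",
    )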