Result Sink

These scenarios store the results of the dq_specs into a result sink. For that, both scenarios include parameters defining the specific table and location (result_sink_db_table and result_sink_location) where the results are expected to be stored. With this configuration, you can later check the history of the DQ executions using the configured table/location, as shown below. You can configure saving the output of the results in the result sink following two approaches:

  • Exploded Data Model (recommended) - the results are exploded into dedicated columns, so they can be analysed by Data Quality Run, by expectation_type and by keyword arguments, e.g.:

| ... | source | column | max_value | min_value | expectation_type | expectation_success | observed_value | run_time_year | ... |
|-----|--------|--------|-----------|-----------|------------------|---------------------|----------------|---------------|-----|
| all columns from raw + more | deliveries | salesorder | null | null | expect_column_to_exist | TRUE | null | 2023 | ... |
| all columns from raw + more | deliveries | null | null | null | expect_table_row_count_to_be_between | TRUE | 23 | 2023 | ... |
| all columns from raw + more | deliveries | null | null | null | expect_table_column_count_to_be_between | TRUE | 6 | 2023 | ... |
  • Raw Format Data Model (not recommended) - the results are stored in the raw format that Great Expectations outputs. This is not recommended, as the data will be highly nested and in a string format (to prevent problems with schema changes), which makes analysis and the creation of a dashboard on top of it much harder, e.g.:
| checkpoint_config | run_name | run_time | run_results | success | validation_result_identifier | spec_id | input_id |
|-------------------|----------|----------|-------------|---------|------------------------------|---------|----------|
| entire configuration | 20230323-...-dq_validation | 2023-03-23T15:11:32.225354+00:00 | results of the 3 expectations | true/false for the run | identifier | spec_id | input_id |
  • More configurations can be applied to the result sink, such as the file format and partitions (illustrated in the sketch after this list).
  • It is recommended to:
    • Use the same result sink table/location for all dq_specs across different data loads, from different sources, in the same Data Product.
    • Use the parameter source (only available with "result_sink_explode": True) in the dq_specs, as used in both scenarios, with the name of the data source, to make it easier to distinguish sources in the analysis. If not specified, the input_id of the dq_spec will be used as the source.
  • Following these recommendations enables richer analysis/dashboards at the Data Product level, covering all the different sources and data loads that the Data Product has.
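
Below is a minimal, illustrative sketch of how two dq_specs from different data loads could share the same result sink while staying distinguishable via the source parameter. The second spec ("orders_source") is hypothetical, and the result_sink_format and result_sink_partitions parameter names are assumptions used only to illustrate configuring the file format and partitions; check the DQSpec options of your lakehouse-engine version for the exact names.

# Illustrative dq_spec fragments only; the second spec and some parameter names are assumptions.
shared_result_sink = {
    "result_sink_db_table": "my_database.dq_result_sink",
    "result_sink_location": "my_dq_path/dq_result_sink/",
    "result_sink_explode": True,
    "result_sink_format": "delta",  # assumed parameter name for the sink file format
    "result_sink_partitions": ["run_time_year"],  # assumed parameter name for partition columns
}

dq_specs = [
    {
        "spec_id": "dq_validator_deliveries",
        "input_id": "dummy_deliveries_source",
        "dq_type": "validator",
        "source": "deliveries",  # distinguishes this data source in the result sink
        **shared_result_sink,
        "dq_functions": [
            {"function": "expect_column_to_exist", "args": {"column": "salesorder"}},
        ],
    },
    {
        "spec_id": "dq_validator_orders",
        "input_id": "orders_source",  # hypothetical second input spec
        "dq_type": "validator",
        "source": "orders",  # distinguishes this data source in the result sink
        **shared_result_sink,
        "dq_functions": [
            {"function": "expect_table_row_count_to_be_between", "args": {"min_value": 1, "max_value": 100}},
        ],
    },
]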

1. Exploded Result Sink

This scenario stores the DQ Results (the results produced by the execution of the dq_specs) in the Result Sink in a detailed format, in which people are able to analyse them by Data Quality Run, by expectation_type and by keyword arguments. This is the recommended approach, since it makes the analysis on top of the result sink much easier and faster.

To achieve the exploded data model, this scenario introduces the parameter result_sink_explode, which is a flag determining whether the output table/location should have the columns exploded (True) or not (False). It defaults to True, but is provided explicitly in this scenario for demo purposes. The table/location will include a schema containing general columns, statistic columns, arguments of expectations, and others; thus, part of the schema will always be populated, while another part will depend on the expectations chosen.

from lakehouse_engine.engine import load_data

acon = {
    "input_specs": [
        {
            "spec_id": "dummy_deliveries_source",
            "read_type": "batch",
            "data_format": "csv",
            "options": {
                "header": True,
                "delimiter": "|",
                "inferSchema": True,
            },
            "location": "s3://my_data_product_bucket/dummy_deliveries/",
        }
    ],
    "dq_specs": [
        {
            "spec_id": "dq_validator",
            "input_id": "dummy_deliveries_source",
            "dq_type": "validator",
            "bucket": "my_data_product_bucket",
            "data_docs_bucket": "my_dq_data_docs_bucket",
            "data_docs_prefix": "dq/my_data_product/data_docs/site/",
            "result_sink_db_table": "my_database.dq_result_sink",
            "result_sink_location": "my_dq_path/dq_result_sink/",
            "result_sink_explode": True,
            "tbl_to_derive_pk": "my_database.dummy_deliveries",
            "source": "deliveries_success",
            "dq_functions": [
                {"function": "expect_column_to_exist", "args": {"column": "salesorder"}},
                {"function": "expect_table_row_count_to_be_between", "args": {"min_value": 15, "max_value": 25}},
                {"function": "expect_table_column_count_to_be_between", "args": {"max_value": 7}},
            ],
        }
    ],
    "output_specs": [
        {
            "spec_id": "dummy_deliveries_bronze",
            "input_id": "dq_validator",
            "write_type": "overwrite",
            "data_format": "delta",
            "location": "s3://my_data_product_bucket/bronze/dummy_deliveries_dq_template/",
        }
    ],
}

load_data(acon=acon)

To check the history of the DQ results, you can run commands like:

  • the table: display(spark.table("my_database.dq_result_sink"))
  • the location: display(spark.read.format("delta").load("my_dq_path/dq_result_sink/"))
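
As a rough sketch of the kind of analysis the exploded model enables, the query below aggregates expectation outcomes per source and expectation_type, using only columns shown in the exploded example table above; it assumes expectation_success is stored as a boolean and that spark and display are available, as in the commands above.

from pyspark.sql import functions as F

# Summarise DQ outcomes per data source and expectation type
# (column names taken from the exploded result sink example above).
result_sink = spark.table("my_database.dq_result_sink")

summary = (
    result_sink.groupBy("source", "expectation_type")
    .agg(
        F.count("*").alias("executions"),
        # assumes expectation_success is a boolean column
        F.sum(F.when(F.col("expectation_success"), 1).otherwise(0)).alias("successes"),
    )
)

display(summary)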

2. Raw Result Sink

This scenario is very similar to the previous one, but it changes the parameter result_sink_explode to False, so that it produces a raw result sink output containing only one row representing the full run of the dq_specs (regardless of the number of expectations/dq_functions defined). Being a raw output, it is not the recommended approach, as it is more complicated to analyse and query.

from lakehouse_engine.engine import load_data

acon = {
    "input_specs": [
        {
            "spec_id": "dummy_deliveries_source",
            "read_type": "batch",
            "data_format": "csv",
            "options": {
                "header": True,
                "delimiter": "|",
                "inferSchema": True,
            },
            "location": "s3://my_data_product_bucket/dummy_deliveries/",
        }
    ],
    "dq_specs": [
        {
            "spec_id": "dq_validator",
            "input_id": "dummy_deliveries_source",
            "dq_type": "validator",
            "bucket": "my_data_product_bucket",
            "data_docs_bucket": "my_dq_data_docs_bucket",
            "data_docs_prefix": "dq/my_data_product/data_docs/site/",
            "result_sink_db_table": "my_database.dq_result_sink_raw",
            "result_sink_location": "my_dq_path/dq_result_sink_raw/",
            "result_sink_explode": False,
            "tbl_to_derive_pk": "{{ configs.database }}.dummy_deliveries",
            "source": "deliveries_success_raw",
            "dq_functions": [
                {"function": "expect_column_to_exist", "args": {"column": "salesorder"}},
                {"function": "expect_table_row_count_to_be_between", "args": {"min_value": 15, "max_value": 25}},
                {"function": "expect_table_column_count_to_be_between", "args": {"max_value": 7}},
            ],
        }
    ],
    "output_specs": [
        {
            "spec_id": "dummy_deliveries_bronze",
            "input_id": "dq_validator",
            "write_type": "overwrite",
            "data_format": "delta",
            "location": "s3://my_data_product_bucket/bronze/dummy_deliveries_dq_template/",
        }
    ],
}

load_data(acon=acon)

To check the history of the DQ results, you can run commands like:

  • the table: display(spark.table("my_database.dq_result_sink_raw"))
  • the location: display(spark.read.format("delta").load("my_dq_path/dq_result_sink_raw/"))
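
To see why the raw data model is harder to analyse, the sketch below reads the raw result sink and inspects the nested run_results payload, which is kept as a string and would still need JSON parsing (with a schema you define) before any per-expectation analysis; column names are taken from the raw example table above.

# Read the raw result sink; each row represents a full dq_specs run.
raw_sink = spark.read.format("delta").load("my_dq_path/dq_result_sink_raw/")

# run_results is a single nested string per run, so per-expectation analysis
# would still require parsing it before building any dashboard on top.
display(raw_sink.select("spec_id", "input_id", "run_time", "success", "run_results"))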