Reconciliator

Checking whether data reconciles with this algorithm comes down to reading the truth data and the current data and comparing them. You can use any input specification compatible with the lakehouse engine to read either dataset. You can also pass a truth_preprocess_query and a current_preprocess_query to preprocess the data before it enters the reconciliation itself. The reconciliation joins truth with current on all provided columns except the ones passed as metrics.
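To make the join rule concrete, here is a minimal plain-Python sketch of pairing truth and current rows on every column that is not a metric. It does not use the lakehouse engine: the function name join_on_keys, the truth_/current_ prefixes on the output metric columns, and the choice of a full outer join are assumptions made for illustration, and the engine's actual join type may differ.

```python
def join_on_keys(truth_rows, current_rows, metrics):
    """Pair truth and current rows on all columns that are not metrics.

    Hypothetical helper for illustration only, not part of the lakehouse engine.
    Rows are plain dicts; both inputs are assumed to share the same columns.
    """
    # Every column that is not a metric becomes part of the join key.
    key_cols = [c for c in truth_rows[0] if c not in metrics]

    def key(row):
        return tuple(row[c] for c in key_cols)

    truth_by_key = {key(r): r for r in truth_rows}
    current_by_key = {key(r): r for r in current_rows}

    # Full outer join (an assumption): keep keys from either side,
    # truth keys first, then keys that only exist in current.
    all_keys = list(truth_by_key) + [k for k in current_by_key if k not in truth_by_key]

    joined = []
    for k in all_keys:
        row = dict(zip(key_cols, k))
        for m in metrics:
            truth_row = truth_by_key.get(k)
            current_row = current_by_key.get(k)
            row[f"truth_{m}"] = truth_row[m] if truth_row else None
            row[f"current_{m}"] = current_row[m] if current_row else None
        joined.append(row)
    return joined


truth = [{"country": "Sweden", "count": 120}, {"country": "France", "count": 2901}]
current = [{"country": "Sweden", "count": 123}, {"country": "Germany", "count": 2946}]
joined = join_on_keys(truth, current, metrics=["count"])
```

In this sketch, country is the only non-metric column, so it alone forms the join key. A key missing on one side shows up with None for that side's metric.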

The table below shows what a simple reconciliation looks like:

current_country  current_count  truth_country  truth_count  absolute_diff  perc_diff  yellow  red  recon_type
Sweden           123            Sweden         120          3              0.025      0.1     0.2  percentage
Germany          2946           Germany        2946         0              0          0.1     0.2  percentage
France           2901           France         2901         0              0          0.1     0.2  percentage
Belgium          426            Belgium        425          1              0.002      0.1     0.2  percentage
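The absolute_diff and perc_diff columns above can be reproduced with a short calculation. The sketch below assumes that perc_diff is the absolute difference divided by the truth value, which matches the rows in the table (for example 3 / 120 = 0.025), and that a value strictly above a threshold breaches it. The function name reconcile_metric and the "green" label are hypothetical; the engine's exact comparison rules may differ.

```python
def reconcile_metric(current, truth, yellow, red):
    """Compare one metric value against its truth value.

    Hypothetical helper for illustration only, not part of the lakehouse engine.
    Returns (absolute_diff, perc_diff rounded to 3 decimals, status).
    """
    absolute_diff = abs(current - truth)

    # Assumption: the percentage is relative to the truth value.
    if truth != 0:
        perc_diff = absolute_diff / abs(truth)
    else:
        # Avoid dividing by zero: identical zeros reconcile, anything else does not.
        perc_diff = 0.0 if absolute_diff == 0 else float("inf")

    # Assumption: strictly greater than a threshold counts as a breach.
    if perc_diff > red:
        status = "red"
    elif perc_diff > yellow:
        status = "yellow"
    else:
        status = "green"
    return absolute_diff, round(perc_diff, 3), status


sweden = reconcile_metric(current=123, truth=120, yellow=0.1, red=0.2)
belgium = reconcile_metric(current=426, truth=425, yellow=0.1, red=0.2)
```

With the thresholds from the table (yellow 0.1, red 0.2), both Sweden and Belgium stay well below the yellow threshold.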

The Reconciliator algorithm uses an ACON to configure its execution. You can find the meaning of each ACON property in the ReconciliatorSpec object.

Below is an example of how to use the Reconciliator.

from lakehouse_engine.engine import execute_reconciliation

truth_query = """
  SELECT
    shipping_city,
    sum(sales_order_qty) as qty,
    order_date_header
  FROM (
    SELECT
      ROW_NUMBER() OVER (
        PARTITION BY sales_order_header, sales_order_schedule, sales_order_item, shipping_city
        ORDER BY changed_on desc
      ) as rank1,
      sales_order_header,
      sales_order_item,
      sales_order_qty,
      order_date_header,
      shipping_city
    FROM truth -- truth is a locally accessible temp view created by the lakehouse engine
    WHERE order_date_header = '2021-10-01'
  ) a
  WHERE a.rank1 = 1
  GROUP BY a.shipping_city, a.order_date_header
"""

current_query = """
  SELECT
    shipping_city,
    sum(sales_order_qty) as qty,
    order_date_header
  FROM (
    SELECT
      ROW_NUMBER() OVER (
        PARTITION BY sales_order_header, sales_order_schedule, sales_order_item, shipping_city
        ORDER BY changed_on desc
      ) as rank1,
      sales_order_header,
      sales_order_item,
      sales_order_qty,
      order_date_header,
      shipping_city
    FROM current -- current is a locally accessible temp view created by the lakehouse engine
    WHERE order_date_header = '2021-10-01'
  ) a
  WHERE a.rank1 = 1
  GROUP BY a.shipping_city, a.order_date_header
"""

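acon = {
    # Columns listed as metrics are compared between truth and current;
    # all other columns returned by the preprocess queries are used to join the two.
    "metrics": [{"metric": "qty", "type": "percentage", "aggregation": "avg", "yellow": 0.05, "red": 0.1}],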
    "truth_input_spec": {
        "spec_id": "truth",
        "read_type": "batch",
        "data_format": "csv",
        "schema_path": "s3://my_data_product_bucket/artefacts/metadata/schemas/bronze/orders.json",
        "options": {
            "delimiter": "^",
            "dateFormat": "yyyyMMdd",
        },
        "location": "s3://my_data_product_bucket/bronze/orders",
    },
    "truth_preprocess_query": truth_query,
    "current_input_spec": {
        "spec_id": "current",
        "read_type": "batch",
        "data_format": "delta",
        "db_table": "my_database.orders",
    },
    "current_preprocess_query": current_query,
}

execute_reconciliation(acon=acon)