Reconciliator
Checking if data reconciles, using this algorithm, is a matter of reading the truth data and the current data.
You can use any input specification compatible with the lakehouse engine to read truth or current data. On top
of that, you can pass a truth_preprocess_query
and a current_preprocess_query
so you can preprocess the data before
it goes into the actual reconciliation process. The reconciliation process is focused on joining truth
with current
by all provided columns except the ones passed as metrics
.
In the table below, we present how a simple reconciliation would look like:
current_country | current_count | truth_country | truth_count | absolute_diff | perc_diff | yellow | red | recon_type |
---|---|---|---|---|---|---|---|---|
Sweden | 123 | Sweden | 120 | 3 | 0.025 | 0.1 | 0.2 | percentage |
Germany | 2946 | Sweden | 2946 | 0 | 0 | 0.1 | 0.2 | percentage |
France | 2901 | France | 2901 | 0 | 0 | 0.1 | 0.2 | percentage |
Belgium | 426 | Belgium | 425 | 1 | 0.002 | 0.1 | 0.2 | percentage |
The Reconciliator algorithm uses an ACON to configure its execution. You can find the meaning of each ACON property in ReconciliatorSpec object.
Below there is an example of usage of reconciliator.
from lakehouse_engine.engine import execute_reconciliation
truth_query = """
SELECT
shipping_city,
sum(sales_order_qty) as qty,
order_date_header
FROM (
SELECT
ROW_NUMBER() OVER (
PARTITION BY sales_order_header, sales_order_schedule, sales_order_item, shipping_city
ORDER BY changed_on desc
) as rank1,
sales_order_header,
sales_order_item,
sales_order_qty,
order_date_header,
shipping_city
FROM truth -- truth is a locally accessible temp view created by the lakehouse engine
WHERE order_date_header = '2021-10-01'
) a
WHERE a.rank1 = 1
GROUP BY a.shipping_city, a.order_date_header
"""
current_query = """
SELECT
shipping_city,
sum(sales_order_qty) as qty,
order_date_header
FROM (
SELECT
ROW_NUMBER() OVER (
PARTITION BY sales_order_header, sales_order_schedule, sales_order_item, shipping_city
ORDER BY changed_on desc
) as rank1,
sales_order_header,
sales_order_item,
sales_order_qty,
order_date_header,
shipping_city
FROM current -- current is a locally accessible temp view created by the lakehouse engine
WHERE order_date_header = '2021-10-01'
) a
WHERE a.rank1 = 1
GROUP BY a.shipping_city, a.order_date_header
"""
acon = {
"metrics": [{"metric": "qty", "type": "percentage", "aggregation": "avg", "yellow": 0.05, "red": 0.1}],
"truth_input_spec": {
"spec_id": "truth",
"read_type": "batch",
"data_format": "csv",
"schema_path": "s3://my_data_product_bucket/artefacts/metadata/schemas/bronze/orders.json",
"options": {
"delimiter": "^",
"dateFormat": "yyyyMMdd",
},
"location": "s3://my_data_product_bucket/bronze/orders",
},
"truth_preprocess_query": truth_query,
"current_input_spec": {
"spec_id": "current",
"read_type": "batch",
"data_format": "delta",
"db_table": "my_database.orders",
},
"current_preprocess_query": current_query,
}
execute_reconciliation(acon=acon)