Streaming Append Load with Optimize Dataset Terminator
This scenario includes a terminator that optimizes a dataset (table): it can vacuum the table, optimize it (optionally with Z-Order), compute table statistics, and more. You can find more details on the Terminator here.
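To illustrate those capabilities, a terminate spec can combine vacuuming, Z-Order optimization, and statistics computation in a single call. A minimal sketch of such a spec follows; note that the argument names other than db_table and debug are assumptions here, so confirm them against the Terminator documentation linked above:

terminate_spec = {
    "function": "optimize_dataset",
    "args": {
        "db_table": "test_db.streaming_with_terminators_table",
        # The argument names below are assumptions, not confirmed API:
        "vacuum": True,  # assumption: toggles VACUUM on the table
        "optimize": True,  # assumption: toggles OPTIMIZE (file compaction)
        "optimize_zorder_col_list": ["customer"],  # assumption: Z-Order columns
        "compute_table_stats": True,  # assumption: toggles statistics computation
        "debug": True,
    },
}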
As in the other scenarios, the ACON configuration should be executed with load_data, as follows:
from lakehouse_engine.engine import load_data
acon = {...}  # the ACON dict, e.g. the JSON example below
load_data(acon=acon)
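If the ACON is kept as a JSON file (like the full example below), it can be loaded with the standard json module before being passed to load_data. A minimal sketch, where the file path is purely illustrative:

import json

from lakehouse_engine.engine import load_data

# Read the ACON (e.g. the JSON example below) from a file.
with open("acon/streaming_with_terminators.json") as f:  # illustrative path
    acon = json.load(f)

load_data(acon=acon)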
Example of an ACON configuration:
{
"input_specs": [
{
"spec_id": "sales_source",
"read_type": "streaming",
"data_format": "csv",
"options": {
"header": true,
"delimiter": "|",
"mode": "DROPMALFORMED"
},
"location": "file:///app/tests/lakehouse/in/feature/append_load/streaming_with_terminators/data",
"schema": {
"type": "struct",
"fields": [
{
"name": "salesorder",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "item",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "date",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "customer",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "article",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "amount",
"type": "integer",
"nullable": true,
"metadata": {}
}
]
}
}
],
"output_specs": [
{
"spec_id": "sales_bronze",
"input_id": "sales_source",
"write_type": "append",
"db_table": "test_db.streaming_with_terminators_table",
"data_format": "delta",
"partitions": [
"date"
],
"options": {
"checkpointLocation": "file:///app/tests/lakehouse/out/feature/append_load/streaming_with_terminators/checkpoint"
},
"location": "file:///app/tests/lakehouse/out/feature/append_load/streaming_with_terminators/data"
}
],
"terminate_specs": [
{
"function": "optimize_dataset",
"args": {
"db_table": "test_db.streaming_with_terminators_table",
"debug": true
}
}
]
}
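For reference, the maintenance work this terminator performs maps onto standard Delta Lake table operations. A minimal sketch of the equivalent manual steps in plain Spark SQL, assuming a Spark session with Delta Lake's OPTIMIZE/VACUUM support; this is not the engine's implementation, just the underlying table operations:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

table = "test_db.streaming_with_terminators_table"

# Compact small files and co-locate rows by a column (Z-Order).
# Z-Order columns must not be partition columns, so "date" is excluded here.
spark.sql(f"OPTIMIZE {table} ZORDER BY (customer)")

# Remove data files no longer referenced by the Delta log
# (default retention threshold: 168 hours / 7 days).
spark.sql(f"VACUUM {table}")

# Compute table-level statistics for the query optimizer.
spark.sql(f"ANALYZE TABLE {table} COMPUTE STATISTICS")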