Streaming Delta Load with Group and Rank Condensation

This scenario is useful when a delta load is driven by a changelog that must first be condensed with a group by followed by a rank, rather than with the record mode logic used in record mode based change data capture. As in the other scenarios, execute the ACON configuration with load_data:

from lakehouse_engine.engine import load_data
acon = {...}
load_data(acon=acon)

Example of ACON configuration:

{
  "input_specs": [
    {
      "spec_id": "sales_bronze",
      "read_type": "streaming",
      "data_format": "csv",
      "schema_path": "file:///app/tests/lakehouse/in/feature/delta_load/group_and_rank/with_duplicates_in_same_file/streaming/source_schema.json",
      "with_filepath": true,
      "options": {
        "mode": "FAILFAST",
        "header": true,
        "delimiter": "|"
      },
      "location": "file:///app/tests/lakehouse/in/feature/delta_load/group_and_rank/with_duplicates_in_same_file/streaming/data"
    }
  ],
  "transform_specs": [
    {
      "spec_id": "sales_bronze_with_extraction_date",
      "input_id": "sales_bronze",
      "transformers": [
        {
          "function": "with_regex_value",
          "args": {
            "input_col": "lhe_extraction_filepath",
            "output_col": "extraction_date",
            "drop_input_col": true,
            "regex": ".*WE_SO_SCL_(\\d+).csv"
          }
        },
        {
          "function": "with_auto_increment_id"
        },
        {
          "function": "group_and_rank",
          "args": {
            "group_key": [
              "salesorder",
              "item"
            ],
            "ranking_key": [
              "extraction_date",
              "changed_on",
              "lhe_row_id"
            ]
          }
        },
        {
          "function": "repartition",
          "args": {
            "num_partitions": 1
          }
        }
      ]
    }
  ],
  "output_specs": [
    {
      "spec_id": "sales_silver",
      "input_id": "sales_bronze_with_extraction_date",
      "write_type": "merge",
      "data_format": "delta",
      "location": "file:///app/tests/lakehouse/out/feature/delta_load/group_and_rank/with_duplicates_in_same_file/streaming/data",
      "options": {
        "checkpointLocation": "file:///app/tests/lakehouse/out/feature/delta_load/group_and_rank/with_duplicates_in_same_file/streaming/checkpoint"
      },
      "with_batch_id": true,
      "merge_opts": {
        "merge_predicate": "current.salesorder = new.salesorder and current.item = new.item",
        "update_predicate": "new.extraction_date >= current.extraction_date and new.changed_on >= current.changed_on",
        "delete_predicate": "new.extraction_date >= current.extraction_date and new.changed_on >= current.changed_on and new.event = 'deleted'"
      }
    }
  ]
}
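
To make the merge_opts above more concrete, here is a minimal plain-Python sketch of how the three predicates could interact for one condensed micro-batch. It is not the engine's implementation, which delegates to a Delta Lake merge. The helper apply_merge, the dict-based "table", the amount column and the sample rows are all hypothetical. The sketch also assumes that unmatched rows are inserted and that the delete predicate is checked before the update predicate.

```python
def apply_merge(current, new_rows):
    """Apply condensed changelog rows to `current`, keyed by (salesorder, item)."""
    for new in new_rows:
        # merge_predicate: current.salesorder = new.salesorder and current.item = new.item
        key = (new["salesorder"], new["item"])
        cur = current.get(key)
        if cur is None:
            # Assumption of this sketch: rows without a match are inserted.
            current[key] = new
            continue
        # Shared part of update_predicate and delete_predicate: the incoming
        # row must not be older than the stored one. Dates are yyyyMMdd
        # strings here, so string comparison follows chronological order.
        not_older = (
            new["extraction_date"] >= cur["extraction_date"]
            and new["changed_on"] >= cur["changed_on"]
        )
        if not_older and new["event"] == "deleted":
            del current[key]        # delete_predicate holds
        elif not_older:
            current[key] = new      # update_predicate holds
        # Otherwise the incoming row is stale and is ignored.
    return current


def row(salesorder, item, date, event, amount):
    """Build a hypothetical sales row; extraction_date and changed_on share one date."""
    return {
        "salesorder": salesorder, "item": item,
        "extraction_date": date, "changed_on": date,
        "event": event, "amount": amount,
    }


silver = {
    (1, 1): row(1, 1, "20160601", "created", 100),
    (1, 2): row(1, 2, "20160603", "changed", 50),
    (1, 3): row(1, 3, "20160601", "created", 70),
}
batch = [
    row(1, 1, "20160602", "changed", 120),  # newer: updates the stored row
    row(1, 2, "20160602", "changed", 10),   # older than the stored row: ignored
    row(1, 3, "20160602", "deleted", 70),   # newer and deleted: removes the row
    row(2, 1, "20160602", "created", 30),   # no match: inserted
]
result = apply_merge(silver, batch)
```

After the call, result holds the keys (1, 1), (1, 2) and (2, 1): the first was updated, the second kept its stored values because the incoming row was stale, and (1, 3) was deleted.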

Relevant notes:
  • This type of delta load, with this type of condensation, is useful when the source changelog can be condensed based on dates instead of technical fields such as datapakid, record, record_mode, etc., as found in SAP BW DSOs. An example of such a system is the Omnihub Tibco orders and deliveries files.
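
The date-based condensation described above can be illustrated with a small plain-Python sketch of the transformer chain in the ACON: extract the date from the file name, assign a row id, then keep one row per group. It mimics the idea only and does not use the engine or Spark. The file names, the amount column and both helper functions are hypothetical. It also assumes that ranking is descending and that only the top-ranked row per group is kept, and it returns an empty string when the regex does not match, which is a choice of this sketch.

```python
import re

# Same regex as in the ACON above (written as a raw string instead of JSON escaping).
FILE_PATTERN = re.compile(r".*WE_SO_SCL_(\d+).csv")


def extraction_date(filepath):
    """Return the digits captured from the file name, or "" if there is no match."""
    match = FILE_PATTERN.match(filepath)
    return match.group(1) if match else ""


def group_and_rank(rows, group_key, ranking_key):
    """Keep, per group, the row with the highest ranking_key tuple."""
    latest = {}
    for candidate in rows:
        group = tuple(candidate[col] for col in group_key)
        rank = tuple(candidate[col] for col in ranking_key)
        best = latest.get(group)
        if best is None or rank > tuple(best[col] for col in ranking_key):
            latest[group] = candidate
    return list(latest.values())


# Hypothetical changelog: (file path, salesorder, item, changed_on, amount).
raw = [
    ("/in/WE_SO_SCL_20160601.csv", 1, 1, "20160601", 100),
    ("/in/WE_SO_SCL_20160602.csv", 1, 1, "20160602", 120),
    ("/in/WE_SO_SCL_20160601.csv", 1, 2, "20160601", 50),
    ("/in/WE_SO_SCL_20160601.csv", 1, 2, "20160601", 55),  # duplicate in same file
]

rows = [
    {
        "salesorder": salesorder,
        "item": item,
        "changed_on": changed_on,
        "amount": amount,
        "extraction_date": extraction_date(path),  # stands in for with_regex_value
        "lhe_row_id": row_id,                      # stands in for with_auto_increment_id
    }
    for row_id, (path, salesorder, item, changed_on, amount) in enumerate(raw)
]

condensed = group_and_rank(
    rows,
    group_key=["salesorder", "item"],
    ranking_key=["extraction_date", "changed_on", "lhe_row_id"],
)
amounts = {(r["salesorder"], r["item"]): r["amount"] for r in condensed}
```

Here amounts maps (1, 1) to 120, because the row from the later file wins, and (1, 2) to 55, because the two rows tie on both dates and the higher lhe_row_id breaks the tie. This is why the row id is listed last in ranking_key when duplicates can occur within the same file.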