Data Loader
How to configure a DataLoader algorithm in the lakehouse-engine by using an ACON file?
An algorithm (e.g., a data load) in the lakehouse-engine is configured using an ACON. The lakehouse-engine is a configuration-driven framework, so people don't have to write code to execute a Spark algorithm. Instead, the algorithm is written in pyspark and accepts its configuration through a JSON file (an ACON - algorithm configuration). The ACON is the configuration that defines the behaviour of a lakehouse engine algorithm. You can check the algorithm code, and how it interprets the ACON, here. On this page we go through the structure of an ACON file and the most suitable ACON configurations for common data engineering scenarios. Check the pages below to find several ACON examples that cover many data extraction, transformation and loading scenarios.
Overview of the Structure of the ACON file for DataLoads
An ACON-based algorithm needs several specifications to work properly, but some of them might be optional. The available specifications are:
- Input specifications (input_specs): specify how to read data. This is a mandatory keyword.
- Transform specifications (transform_specs): specify how to transform data.
- Data quality specifications (dq_specs): specify how to execute the data quality process.
- Output specifications (output_specs): specify how to write data to the target. This is a mandatory keyword.
- Terminate specifications (terminate_specs): specify what to do after writing into the target (e.g., optimising target table, vacuum, compute stats, expose change data feed to external location, etc.).
- Execution environment (exec_env): custom Spark Session configurations to be provided for your algorithm (configurations can also be provided via your job/cluster configuration, which we highly advise instead of passing performance-related configs here, for example).
Below is an example of a complete ACON file that reads from an S3 folder with CSVs and incrementally loads that data (using a merge) into a delta lake table.
spec_id is one of the main concepts that lets you chain the steps of the algorithm, so, for example, you can specify the transformations (in transform_specs) of a DataFrame that was read in the input_specs. Check the ACON below to see how the spec_id of the input_specs is used as input_id in one transform specification.
from lakehouse_engine.engine import load_data

acon = {
    "input_specs": [
        {
            "spec_id": "orders_bronze",
            "read_type": "streaming",
            "data_format": "csv",
            "schema_path": "s3://my-data-product-bucket/artefacts/metadata/bronze/schemas/orders.json",
            "with_filepath": True,
            "options": {
                "badRecordsPath": "s3://my-data-product-bucket/badrecords/order_events_with_dq/",
                "header": False,
                "delimiter": "\u005E",
                "dateFormat": "yyyyMMdd",
            },
            "location": "s3://my-data-product-bucket/bronze/orders/",
        }
    ],
    "transform_specs": [
        {
            "spec_id": "orders_bronze_with_extraction_date",
            "input_id": "orders_bronze",
            "transformers": [
                {"function": "with_row_id"},
                {
                    "function": "with_regex_value",
                    "args": {
                        "input_col": "lhe_extraction_filepath",
                        "output_col": "extraction_date",
                        "drop_input_col": True,
                        "regex": ".*WE_SO_SCL_(\\d+).csv",
                    },
                },
            ],
        }
    ],
    "dq_specs": [
        {
            "spec_id": "check_orders_bronze_with_extraction_date",
            "input_id": "orders_bronze_with_extraction_date",
            "dq_type": "validator",
            "result_sink_db_table": "my_database.my_table_dq_checks",
            "fail_on_error": False,
            "dq_functions": [
                {
                    "dq_function": "expect_column_values_to_not_be_null",
                    "args": {"column": "omnihub_locale_code"},
                },
                {
                    "dq_function": "expect_column_unique_value_count_to_be_between",
                    "args": {"column": "product_division", "min_value": 10, "max_value": 100},
                },
                {
                    "dq_function": "expect_column_max_to_be_between",
                    "args": {"column": "so_net_value", "min_value": 10, "max_value": 1000},
                },
                {
                    "dq_function": "expect_column_value_lengths_to_be_between",
                    "args": {"column": "omnihub_locale_code", "min_value": 1, "max_value": 10},
                },
                {
                    "dq_function": "expect_column_mean_to_be_between",
                    "args": {"column": "coupon_code", "min_value": 15, "max_value": 20},
                },
            ],
        }
    ],
    "output_specs": [
        {
            "spec_id": "orders_silver",
            "input_id": "check_orders_bronze_with_extraction_date",
            "data_format": "delta",
            "write_type": "merge",
            "partitions": ["order_date_header"],
            "merge_opts": {
                "merge_predicate": """
                    new.sales_order_header = current.sales_order_header
                    and new.sales_order_schedule = current.sales_order_schedule
                    and new.sales_order_item = current.sales_order_item
                    and new.epoch_status = current.epoch_status
                    and new.changed_on = current.changed_on
                    and new.extraction_date = current.extraction_date
                    and new.lhe_batch_id = current.lhe_batch_id
                    and new.lhe_row_id = current.lhe_row_id
                """,
                "insert_only": True,
            },
            "db_table": "my_database.my_table_with_dq",
            "location": "s3://my-data-product-bucket/silver/order_events_with_dq/",
            "with_batch_id": True,
            "options": {
                "checkpointLocation": "s3://my-data-product-bucket/checkpoints/order_events_with_dq/"
            },
        }
    ],
    "terminate_specs": [
        {
            "function": "optimize_dataset",
            "args": {"db_table": "my_database.my_table_with_dq"},
        }
    ],
    "exec_env": {
        "spark.databricks.delta.schema.autoMerge.enabled": True
    },
}

load_data(acon=acon)
Input Specifications
You specify how to read data by providing a list of Input Specifications. Usually there's just one element in that list, as, in the lakehouse, you are generally focused on reading data from one layer (e.g., source, bronze, silver, gold) and putting it into the next layer. However, there may be scenarios where you would like to combine two datasets (e.g., joins or incremental filtering on one dataset based on the values of another one), and for those you can use more than one element. More information about InputSpecs.
Relevant notes
- A spec id is fundamental, so you can use the input data later on in any step of the algorithm (transform, write, dq process, terminate).
- You don't have to specify db_table and location at the same time. Depending on the data_format, sometimes you read from a table (e.g., jdbc or a delta lake table) and sometimes you read from a location (e.g., files like delta lake, parquet, json, avro... or a kafka topic). See the sketch below.
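To illustrate that note, here is a minimal sketch of the two flavours of an input specification: one reading from a table and one reading from a location. The field names follow the example above, but the "batch" read type, the jdbc options and all names/paths are illustrative assumptions, not prescriptions.

# Reading from a table: no location, just a db_table (batch read assumed).
jdbc_input_spec = {
    "spec_id": "sales_orders_jdbc",
    "read_type": "batch",
    "data_format": "jdbc",
    "db_table": "my_database.sales_orders",
    "options": {
        # Standard Spark jdbc reader options (illustrative values).
        "url": "jdbc:postgresql://my-host:5432/sales",
        "driver": "org.postgresql.Driver",
    },
}

# Reading from a location: no db_table, just files on S3 (streaming read).
files_input_spec = {
    "spec_id": "sales_orders_files",
    "read_type": "streaming",
    "data_format": "parquet",
    "location": "s3://my-data-product-bucket/bronze/sales_orders/",
}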
Transform Specifications
In the lakehouse engine, you transform data by providing a transform specification, which contains a list of transform functions (transformers). So a transform specification acts upon one input, and it can execute multiple lakehouse engine transformation functions (transformers) upon that input.
If you look into the example above, we ask the lakehouse engine to execute two functions on the orders_bronze input data: with_row_id and with_regex_value. Those functions can of course receive arguments. You can see the list of all available transformation functions (transformers) in lakehouse_engine.transformers. Then, you just invoke them in your ACON as demonstrated above, following exactly the same function and parameter names as described in the code documentation.
More information about TransformSpec.
Relevant notes
- This stage is fully optional; you can omit it from the ACON.
- There is one relevant option, force_streaming_foreach_batch_processing, that can be used to force the transform to be executed inside the foreachBatch function, to ensure non-supported streaming operations can be properly executed. You don't have to worry about this if you are using regular lakehouse engine transformers. But if you are providing your own custom logic in pyspark code via the lakehouse engine custom_transformation (lakehouse_engine.transformers.custom_transformers), then your logic may contain Spark functions that are not compatible with Spark Streaming, and this flag can make all of your computation streaming-compatible by pushing down all the logic into the foreachBatch() function (see the sketch after this list).
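As a hedged sketch only, assuming the custom_transformation function takes a custom_transformer argument holding a callable that maps a DataFrame to a DataFrame (confirm the exact contract in lakehouse_engine.transformers.custom_transformers), a transform spec combining custom pyspark logic with this flag could look like:

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

# Hypothetical custom pyspark logic (the signature is an assumption).
def add_ingestion_flag(df: DataFrame) -> DataFrame:
    return df.withColumn("is_ingested", lit(True))

transform_spec = {
    "spec_id": "orders_bronze_custom",
    "input_id": "orders_bronze",
    # Push the whole transformation into foreachBatch so that operations
    # not supported by Spark Streaming can still be executed.
    "force_streaming_foreach_batch_processing": True,
    "transformers": [
        {
            "function": "custom_transformation",
            "args": {"custom_transformer": add_ingestion_flag},
        }
    ],
}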
Data Quality Specifications
One of the most relevant features of the lakehouse engine is that you can have data quality guardrails that prevent you from loading bad data into your target layer (e.g., bronze, silver or gold). The lakehouse engine data quality process includes one main feature at the moment:
- Validator: The capability to perform data quality checks on that data (e.g., is the max value of a column bigger than x?) and even tag your data with the results of the DQ checks.
The output of the data quality process can be written into a Result Sink target (e.g. table or files) and is integrated with a Data Docs website, which can be a company-wide available website for people to check the quality of their data and share with others.
To achieve all of this functionality, the lakehouse engine uses Great Expectations internally. To hide the Great Expectations internals from our user base and provide friendlier abstractions using the ACON, we have developed the concept of a DQSpec, which can contain many DQFunctionSpec objects. This relationship is very similar to the one between TransformSpec and TransformerSpec, meaning you can have multiple Great Expectations functions executed inside a single data quality specification (as in the ACON above).
The names of the functions and args are a one-to-one match with the Great Expectations API.
More information about DQSpec.
Relevant notes
- You can write the outputs of the DQ process to a sink through the result_sink* parameters of the DQSpec. result_sink_options takes any Spark options for a DataFrame writer, which means you can specify the options according to your sink format (e.g., delta, parquet, json, etc.). We usually recommend using "delta" as the format (see the sketch after this list).
- You can use the results of the DQ checks to tag the data that you are validating. When configured, these details will appear as a new column (like any other), as part of the tables of your Data Product.
- To be able to analyse the data in the result_sink*, we provide an approach in which you set result_sink_explode to true (which is the default) and some columns are then expanded. Those are:
    - General columns: columns with the basic information regarding the dq_specs. They always have values and do not depend on the expectation types chosen.
        - Columns: checkpoint_config, run_name, run_time, run_results, success, validation_result_identifier, spec_id, input_id, validation_results, run_time_year, run_time_month, run_time_day.
    - Statistics columns: columns with information about the runs of the expectations, with values referring to the whole run and not to each expectation. Those columns come from run_results.validation_result.statistics.*.
        - Columns: evaluated_expectations, success_percent, successful_expectations, unsuccessful_expectations.
    - Expectations columns: columns with information about each expectation executed.
        - Columns: expectation_type, batch_id, expectation_success, exception_info. Those columns are exploded from run_results.validation_result.results, namely expectation_config.expectation_type, expectation_config.kwargs.batch_id, success as expectation_success, and exception_info. Moreover, we also include unexpected_index_list, observed_value and kwargs.
    - Arguments of Expectations columns: columns that depend on the expectation_type selected. Those columns are exploded from run_results.validation_result.results, namely expectation_config.kwargs.*.
        - We can have, for example: column, column_A, column_B, max_value, min_value, value, value_pairs_set, value_set, and others.
    - More columns desired? Those can be added using result_sink_extra_columns, in which you can select columns like <name> and/or explode columns like <name>.*.
- Use the parameter "source" to identify the data used, for an easier analysis.
- By default, Great Expectations will also provide a site presenting the history of the DQ validations that you have performed on your data.
- You can make an analysis of all your expectations and create a dashboard aggregating all that information.
- This stage is fully optional; you can omit it from the ACON.
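Tying these notes together, here is a hedged sketch of a dq_specs entry configuring a result sink. The parameter names come from the notes above and from the full example; result_sink_format and the concrete option/column values are illustrative assumptions.

dq_spec_with_result_sink = {
    "spec_id": "check_orders_bronze_with_extraction_date",
    "input_id": "orders_bronze_with_extraction_date",
    "dq_type": "validator",
    "fail_on_error": False,
    # Persist the DQ results so they can be analysed later.
    "result_sink_db_table": "my_database.my_table_dq_checks",
    "result_sink_format": "delta",  # assumption: "delta" is the recommended format
    "result_sink_options": {"mergeSchema": "true"},  # any Spark DataFrame writer options
    "result_sink_explode": True,  # default: expand general/statistics/expectations columns
    "result_sink_extra_columns": ["<name>", "<name>.*"],  # select and/or explode extra columns
    "source": "orders",  # tag the results to make analysis easier
    "dq_functions": [
        {"dq_function": "expect_column_values_to_not_be_null", "args": {"column": "omnihub_locale_code"}},
    ],
}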
Output Specifications
The output_specs section of an ACON is relatively similar to the input_specs section, but of course focusing on how to write the results of the algorithm, instead of specifying the input for the algorithm, hence the name output_specs (output specifications). More information about OutputSpec.
Relevant notes
- Respect the supported write types and output formats.
- One of the most relevant options to specify in the options parameter is the checkpoint_location when in streaming read mode, because that location will be responsible for storing which data you already read and transformed from the source, when the source is a Spark Streaming compatible source (e.g., Kafka or S3 files). See the sketch below.
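As a hedged illustration of that note, a streaming output spec with its checkpoint configured could look like the sketch below. The "append" write type and all names/paths are assumptions for illustration; the full example above uses a merge instead.

streaming_output_spec = {
    "spec_id": "orders_silver_append",
    "input_id": "orders_bronze_with_extraction_date",
    "data_format": "delta",
    "write_type": "append",  # assumption: simple append instead of the merge shown above
    "location": "s3://my-data-product-bucket/silver/orders/",
    "options": {
        # Streaming checkpoint: tracks which data was already read and transformed.
        "checkpointLocation": "s3://my-data-product-bucket/checkpoints/orders/",
    },
}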
Terminate Specifications
The terminate_specs section of the ACON is responsible for some "wrapping up" activities like optimising a table, vacuuming old files in a delta table, etc. With time, the list of available terminators will likely increase (e.g., reconciliation processes); the terminators currently available, and the ones most relevant in the context of the lakehouse initiative, are described in the TerminatorSpec documentation linked below. This stage is fully optional; you can omit it from the ACON.
More information about TerminatorSpec.
Execution Environment
In the exec_env section of the ACON you can pass any Spark Session configuration that you want to define for the execution of your algorithm. This is basically just a JSON structure that takes in any Spark Session property, so there is no custom lakehouse engine logic involved. This stage is fully optional; you can omit it from the ACON.
Please be aware that Spark Session configurations that cannot be changed while the Spark cluster is already running need to be passed in the configuration of the job/cluster that runs this algorithm, not in this section; this section only accepts Spark Session configs that can be changed at runtime. Whenever you introduce an option, make sure that it takes effect during runtime, because, to the best of our knowledge, there is no definitive list of Spark properties that are allowed to be changed after the cluster is already running. Moreover, Spark algorithms typically fail if you try to modify a config that can only be set before the cluster starts.
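For example, a minimal exec_env passing only runtime-changeable Spark Session configs could look like the sketch below; spark.sql.shuffle.partitions is shown as an illustrative runtime-changeable property, while the Delta schema auto-merge config comes from the full example above.

acon = {
    # ... input_specs, transform_specs, dq_specs, output_specs, terminate_specs ...
    "exec_env": {
        # Runtime-changeable Spark Session configs; immutable settings (e.g.,
        # driver/executor memory) belong in the job/cluster configuration instead.
        "spark.sql.shuffle.partitions": "200",
        "spark.databricks.delta.schema.autoMerge.enabled": True,
    },
}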