Engine
Contract of the lakehouse engine with all the available functions to be executed.
execute_dq_validation(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute the DQValidator algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
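A minimal usage sketch; the `lakehouse_engine.engine` import path and the ACON file location are assumptions for illustration, not taken from this page:

```python
# Assumed import path for the engine facade; adjust to your installation.
from lakehouse_engine.engine import execute_dq_validation

# Run the DQValidator algorithm from a hypothetical ACON file kept with the data product code.
execute_dq_validation(acon_path="dq_validator/my_dq_validator_acon.json")
```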
execute_gab(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute the gold asset builder based on a GAB Algorithm Configuration.
GAB is useful to build your gold assets with predefined functions for recurrent periods.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
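A minimal sketch of calling the gold asset builder, assuming the same `lakehouse_engine.engine` import path; the ACON content is left as a placeholder since its keys depend on your GAB configuration:

```python
from lakehouse_engine.engine import execute_gab  # assumed import path

# Placeholder ACON passed directly from a notebook; fill it with your GAB configuration.
gab_acon: dict = {}
execute_gab(acon=gab_acon)
```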
execute_heartbeat_sensor_data_feed(heartbeat_sensor_data_feed_path, heartbeat_sensor_control_table)
Control table data feeder.
It reads the CSV file stored in the data folder and performs UPSERT and DELETE operations on the control table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
heartbeat_sensor_data_feed_path | str | path where the CSV file is stored. | required |
heartbeat_sensor_control_table | str | CONTROL table of the Heartbeat sensor. | required |
Source code in mkdocs/lakehouse_engine/packages/engine.py
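A usage sketch with hypothetical paths and table names (assuming the function is exposed by `lakehouse_engine.engine`):

```python
from lakehouse_engine.engine import execute_heartbeat_sensor_data_feed  # assumed import path

# Upsert/delete control table entries from a CSV feed file (path and table are illustrative).
execute_heartbeat_sensor_data_feed(
    heartbeat_sensor_data_feed_path="s3://my-bucket/heartbeat/data/control_feed.csv",
    heartbeat_sensor_control_table="my_database.heartbeat_sensor_control",
)
```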
execute_reconciliation(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute the Reconciliator algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
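A minimal sketch, again assuming the `lakehouse_engine.engine` import path and a hypothetical ACON file; the spark confs dict only illustrates the optional parameter:

```python
from lakehouse_engine.engine import execute_reconciliation  # assumed import path

# Reconcile a dataset against its truth source, passing extra spark confs for usage collection.
execute_reconciliation(
    acon_path="reconciliation/my_recon_acon.json",  # hypothetical path
    spark_confs={"spark.app.name": "recon_my_dataset"},
)
```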
execute_sensor(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute a sensor based on a Sensor Algorithm Configuration.
A sensor is useful to check if an upstream system has new data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
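A minimal sketch of a sensor run (import path and ACON path are assumptions):

```python
from lakehouse_engine.engine import execute_sensor  # assumed import path

# Check whether the upstream system has new data, based on a sensor ACON stored in the repo.
execute_sensor(acon_path="sensors/my_sensor_acon.json")  # hypothetical path
```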
execute_sensor_heartbeat(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute a sensor based on a Heartbeat Algorithm Configuration.
The heartbeat mechanism monitors whether an upstream system has new data.
The heartbeat job runs continuously within a defined data product or according to a user-defined schedule.
This job operates based on the Control table, where source-related entries can be fed by users using the Heartbeat Data Feeder job.
Each source (such as SAP, delta_table, Kafka, Local Manual Upload, etc.) can have tasks added in parallel within the Heartbeat Job.
Based on the source heartbeat ACON and control table entries, Heartbeat sends a final sensor ACON to the existing sensor modules, which check whether a new event is available for the control table record.
The sensor then returns the NEW_EVENT_AVAILABLE status to the Heartbeat modules, which update the control table.
Following this, the related Databricks jobs are triggered through the Databricks Jobs API, ensuring that all dependencies are met.
This process allows the Heartbeat sensor to manage and centralize the entire workflow with minimal user intervention, enhancing the sensor features with centralized, efficient management and tracking through the control table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
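A minimal sketch of a Heartbeat run (import path is an assumption; the ACON keys depend on your Heartbeat configuration and are left as a placeholder):

```python
from lakehouse_engine.engine import execute_sensor_heartbeat  # assumed import path

# Placeholder Heartbeat ACON provided directly from a notebook or scheduled job.
heartbeat_acon: dict = {}
execute_sensor_heartbeat(acon=heartbeat_acon)
```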
generate_sensor_query(sensor_id, filter_exp=None, control_db_table_name=None, upstream_key=None, upstream_value=None, upstream_table_name=None)
Generates a preprocess query to be used in a sensor configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sensor_id | str | sensor id. | required |
filter_exp | str | expression to filter incoming new data. You can use the placeholders ?default_upstream_key and ?default_upstream_value, so that they can be replaced by the respective values in the control_db_table_name for this specific sensor_id. | None |
control_db_table_name | str | | None |
upstream_key | str | the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream). | None |
upstream_value | str | the upstream value to identify new data from the upstream (e.g., the value of a time present in the upstream). | None |
upstream_table_name | str | value for custom sensor to query new data from the upstream. If none, we will set the default value, our | None |
Returns:
Type | Description |
---|---|
str | The query string. |
Source code in mkdocs/lakehouse_engine/packages/engine.py
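A sketch of generating a preprocess query for a custom upstream table; the table, column, and sensor names are illustrative, and the import path is an assumption:

```python
from lakehouse_engine.engine import generate_sensor_query  # assumed import path

# Build a query that filters the upstream on a time column, using the documented placeholders.
query = generate_sensor_query(
    sensor_id="my_sensor_id",
    filter_exp="?default_upstream_key > '?default_upstream_value'",
    control_db_table_name="my_database.sensor_control",
    upstream_key="load_date",
    upstream_table_name="upstream_database.sales_orders",
)
```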
generate_sensor_sap_logchain_query(chain_id, dbtable=SAPLogchain.DBTABLE.value, status=SAPLogchain.GREEN_STATUS.value, engine_table_name=SAPLogchain.ENGINE_TABLE.value)
Generates a sensor query based on the SAP Logchain table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
chain_id | str | chain id to query the status on SAP. | required |
dbtable | str | | SAPLogchain.DBTABLE.value |
status | str | | SAPLogchain.GREEN_STATUS.value |
engine_table_name | str | table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query. | SAPLogchain.ENGINE_TABLE.value |
Returns:
Type | Description |
---|---|
str | The query string. |
Source code in mkdocs/lakehouse_engine/packages/engine.py
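A sketch with a hypothetical SAP chain id, relying on the documented defaults for the remaining parameters (import path is an assumption):

```python
from lakehouse_engine.engine import generate_sensor_sap_logchain_query  # assumed import path

# Build the query that checks the status of a SAP process chain.
query = generate_sensor_sap_logchain_query(chain_id="Z_MY_PROCESS_CHAIN")
```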
load_data(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Load data using the DataLoader algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
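A minimal DataLoader sketch (import path and ACON path are assumptions):

```python
from lakehouse_engine.engine import load_data  # assumed import path

# Load data as described by a hypothetical ACON file versioned with the data product.
load_data(acon_path="loads/my_load_acon.json")
```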
manage_files(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Manipulate S3 files using the File Manager algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
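A minimal File Manager sketch (import path and ACON path are assumptions):

```python
from lakehouse_engine.engine import manage_files  # assumed import path

# Run the file operations (e.g., copy or delete S3 objects) described in a hypothetical ACON.
manage_files(acon_path="file_manager/my_file_manager_acon.json")
```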
manage_table(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Manipulate tables/views using the Table Manager algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
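A minimal Table Manager sketch (import path and ACON path are assumptions):

```python
from lakehouse_engine.engine import manage_table  # assumed import path

# Run the table/view operations described in a hypothetical ACON.
manage_table(acon_path="table_manager/my_table_manager_acon.json")
```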
send_notification(args)
Send a notification using a notifier.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | dict | arguments for the notifier. | required |
Source code in mkdocs/lakehouse_engine/packages/engine.py
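A sketch of sending a notification; the import path and the keys inside args are assumptions, since the expected keys depend on the notifier being used:

```python
from lakehouse_engine.engine import send_notification  # assumed import path

# Illustrative args dict; replace the keys with whatever your notifier expects.
send_notification(
    args={
        "type": "email",
        "message": "my load finished successfully",
    }
)
```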
trigger_heartbeat_sensor_jobs(acon)
Trigger the jobs via the Databricks Jobs API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon | dict | Heartbeat ACON containing data product configs and options. | required |
Source code in mkdocs/lakehouse_engine/packages/engine.py
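A minimal sketch (import path is an assumption; the ACON content is a placeholder):

```python
from lakehouse_engine.engine import trigger_heartbeat_sensor_jobs  # assumed import path

# Trigger the downstream Databricks jobs described in a placeholder Heartbeat ACON.
heartbeat_acon: dict = {}
trigger_heartbeat_sensor_jobs(acon=heartbeat_acon)
```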
update_heartbeat_sensor_status(heartbeat_sensor_control_table, sensor_table, job_id)
Update heartbeat sensor status.
Update the heartbeat sensor control table with the COMPLETE status and job_end_timestamp for the triggered job. Update the sensor control table with the PROCESSED_NEW_DATA status and status_change_timestamp for the triggered job.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
heartbeat_sensor_control_table | str | Heartbeat sensor control table name. | required |
sensor_table | str | lakehouse engine sensor table name. | required |
job_id | str | job_id of the running job. It refers to trigger_job_id in the Control table. | required |
Source code in mkdocs/lakehouse_engine/packages/engine.py
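A sketch with hypothetical table names and job id (import path is an assumption):

```python
from lakehouse_engine.engine import update_heartbeat_sensor_status  # assumed import path

# Mark the triggered job as finished in both the Heartbeat and the sensor control tables.
update_heartbeat_sensor_status(
    heartbeat_sensor_control_table="my_database.heartbeat_sensor_control",
    sensor_table="my_database.lakehouse_engine_sensors",
    job_id="123456789",
)
```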
update_sensor_status(sensor_id, control_db_table_name, status=SensorStatus.PROCESSED_NEW_DATA.value, assets=None)
Update internal sensor status.
Update the sensor status in the control table. It should be used to tell the system that the sensor has processed all the new data that was previously identified, hence updating the shifted sensor status.
It is usually used to move from SensorStatus.ACQUIRED_NEW_DATA to SensorStatus.PROCESSED_NEW_DATA, but there might be scenarios - still to be identified - where the sensor status can be updated from/to different statuses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sensor_id | str | sensor id. | required |
control_db_table_name | str | | required |
status | str | status of the sensor. | SensorStatus.PROCESSED_NEW_DATA.value |
assets | List[str] | a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA. | None |
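A sketch of flagging the sensor data as processed once the downstream load finishes; the sensor id, control table, and asset names are illustrative, and the import path is an assumption:

```python
from lakehouse_engine.engine import update_sensor_status  # assumed import path

# Move the sensor from ACQUIRED_NEW_DATA to the default PROCESSED_NEW_DATA status.
update_sensor_status(
    sensor_id="my_sensor_id",
    control_db_table_name="my_database.sensor_control",
    assets=["my_dataset"],
)
```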