Engine
Contract of the lakehouse engine with all the available functions to be executed.
build_data_docs(store_backend=DQDefaults.STORE_BACKEND.value, local_fs_root_dir=None, data_docs_local_fs=None, data_docs_prefix=DQDefaults.DATA_DOCS_PREFIX.value, bucket=None, data_docs_bucket=None, expectations_store_prefix=DQDefaults.EXPECTATIONS_STORE_PREFIX.value, validations_store_prefix=DQDefaults.VALIDATIONS_STORE_PREFIX.value, checkpoint_store_prefix=DQDefaults.CHECKPOINT_STORE_PREFIX.value)
Build Data Docs for the project.
This function performs a full build of data docs based on all the Great Expectations checkpoints in the specified location, collecting the entire history of runs/validations executed and their results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store_backend | str | which store_backend to use (e.g. s3 or file_system). | DQDefaults.STORE_BACKEND.value |
local_fs_root_dir | str | path of the root directory. Note: only applicable for store_backend file_system. | None |
data_docs_local_fs | str | path where to store the data docs only. Note: only applicable for store_backend file_system. | None |
data_docs_prefix | str | prefix where to store data_docs' data. | DQDefaults.DATA_DOCS_PREFIX.value |
bucket | str | the bucket name to consider for the store_backend (store DQ artefacts). Note: only applicable for store_backend s3. | None |
data_docs_bucket | str | the bucket name for data docs only. When defined, it supersedes the bucket parameter. Note: only applicable for store_backend s3. | None |
expectations_store_prefix | str | prefix where to store expectations' data. Note: only applicable for store_backend s3. | DQDefaults.EXPECTATIONS_STORE_PREFIX.value |
validations_store_prefix | str | prefix where to store validations' data. Note: only applicable for store_backend s3. | DQDefaults.VALIDATIONS_STORE_PREFIX.value |
checkpoint_store_prefix | str | prefix where to store checkpoints' data. Note: only applicable for store_backend s3. | DQDefaults.CHECKPOINT_STORE_PREFIX.value |
Source code in mkdocs/lakehouse_engine/packages/engine.py
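Example (illustrative only): a minimal sketch of triggering a data docs build, assuming the function is importable from lakehouse_engine.engine; the bucket names below are hypothetical.

```python
from lakehouse_engine.engine import build_data_docs  # assumed import path

# Full data docs build using the default s3 store_backend and default prefixes;
# the bucket names are hypothetical placeholders.
build_data_docs(
    bucket="my-dq-artefacts-bucket",
    data_docs_bucket="my-data-docs-bucket",  # when defined, supersedes bucket for data docs
)
```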
execute_dq_validation(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute the DQValidator algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
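Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the acon file path is hypothetical and its content must follow the DQValidator ACON spec (not shown here).

```python
from lakehouse_engine.engine import execute_dq_validation  # assumed import path

# Run the DQValidator algorithm from an ACON file (hypothetical path).
execute_dq_validation(acon_path="s3://my-bucket/acons/dq_validation_acon.json")
```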
execute_gab(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute the gold asset builder based on a GAB Algorithm Configuration.
GAB is useful to build your gold assets with predefined functions for recurrent periods.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
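Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the ACON dict content must follow the GAB spec and is omitted here.

```python
from lakehouse_engine.engine import execute_gab  # assumed import path

# Run the gold asset builder with an ACON defined directly in a notebook.
gab_acon = {}  # placeholder: fill according to the GAB ACON spec (not shown here)
execute_gab(acon=gab_acon)
```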
execute_reconciliation(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute the Reconciliator algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
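Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the acon file path and the spark conf value are hypothetical.

```python
from lakehouse_engine.engine import execute_reconciliation  # assumed import path

# Run the Reconciliator algorithm from an ACON file (hypothetical path), passing
# spark confs to be used when collecting engine usage statistics.
execute_reconciliation(
    acon_path="s3://my-bucket/acons/reconciliation_acon.json",
    spark_confs={"spark.app.name": "reconciliation_job"},
)
```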
execute_sensor(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Execute a sensor based on a Sensor Algorithm Configuration.
A sensor is useful to check if an upstream system has new data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
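Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the ACON dict content must follow the Sensor ACON spec and is omitted here.

```python
from lakehouse_engine.engine import execute_sensor  # assumed import path

# Check whether the upstream system has new data for this sensor.
sensor_acon = {}  # placeholder: fill according to the Sensor ACON spec (not shown here)
execute_sensor(acon=sensor_acon)
```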
generate_sensor_query(sensor_id, filter_exp=None, control_db_table_name=None, upstream_key=None, upstream_value=None, upstream_table_name=None)
Generates a preprocess query to be used in a sensor configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sensor_id | str | sensor id. | required |
filter_exp | str | expression to filter incoming new data. You can use the placeholders ?default_upstream_key and ?default_upstream_value, so that they are replaced by the respective values in the control_db_table_name for this specific sensor_id. | None |
control_db_table_name | str | name of the sensor control table. | None |
upstream_key | str | the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream). | None |
upstream_value | str | the upstream value to identify new data from the upstream (e.g., the value of a time present in the upstream). | None |
upstream_table_name | str | value for custom sensor to query new data from the upstream. If None, the default value is used. | None |
Returns:
Type | Description |
---|---|
str | The query string. |
Source code in mkdocs/lakehouse_engine/packages/engine.py
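Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the table and column names are hypothetical.

```python
from lakehouse_engine.engine import generate_sensor_query  # assumed import path

# Build a preprocess query that filters upstream rows newer than the value
# recorded in the control table for this sensor_id, using the documented
# ?default_upstream_key / ?default_upstream_value placeholders.
query = generate_sensor_query(
    sensor_id="my_sensor_id",
    filter_exp="?default_upstream_key > '?default_upstream_value'",
    control_db_table_name="my_database.sensor_control_table",
    upstream_key="load_timestamp",
    upstream_table_name="my_database.upstream_table",
)
```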
generate_sensor_sap_logchain_query(chain_id, dbtable=SAPLogchain.DBTABLE.value, status=SAPLogchain.GREEN_STATUS.value, engine_table_name=SAPLogchain.ENGINE_TABLE.value)
Generates a sensor query based on the SAP Logchain table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
chain_id | str | chain id to query the status on SAP. | required |
dbtable | str | the SAP logchain table to query. | SAPLogchain.DBTABLE.value |
status | str | the process status to filter on (e.g., the green status). | SAPLogchain.GREEN_STATUS.value |
engine_table_name | str | table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query. | SAPLogchain.ENGINE_TABLE.value |
Returns:
Type | Description |
---|---|
str | The query string. |
Source code in mkdocs/lakehouse_engine/packages/engine.py
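Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the chain id is hypothetical and the remaining parameters keep their defaults.

```python
from lakehouse_engine.engine import generate_sensor_sap_logchain_query  # assumed import path

# Build a sensor query over the SAP logchain table for a given chain id,
# keeping the default dbtable, status and engine table name.
query = generate_sensor_sap_logchain_query(chain_id="MY_CHAIN_ID")
```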
load_data(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Load data using the DataLoader algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
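Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the acon file path is hypothetical and its content must follow the DataLoader ACON spec (not shown here).

```python
from lakehouse_engine.engine import load_data  # assumed import path

# Run the DataLoader algorithm from an ACON file (hypothetical path).
load_data(acon_path="s3://my-bucket/acons/load_sales_acon.json")
```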
manage_files(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Manipulate S3 files using the File Manager algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
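Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the ACON dict content must follow the File Manager ACON spec and is omitted here.

```python
from lakehouse_engine.engine import manage_files  # assumed import path

# Run the File Manager algorithm with an ACON provided directly in code.
file_manager_acon = {}  # placeholder: fill according to the File Manager ACON spec
manage_files(acon=file_manager_acon)
```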
manage_table(acon_path=None, acon=None, collect_engine_usage=CollectEngineUsage.PROD_ONLY.value, spark_confs=None)
Manipulate tables/views using the Table Manager algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
acon_path | Optional[str] | path of the acon (algorithm configuration) file. | None |
acon | Optional[dict] | acon provided directly through python code (e.g., notebooks or other apps). | None |
collect_engine_usage | str | Lakehouse usage statistics collection strategy. | CollectEngineUsage.PROD_ONLY.value |
spark_confs | dict | optional dictionary with the spark confs to be used when collecting the engine usage. | None |
Source code in mkdocs/lakehouse_engine/packages/engine.py
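Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the acon file path is hypothetical and its content must follow the Table Manager ACON spec (not shown here).

```python
from lakehouse_engine.engine import manage_table  # assumed import path

# Run the Table Manager algorithm from an ACON file (hypothetical path).
manage_table(acon_path="s3://my-bucket/acons/create_table_acon.json")
```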
send_notification(args)
Send a notification using a notifier.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | dict | arguments for the notifier. | required |
Source code in mkdocs/lakehouse_engine/packages/engine.py
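Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the keys of the args dict depend on the configured notifier, and the ones shown below are purely hypothetical.

```python
from lakehouse_engine.engine import send_notification  # assumed import path

# Send a notification; the args keys below are hypothetical and must match
# whatever the configured notifier expects.
send_notification(args={"type": "email", "message": "Data load finished."})
```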
update_sensor_status(sensor_id, control_db_table_name, status=SensorStatus.PROCESSED_NEW_DATA.value, assets=None)
Update internal sensor status.
Update the sensor status in the control table. It should be used to tell the system that the sensor has processed all the new data that was previously identified, hence shifting the sensor status. It is usually used to move from SensorStatus.ACQUIRED_NEW_DATA to SensorStatus.PROCESSED_NEW_DATA, but there might be scenarios - still to be identified - where the sensor status can be updated from/to different statuses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sensor_id | str | sensor id. | required |
control_db_table_name | str | name of the sensor control table. | required |
status | str | status of the sensor. | SensorStatus.PROCESSED_NEW_DATA.value |
assets | List[str] | a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA. | None |
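Example (illustrative only): a minimal sketch assuming the function is importable from lakehouse_engine.engine; the table and asset names are hypothetical.

```python
from lakehouse_engine.engine import update_sensor_status  # assumed import path

# After the downstream consumer has processed the data previously acquired by
# the sensor, flag it as processed (status defaults to PROCESSED_NEW_DATA).
update_sensor_status(
    sensor_id="my_sensor_id",
    control_db_table_name="my_database.sensor_control_table",
    assets=["my_database.my_gold_table"],
)
```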