lakehouse_engine.engine

Contract of the lakehouse engine with all the available functions to be executed.

  1"""Contract of the lakehouse engine with all the available functions to be executed."""
  2
  3from typing import List, Optional, OrderedDict
  4
  5from lakehouse_engine.algorithms.data_loader import DataLoader
  6from lakehouse_engine.algorithms.gab import GAB
  7from lakehouse_engine.algorithms.reconciliator import Reconciliator
  8from lakehouse_engine.algorithms.sensor import Sensor, SensorStatus
  9from lakehouse_engine.core.definitions import (
 10    CollectEngineUsage,
 11    DQDefaults,
 12    SAPLogchain,
 13    TerminatorSpec,
 14)
 15from lakehouse_engine.core.exec_env import ExecEnv
 16from lakehouse_engine.core.file_manager import FileManagerFactory
 17from lakehouse_engine.core.sensor_manager import SensorUpstreamManager
 18from lakehouse_engine.core.table_manager import TableManager
 19from lakehouse_engine.terminators.notifier_factory import NotifierFactory
 20from lakehouse_engine.terminators.sensor_terminator import SensorTerminator
 21from lakehouse_engine.utils.acon_utils import validate_and_resolve_acon
 22from lakehouse_engine.utils.configs.config_utils import ConfigUtils
 23from lakehouse_engine.utils.engine_usage_stats import EngineUsageStats
 24
 25
 26def load_data(
 27    acon_path: Optional[str] = None,
 28    acon: Optional[dict] = None,
 29    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
 30    spark_confs: dict = None,
 31) -> Optional[OrderedDict]:
 32    """Load data using the DataLoader algorithm.
 33
 34    Args:
 35        acon_path: path of the acon (algorithm configuration) file.
 36        acon: acon provided directly through python code (e.g., notebooks or other
 37            apps).
 38        collect_engine_usage: Lakehouse usage statistics collection strategy.
 39        spark_confs: optional dictionary with the spark confs to be used when collecting
 40            the engine usage.
 41    """
 42    try:
 43        acon = ConfigUtils.get_acon(acon_path, acon)
 44        ExecEnv.get_or_create(app_name="data_loader", config=acon.get("exec_env", None))
 45        acon = validate_and_resolve_acon(acon, "in_motion")
 46    finally:
 47        EngineUsageStats.store_engine_usage(
 48            acon, load_data.__name__, collect_engine_usage, spark_confs
 49        )
 50    return DataLoader(acon).execute()
 51
 52
 53def execute_reconciliation(
 54    acon_path: Optional[str] = None,
 55    acon: Optional[dict] = None,
 56    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
 57    spark_confs: dict = None,
 58) -> None:
 59    """Execute the Reconciliator algorithm.
 60
 61    Args:
 62        acon_path: path of the acon (algorithm configuration) file.
 63        acon: acon provided directly through python code (e.g., notebooks or other
 64            apps).
 65        collect_engine_usage: Lakehouse usage statistics collection strategy.
 66        spark_confs: optional dictionary with the spark confs to be used when collecting
 67            the engine usage.
 68    """
 69    try:
 70        acon = ConfigUtils.get_acon(acon_path, acon)
 71        ExecEnv.get_or_create(
 72            app_name="reconciliator", config=acon.get("exec_env", None)
 73        )
 74        acon = validate_and_resolve_acon(acon)
 75    finally:
 76        EngineUsageStats.store_engine_usage(
 77            acon, execute_reconciliation.__name__, collect_engine_usage, spark_confs
 78        )
 79    Reconciliator(acon).execute()
 80
 81
 82def execute_dq_validation(
 83    acon_path: Optional[str] = None,
 84    acon: Optional[dict] = None,
 85    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
 86    spark_confs: dict = None,
 87) -> None:
 88    """Execute the DQValidator algorithm.
 89
 90    Args:
 91        acon_path: path of the acon (algorithm configuration) file.
 92        acon: acon provided directly through python code (e.g., notebooks or other
 93            apps).
 94        collect_engine_usage: Lakehouse usage statistics collection strategy.
 95        spark_confs: optional dictionary with the spark confs to be used when collecting
 96            the engine usage.
 97    """
 98    from lakehouse_engine.algorithms.dq_validator import DQValidator
 99
100    try:
101        acon = ConfigUtils.get_acon(acon_path, acon)
102        ExecEnv.get_or_create(
103            app_name="dq_validator", config=acon.get("exec_env", None)
104        )
105        acon = validate_and_resolve_acon(acon, "at_rest")
106    finally:
107        EngineUsageStats.store_engine_usage(
108            acon, execute_dq_validation.__name__, collect_engine_usage, spark_confs
109        )
110    DQValidator(acon).execute()
111
112
113def manage_table(
114    acon_path: Optional[str] = None,
115    acon: Optional[dict] = None,
116    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
117    spark_confs: dict = None,
118) -> None:
119    """Manipulate tables/views using Table Manager algorithm.
120
121    Args:
122        acon_path: path of the acon (algorithm configuration) file.
123        acon: acon provided directly through python code (e.g., notebooks
124            or other apps).
125        collect_engine_usage: Lakehouse usage statistics collection strategy.
126        spark_confs: optional dictionary with the spark confs to be used when collecting
127            the engine usage.
128    """
129    acon = ConfigUtils.get_acon(acon_path, acon)
130    ExecEnv.get_or_create(app_name="manage_table", config=acon.get("exec_env", None))
131    EngineUsageStats.store_engine_usage(
132        acon, manage_table.__name__, collect_engine_usage, spark_confs
133    )
134    TableManager(acon).get_function()
135
136
137def manage_files(
138    acon_path: Optional[str] = None,
139    acon: Optional[dict] = None,
140    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
141    spark_confs: dict = None,
142) -> None:
143    """Manipulate s3 files using File Manager algorithm.
144
145    Args:
146        acon_path: path of the acon (algorithm configuration) file.
147        acon: acon provided directly through python code (e.g., notebooks
148            or other apps).
149        collect_engine_usage: Lakehouse usage statistics collection strategy.
150        spark_confs: optional dictionary with the spark confs to be used when collecting
151            the engine usage.
152    """
153    acon = ConfigUtils.get_acon(acon_path, acon)
154    ExecEnv.get_or_create(app_name="manage_files", config=acon.get("exec_env", None))
155    EngineUsageStats.store_engine_usage(
156        acon, manage_files.__name__, collect_engine_usage, spark_confs
157    )
158    FileManagerFactory.execute_function(configs=acon)
159
160
161def execute_sensor(
162    acon_path: Optional[str] = None,
163    acon: Optional[dict] = None,
164    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
165    spark_confs: dict = None,
166) -> bool:
167    """Execute a sensor based on a Sensor Algorithm Configuration.
168
169    A sensor is useful to check if an upstream system has new data.
170
171    Args:
172        acon_path: path of the acon (algorithm configuration) file.
173        acon: acon provided directly through python code (e.g., notebooks
174            or other apps).
175        collect_engine_usage: Lakehouse usage statistics collection strategy.
176        spark_confs: optional dictionary with the spark confs to be used when collecting
177            the engine usage.
178    """
179    acon = ConfigUtils.get_acon(acon_path, acon)
180    ExecEnv.get_or_create(app_name="execute_sensor", config=acon.get("exec_env", None))
181    EngineUsageStats.store_engine_usage(
182        acon, execute_sensor.__name__, collect_engine_usage, spark_confs
183    )
184    return Sensor(acon).execute()
185
186
187def update_sensor_status(
188    sensor_id: str,
189    control_db_table_name: str,
190    status: str = SensorStatus.PROCESSED_NEW_DATA.value,
191    assets: List[str] = None,
192) -> None:
193    """Update internal sensor status.
194
195    Update the sensor status in the control table,
196    it should be used to tell the system
197    that the sensor has processed all new data that was previously identified,
198    hence updating the shifted sensor status.
199    Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to
200    `SensorStatus.PROCESSED_NEW_DATA`,
201    but there might be scenarios - still to identify -
202    where we can update the sensor status from/to different statuses.
203
204    Args:
205        sensor_id: sensor id.
206        control_db_table_name: `db.table` to store sensor checkpoints.
207        status: status of the sensor.
208        assets: a list of assets that are considered as available to
209            consume downstream after this sensor has status
210            PROCESSED_NEW_DATA.
211    """
212    ExecEnv.get_or_create(app_name="update_sensor_status")
213    SensorTerminator.update_sensor_status(
214        sensor_id=sensor_id,
215        control_db_table_name=control_db_table_name,
216        status=status,
217        assets=assets,
218    )
219
220
221def generate_sensor_query(
222    sensor_id: str,
223    filter_exp: str = None,
224    control_db_table_name: str = None,
225    upstream_key: str = None,
226    upstream_value: str = None,
227    upstream_table_name: str = None,
228) -> str:
229    """Generates a preprocess query to be used in a sensor configuration.
230
231    Args:
232        sensor_id: sensor id.
233        filter_exp: expression to filter incoming new data.
234            You can use the placeholder ?default_upstream_key and
235            ?default_upstream_value, so that it can be replaced by the
236            respective values in the control_db_table_name for this specific
237            sensor_id.
238        control_db_table_name: `db.table` to retrieve the last status change
239            timestamp. This is only relevant for the jdbc sensor.
240        upstream_key: the key of custom sensor information to control how to
241            identify new data from the upstream (e.g., a time column in the
242            upstream).
243        upstream_value: the upstream value
244            to identify new data from the upstream (e.g., the value of a time
245            present in the upstream).
246        upstream_table_name: value for custom sensor
247            to query new data from the upstream
248            If none we will set the default value,
249            our `sensor_new_data` view.
250
251    Return:
252        The query string.
253    """
254    ExecEnv.get_or_create(app_name="generate_sensor_preprocess_query")
255    if filter_exp:
256        return SensorUpstreamManager.generate_filter_exp_query(
257            sensor_id=sensor_id,
258            filter_exp=filter_exp,
259            control_db_table_name=control_db_table_name,
260            upstream_key=upstream_key,
261            upstream_value=upstream_value,
262            upstream_table_name=upstream_table_name,
263        )
264    else:
265        return SensorUpstreamManager.generate_sensor_table_preprocess_query(
266            sensor_id=sensor_id
267        )
268
269
270def generate_sensor_sap_logchain_query(
271    chain_id: str,
272    dbtable: str = SAPLogchain.DBTABLE.value,
273    status: str = SAPLogchain.GREEN_STATUS.value,
274    engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
275) -> str:
276    """Generates a sensor query based in the SAP Logchain table.
277
278    Args:
279        chain_id: chain id to query the status on SAP.
280        dbtable: `db.table` to retrieve the data to
281            check if the sap chain is already finished.
282        status: `db.table` to retrieve the last status change
283            timestamp.
284        engine_table_name: table name exposed with the SAP LOGCHAIN data.
285            This table will be used in the jdbc query.
286
287    Return:
288        The query string.
289    """
290    ExecEnv.get_or_create(app_name="generate_sensor_sap_logchain_query")
291    return SensorUpstreamManager.generate_sensor_sap_logchain_query(
292        chain_id=chain_id,
293        dbtable=dbtable,
294        status=status,
295        engine_table_name=engine_table_name,
296    )
297
298
299def send_notification(args: dict) -> None:
300    """Send a notification using a notifier.
301
302    Args:
303        args: arguments for the notifier.
304    """
305    notifier = NotifierFactory.get_notifier(
306        spec=TerminatorSpec(function="notify", args=args)
307    )
308
309    notifier.create_notification()
310    notifier.send_notification()
311
312
313def build_data_docs(
314    store_backend: str = DQDefaults.STORE_BACKEND.value,
315    local_fs_root_dir: str = None,
316    data_docs_local_fs: str = None,
317    data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value,
318    bucket: str = None,
319    data_docs_bucket: str = None,
320    expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
321    validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value,
322    checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value,
323) -> None:
324    """Build Data Docs for the project.
325
326    This function does a full build of data docs based on all the great expectations
327    checkpoints in the specified location, getting all history of run/validations
328    executed and results.
329
330    Args:
331        store_backend: which store_backend to use (e.g. s3 or file_system).
332        local_fs_root_dir: path of the root directory. Note: only applicable
333            for store_backend file_system
334        data_docs_local_fs: path of the root directory. Note: only applicable
335            for store_backend file_system.
336        data_docs_prefix: prefix where to store data_docs' data.
337        bucket: the bucket name to consider for the store_backend
338            (store DQ artefacts). Note: only applicable for store_backend s3.
339        data_docs_bucket: the bucket name for data docs only. When defined,
340            it will supersede bucket parameter.
341            Note: only applicable for store_backend s3.
342        expectations_store_prefix: prefix where to store expectations' data.
343            Note: only applicable for store_backend s3.
344        validations_store_prefix: prefix where to store validations' data.
345            Note: only applicable for store_backend s3.
346        checkpoint_store_prefix: prefix where to store checkpoints' data.
347            Note: only applicable for store_backend s3.
348    """
349    from lakehouse_engine.dq_processors.dq_factory import DQFactory
350
351    DQFactory.build_data_docs(
352        store_backend=store_backend,
353        local_fs_root_dir=local_fs_root_dir,
354        data_docs_local_fs=data_docs_local_fs,
355        data_docs_prefix=data_docs_prefix,
356        bucket=bucket,
357        data_docs_bucket=data_docs_bucket,
358        expectations_store_prefix=expectations_store_prefix,
359        validations_store_prefix=validations_store_prefix,
360        checkpoint_store_prefix=checkpoint_store_prefix,
361    )
362
363
364def execute_gab(
365    acon_path: Optional[str] = None,
366    acon: Optional[dict] = None,
367    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
368    spark_confs: dict = None,
369) -> None:
370    """Execute the gold asset builder based on a GAB Algorithm Configuration.
371
372    GaB is useful to build your gold assets with predefined functions for recurrent
373    periods.
374
375    Args:
376        acon_path: path of the acon (algorithm configuration) file.
377        acon: acon provided directly through python code (e.g., notebooks
378            or other apps).
379        collect_engine_usage: Lakehouse usage statistics collection strategy.
380        spark_confs: optional dictionary with the spark confs to be used when collecting
381            the engine usage.
382    """
383    acon = ConfigUtils.get_acon(acon_path, acon)
384    ExecEnv.get_or_create(app_name="execute_gab", config=acon.get("exec_env", None))
385    EngineUsageStats.store_engine_usage(
386        acon, execute_gab.__name__, collect_engine_usage, spark_confs
387    )
388    GAB(acon).execute()
def load_data(acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> Optional[OrderedDict]:

Load data using the DataLoader algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
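
Example - a minimal sketch of calling load_data with an inline acon; the spec keys describe a hypothetical batch CSV-to-Delta load and are illustrative assumptions, not the full DataLoader schema:

    from lakehouse_engine.engine import load_data

    # Illustrative acon for a simple batch load: read CSV files and append them
    # to a Delta location. Paths and spec keys are assumptions for illustration.
    acon = {
        "input_specs": [
            {
                "spec_id": "orders_source",
                "read_type": "batch",
                "data_format": "csv",
                "location": "s3://my-bucket/landing/orders/",
            }
        ],
        "output_specs": [
            {
                "spec_id": "orders_bronze",
                "input_id": "orders_source",
                "write_type": "append",
                "data_format": "delta",
                "location": "s3://my-bucket/bronze/orders/",
            }
        ],
    }

    load_data(acon=acon)
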
def execute_reconciliation(acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Execute the Reconciliator algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
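
Example - an illustrative reconciliation acon comparing a truth dataset against the current one; the key names, thresholds, and paths are assumptions, so check the Reconciliator documentation for the real schema:

    from lakehouse_engine.engine import execute_reconciliation

    # Illustrative acon comparing a truth dataset against the current dataset.
    acon = {
        "metrics": [
            {"metric": "amount", "aggregation": "sum", "type": "percentage", "yellow": 0.05, "red": 0.1}
        ],
        "truth_input_spec": {
            "spec_id": "truth",
            "read_type": "batch",
            "data_format": "delta",
            "location": "s3://my-bucket/gold/orders_truth/",
        },
        "current_input_spec": {
            "spec_id": "current",
            "read_type": "batch",
            "data_format": "delta",
            "location": "s3://my-bucket/gold/orders/",
        },
    }

    execute_reconciliation(acon=acon)
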
def execute_dq_validation(acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Execute the DQValidator algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
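
Example - a sketch of validating data at rest; the acon keys, bucket, and DQ function shown are assumptions based on a typical Great Expectations validation, not the definitive DQValidator schema:

    from lakehouse_engine.engine import execute_dq_validation

    # Illustrative acon validating a table at rest with a single expectation.
    acon = {
        "input_spec": {
            "spec_id": "orders",
            "read_type": "batch",
            "data_format": "delta",
            "location": "s3://my-bucket/bronze/orders/",
        },
        "dq_spec": {
            "spec_id": "orders_dq",
            "input_id": "orders",
            "dq_type": "validator",
            "bucket": "my-dq-bucket",
            "dq_functions": [
                {"function": "expect_column_values_to_not_be_null", "args": {"column": "order_id"}}
            ],
        },
    }

    execute_dq_validation(acon=acon)
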
def manage_table(acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Manipulate tables/views using the Table Manager algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
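
Example - an illustrative call; the "function" value and table name are assumptions, and the set of supported operations (and the keys each one needs) is defined by TableManager:

    from lakehouse_engine.engine import manage_table

    # Illustrative acon: run a single table operation on a hypothetical table.
    acon = {
        "function": "describe",
        "table_or_view_name": "my_database.orders",
    }

    manage_table(acon=acon)
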
def manage_files(acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Manipulate S3 files using the File Manager algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
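
Example - an illustrative file operation; the function name, keys, and dry_run flag are assumptions drawn from a typical delete-objects configuration, so consult the File Manager documentation for the supported operations:

    from lakehouse_engine.engine import manage_files

    # Illustrative acon: preview the deletion of objects under a prefix.
    acon = {
        "function": "delete_objects",
        "bucket": "my-bucket",
        "object_paths": ["landing/orders/old/"],
        "dry_run": True,
    }

    manage_files(acon=acon)
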
def execute_sensor(acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> bool:

Execute a sensor based on a Sensor Algorithm Configuration.

A sensor is useful to check if an upstream system has new data.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
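
Example - an illustrative sensor acon watching an upstream Delta table for new data; the key names and identifiers are assumptions drawn from a typical sensor configuration:

    from lakehouse_engine.engine import execute_sensor

    # Illustrative sensor acon watching an upstream Delta table for new data.
    acon = {
        "sensor_id": "upstream_orders_sensor",
        "assets": ["orders"],
        "control_db_table_name": "my_database.lakehouse_sensors",
        "input_spec": {
            "spec_id": "upstream",
            "read_type": "streaming",
            "data_format": "delta",
            "db_table": "upstream_db.orders",
        },
        "base_checkpoint_location": "s3://my-bucket/checkpoints/sensors/",
        "fail_on_empty_result": False,
    }

    if execute_sensor(acon=acon):
        print("New upstream data acquired; trigger the downstream load.")
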
def update_sensor_status(sensor_id: str, control_db_table_name: str, status: str = 'PROCESSED_NEW_DATA', assets: List[str] = None) -> None:

Update internal sensor status.

Update the sensor status in the control table. It should be used to tell the system that the sensor has processed all new data that was previously identified, hence shifting the sensor status. Usually used to move from SensorStatus.ACQUIRED_NEW_DATA to SensorStatus.PROCESSED_NEW_DATA, but there might be scenarios - still to identify - where we can update the sensor status from/to different statuses.

Arguments:
  • sensor_id: sensor id.
  • control_db_table_name: db.table to store sensor checkpoints.
  • status: status of the sensor.
  • assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.
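
Example - after the downstream job has consumed the acquired data, flag it as processed; the sensor id, control table, and asset names below are illustrative:

    from lakehouse_engine.engine import update_sensor_status

    # Flag the data acquired by the sensor as processed, releasing the listed
    # assets for downstream consumption.
    update_sensor_status(
        sensor_id="upstream_orders_sensor",
        control_db_table_name="my_database.lakehouse_sensors",
        assets=["orders"],
    )
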
def generate_sensor_query(sensor_id: str, filter_exp: str = None, control_db_table_name: str = None, upstream_key: str = None, upstream_value: str = None, upstream_table_name: str = None) -> str:

Generates a preprocess query to be used in a sensor configuration.

Arguments:
  • sensor_id: sensor id.
  • filter_exp: expression to filter incoming new data. You can use the placeholders ?default_upstream_key and ?default_upstream_value, so that they can be replaced by the respective values in the control_db_table_name for this specific sensor_id.
  • control_db_table_name: db.table to retrieve the last status change timestamp. This is only relevant for the jdbc sensor.
  • upstream_key: the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream).
  • upstream_value: the upstream value to identify new data from the upstream (e.g., the value of a time present in the upstream).
  • upstream_table_name: value for custom sensors to query new data from the upstream. If none is provided, the default value, our sensor_new_data view, is used.
Return:
  The query string.
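
Example - generating a preprocess query for a JDBC-based sensor using the placeholders described above; the table and column names are illustrative:

    from lakehouse_engine.engine import generate_sensor_query

    # Build a preprocess query that filters on a timestamp column. The
    # placeholders are resolved against the control table for this sensor_id.
    query = generate_sensor_query(
        sensor_id="erp_orders_sensor",
        filter_exp="?default_upstream_key > '?default_upstream_value'",
        control_db_table_name="my_database.lakehouse_sensors",
        upstream_key="load_date",
        upstream_table_name="erp_db.orders_changelog",
    )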

def generate_sensor_sap_logchain_query(chain_id: str, dbtable: str = 'SAPPHA.RSPCLOGCHAIN', status: str = 'G', engine_table_name: str = 'sensor_new_data') -> str:

Generates a sensor query based on the SAP Logchain table.

Arguments:
  • chain_id: chain id to query the status on SAP.
  • dbtable: db.table to retrieve the data to check if the sap chain is already finished.
  • status: status to check if the SAP chain has already finished successfully (green status).
  • engine_table_name: table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query.
Return:
  The query string.
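
Example - building the query for a hypothetical SAP process chain, keeping the engine defaults for dbtable, status, and engine_table_name:

    from lakehouse_engine.engine import generate_sensor_sap_logchain_query

    # Build the query that checks whether the SAP process chain has finished.
    query = generate_sensor_sap_logchain_query(chain_id="Z_MY_PROCESS_CHAIN")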

def send_notification(args: dict) -> None:
300def send_notification(args: dict) -> None:
301    """Send a notification using a notifier.
302
303    Args:
304        args: arguments for the notifier.
305    """
306    notifier = NotifierFactory.get_notifier(
307        spec=TerminatorSpec(function="notify", args=args)
308    )
309
310    notifier.create_notification()
311    notifier.send_notification()

Send a notification using a notifier.

Arguments:
  • args: arguments for the notifier.
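
Example - sending an illustrative email notification; the accepted args depend on the configured notifier, so the keys below are assumptions rather than a contract:

    from lakehouse_engine.engine import send_notification

    # Illustrative email notification; server, addresses, and keys are assumptions.
    send_notification(
        args={
            "type": "email",
            "server": "smtp.my-company.com",
            "from": "lakehouse-engine@my-company.com",
            "to": ["data-team@my-company.com"],
            "subject": "Orders load finished",
            "message": "The orders load finished successfully.",
        }
    )
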
def build_data_docs(store_backend: str = 's3', local_fs_root_dir: str = None, data_docs_local_fs: str = None, data_docs_prefix: str = 'dq/data_docs/site/', bucket: str = None, data_docs_bucket: str = None, expectations_store_prefix: str = 'dq/expectations/', validations_store_prefix: str = 'dq/validations/', checkpoint_store_prefix: str = 'dq/checkpoints/') -> None:

Build Data Docs for the project.

This function does a full build of data docs based on all the Great Expectations checkpoints in the specified location, retrieving the full history of runs/validations executed and their results.

Arguments:
  • store_backend: which store_backend to use (e.g. s3 or file_system).
  • local_fs_root_dir: path of the root directory. Note: only applicable for store_backend file_system.
  • data_docs_local_fs: path where to store the data docs. Note: only applicable for store_backend file_system.
  • data_docs_prefix: prefix where to store data_docs' data.
  • bucket: the bucket name to consider for the store_backend (store DQ artefacts). Note: only applicable for store_backend s3.
  • data_docs_bucket: the bucket name for data docs only. When defined, it will supersede bucket parameter. Note: only applicable for store_backend s3.
  • expectations_store_prefix: prefix where to store expectations' data. Note: only applicable for store_backend s3.
  • validations_store_prefix: prefix where to store validations' data. Note: only applicable for store_backend s3.
  • checkpoint_store_prefix: prefix where to store checkpoints' data. Note: only applicable for store_backend s3.
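
Example - rebuilding the data docs site from checkpoints stored in S3; the bucket names and prefix are illustrative, and the remaining parameters keep their defaults:

    from lakehouse_engine.engine import build_data_docs

    # Rebuild the data docs site from all checkpoints stored in S3.
    build_data_docs(
        store_backend="s3",
        bucket="my-dq-bucket",
        data_docs_bucket="my-dq-data-docs-bucket",
        data_docs_prefix="dq/data_docs/site/",
    )
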
def execute_gab(acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Execute the gold asset builder based on a GAB Algorithm Configuration.

GAB is useful to build your gold assets with predefined functions for recurrent periods.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
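
Example - an illustrative GAB acon; the key names are assumptions, as the full configuration schema (use cases, cadences, target table, etc.) is defined by the GAB algorithm:

    from lakehouse_engine.engine import execute_gab

    # Illustrative GAB acon; key names and values are assumptions for illustration.
    acon = {
        "query_label_filter": ["orders_kpi"],
        "queue_filter": ["default"],
        "cadence_filter": ["All"],
        "target_database": "gold",
        "target_table": "gab_use_case_results",
    }

    execute_gab(acon=acon)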