lakehouse_engine.engine

Contract of the lakehouse engine with all the available functions to be executed.

  1"""Contract of the lakehouse engine with all the available functions to be executed."""
  2from typing import List, Optional, OrderedDict
  3
  4from lakehouse_engine.algorithms.data_loader import DataLoader
  5from lakehouse_engine.algorithms.dq_validator import DQValidator
  6from lakehouse_engine.algorithms.reconciliator import Reconciliator
  7from lakehouse_engine.algorithms.sensor import Sensor, SensorStatus
  8from lakehouse_engine.core.definitions import (
  9    CollectEngineUsage,
 10    SAPLogchain,
 11    TerminatorSpec,
 12)
 13from lakehouse_engine.core.exec_env import ExecEnv
 14from lakehouse_engine.core.file_manager import FileManagerFactory
 15from lakehouse_engine.core.sensor_manager import SensorUpstreamManager
 16from lakehouse_engine.core.table_manager import TableManager
 17from lakehouse_engine.terminators.notifier_factory import NotifierFactory
 18from lakehouse_engine.terminators.sensor_terminator import SensorTerminator
 19from lakehouse_engine.utils.configs.config_utils import ConfigUtils
 20from lakehouse_engine.utils.engine_usage_stats import EngineUsageStats
 21
 22
 23def load_data(
 24    acon_path: Optional[str] = None,
 25    acon: Optional[dict] = None,
 26    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
 27    spark_confs: dict = None,
 28) -> Optional[OrderedDict]:
 29    """Load data using the DataLoader algorithm.
 30
 31    Args:
 32        acon_path: path of the acon (algorithm configuration) file.
 33        acon: acon provided directly through python code (e.g., notebooks or other
 34            apps).
 35        collect_engine_usage: Lakehouse usage statistics collection strategy.
 36        spark_confs: optional dictionary with the spark confs to be used when collecting
 37            the engine usage.
 38    """
 39    acon = ConfigUtils.get_acon(acon_path, acon)
 40    ExecEnv.get_or_create(app_name="data_loader", config=acon.get("exec_env", None))
 41    EngineUsageStats.store_engine_usage(
 42        acon, load_data.__name__, collect_engine_usage, spark_confs
 43    )
 44    return DataLoader(acon).execute()
 45
 46
 47def execute_reconciliation(
 48    acon_path: Optional[str] = None,
 49    acon: Optional[dict] = None,
 50    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
 51    spark_confs: dict = None,
 52) -> None:
 53    """Execute the Reconciliator algorithm.
 54
 55    Args:
 56        acon_path: path of the acon (algorithm configuration) file.
 57        acon: acon provided directly through python code (e.g., notebooks or other
 58            apps).
 59        collect_engine_usage: Lakehouse usage statistics collection strategy.
 60        spark_confs: optional dictionary with the spark confs to be used when collecting
 61            the engine usage.
 62    """
 63    acon = ConfigUtils.get_acon(acon_path, acon)
 64    ExecEnv.get_or_create(app_name="reconciliator", config=acon.get("exec_env", None))
 65    EngineUsageStats.store_engine_usage(
 66        acon, execute_reconciliation.__name__, collect_engine_usage, spark_confs
 67    )
 68    Reconciliator(acon).execute()
 69
 70
 71def execute_dq_validation(
 72    acon_path: Optional[str] = None,
 73    acon: Optional[dict] = None,
 74    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
 75    spark_confs: dict = None,
 76) -> None:
 77    """Execute the DQValidator algorithm.
 78
 79    Args:
 80        acon_path: path of the acon (algorithm configuration) file.
 81        acon: acon provided directly through python code (e.g., notebooks or other
 82            apps).
 83        collect_engine_usage: Lakehouse usage statistics collection strategy.
 84        spark_confs: optional dictionary with the spark confs to be used when collecting
 85            the engine usage.
 86    """
 87    acon = ConfigUtils.get_acon(acon_path, acon)
 88    ExecEnv.get_or_create(app_name="dq_validator", config=acon.get("exec_env", None))
 89    EngineUsageStats.store_engine_usage(
 90        acon, execute_dq_validation.__name__, collect_engine_usage, spark_confs
 91    )
 92    DQValidator(acon).execute()
 93
 94
 95def manage_table(
 96    acon_path: Optional[str] = None,
 97    acon: Optional[dict] = None,
 98    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
 99    spark_confs: dict = None,
100) -> None:
101    """Manipulate tables/views using Table Manager algorithm.
102
103    Args:
104        acon_path: path of the acon (algorithm configuration) file.
105        acon: acon provided directly through python code (e.g., notebooks
106            or other apps).
107        collect_engine_usage: Lakehouse usage statistics collection strategy.
108        spark_confs: optional dictionary with the spark confs to be used when collecting
109            the engine usage.
110    """
111    acon = ConfigUtils.get_acon(acon_path, acon)
112    ExecEnv.get_or_create(app_name="manage_table", config=acon.get("exec_env", None))
113    EngineUsageStats.store_engine_usage(
114        acon, manage_table.__name__, collect_engine_usage, spark_confs
115    )
116    TableManager(acon).get_function()
117
118
119def manage_files(
120    acon_path: Optional[str] = None,
121    acon: Optional[dict] = None,
122    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
123    spark_confs: dict = None,
124) -> None:
125    """Manipulate s3 files using File Manager algorithm.
126
127    Args:
128        acon_path: path of the acon (algorithm configuration) file.
129        acon: acon provided directly through python code (e.g., notebooks
130            or other apps).
131        collect_engine_usage: Lakehouse usage statistics collection strategy.
132        spark_confs: optional dictionary with the spark confs to be used when collecting
133            the engine usage.
134    """
135    acon = ConfigUtils.get_acon(acon_path, acon)
136    ExecEnv.get_or_create(app_name="manage_files", config=acon.get("exec_env", None))
137    EngineUsageStats.store_engine_usage(
138        acon, manage_files.__name__, collect_engine_usage, spark_confs
139    )
140    FileManagerFactory.execute_function(configs=acon)
141
142
143def execute_sensor(
144    acon_path: Optional[str] = None,
145    acon: Optional[dict] = None,
146    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
147    spark_confs: dict = None,
148) -> bool:
149    """Execute a sensor based on a Sensor Algorithm Configuration.
150
151    A sensor is useful to check if an upstream system has new data.
152
153    Args:
154        acon_path: path of the acon (algorithm configuration) file.
155        acon: acon provided directly through python code (e.g., notebooks
156            or other apps).
157        collect_engine_usage: Lakehouse usage statistics collection strategy.
158        spark_confs: optional dictionary with the spark confs to be used when collecting
159            the engine usage.
160    """
161    acon = ConfigUtils.get_acon(acon_path, acon)
162    ExecEnv.get_or_create(app_name="execute_sensor", config=acon.get("exec_env", None))
163    EngineUsageStats.store_engine_usage(
164        acon, execute_sensor.__name__, collect_engine_usage, spark_confs
165    )
166    return Sensor(acon).execute()
167
168
169def update_sensor_status(
170    sensor_id: str,
171    control_db_table_name: str,
172    status: str = SensorStatus.PROCESSED_NEW_DATA.value,
173    assets: List[str] = None,
174) -> None:
175    """Update internal sensor status.
176
177    Update the sensor status in the control table,
178    it should be used to tell the system
179    that the sensor has processed all new data that was previously identified,
180    hence updating the shifted sensor status.
181    Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to
182    `SensorStatus.PROCESSED_NEW_DATA`,
183    but there might be scenarios - still to identify -
184    where we can update the sensor status from/to different statuses.
185
186    Args:
187        sensor_id: sensor id.
188        control_db_table_name: `db.table` to store sensor checkpoints.
189        status: status of the sensor.
190        assets: a list of assets that are considered as available to
191            consume downstream after this sensor has status
192            PROCESSED_NEW_DATA.
193    """
194    ExecEnv.get_or_create(app_name="update_sensor_status")
195    SensorTerminator.update_sensor_status(
196        sensor_id=sensor_id,
197        control_db_table_name=control_db_table_name,
198        status=status,
199        assets=assets,
200    )
201
202
203def generate_sensor_query(
204    sensor_id: str,
205    filter_exp: str = None,
206    control_db_table_name: str = None,
207    upstream_key: str = None,
208    upstream_value: str = None,
209    upstream_table_name: str = None,
210) -> str:
211    """Generates a preprocess query to be used in a sensor configuration.
212
213    Args:
214        sensor_id: sensor id.
215        filter_exp: expression to filter incoming new data.
216            You can use the placeholder ?default_upstream_key and
217            ?default_upstream_value, so that it can be replaced by the
218            respective values in the control_db_table_name for this specific
219            sensor_id.
220        control_db_table_name: `db.table` to retrieve the last status change
221            timestamp. This is only relevant for the jdbc sensor.
222        upstream_key: the key of custom sensor information to control how to
223            identify new data from the upstream (e.g., a time column in the
224            upstream).
225        upstream_value: the upstream value
226            to identify new data from the upstream (e.g., the value of a time
227            present in the upstream).
228        upstream_table_name: value for custom sensor
229            to query new data from the upstream
230            If none we will set the default value,
231            our `sensor_new_data` view.
232
233    Return:
234        The query string.
235    """
236    ExecEnv.get_or_create(app_name="generate_sensor_preprocess_query")
237    if filter_exp:
238        return SensorUpstreamManager.generate_filter_exp_query(
239            sensor_id=sensor_id,
240            filter_exp=filter_exp,
241            control_db_table_name=control_db_table_name,
242            upstream_key=upstream_key,
243            upstream_value=upstream_value,
244            upstream_table_name=upstream_table_name,
245        )
246    else:
247        return SensorUpstreamManager.generate_sensor_table_preprocess_query(
248            sensor_id=sensor_id
249        )
250
251
252def generate_sensor_sap_logchain_query(
253    chain_id: str,
254    dbtable: str = SAPLogchain.DBTABLE.value,
255    status: str = SAPLogchain.GREEN_STATUS.value,
256    engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
257) -> str:
258    """Generates a sensor query based in the SAP Logchain table.
259
260    Args:
261        chain_id: chain id to query the status on SAP.
262        dbtable: `db.table` to retrieve the data to
263            check if the sap chain is already finished.
264        status: `db.table` to retrieve the last status change
265            timestamp.
266        engine_table_name: table name exposed with the SAP LOGCHAIN data.
267            This table will be used in the jdbc query.
268
269    Return:
270        The query string.
271    """
272    ExecEnv.get_or_create(app_name="generate_sensor_sap_logchain_query")
273    return SensorUpstreamManager.generate_sensor_sap_logchain_query(
274        chain_id=chain_id,
275        dbtable=dbtable,
276        status=status,
277        engine_table_name=engine_table_name,
278    )
279
280
281def send_notification(args: dict) -> None:
282    """Send a notification using a notifier.
283
284    Args:
285        args: arguments for the notifier.
286    """
287    notifier = NotifierFactory.get_notifier(
288        spec=TerminatorSpec(function="notify", args=args)
289    )
290
291    notifier.create_notification()
292    notifier.send_notification()
def load_data( acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> Optional[OrderedDict]:

Load data using the DataLoader algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
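
For illustration, a minimal call could look like the sketch below. The acon path is a hypothetical placeholder and the acon contents are assumed to be defined elsewhere; the call also assumes an environment where the engine can create its execution environment.

from lakehouse_engine.engine import load_data

# Hypothetical path to an acon (algorithm configuration) file describing the load.
output = load_data(acon_path="s3://my-bucket/acons/load_customer_orders.json")

# Alternatively, pass an acon dict built in a notebook or another app directly.
# output = load_data(acon=my_acon)
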
def execute_reconciliation( acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Execute the Reconciliator algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
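
A minimal usage sketch, assuming a hypothetical acon file that holds the reconciliation configuration:

from lakehouse_engine.engine import execute_reconciliation

# Hypothetical acon file with the reconciliation configuration.
execute_reconciliation(acon_path="s3://my-bucket/acons/reconcile_sales.json")
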
def execute_dq_validation( acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Execute the DQValidator algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
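
A minimal usage sketch, assuming a hypothetical acon file that holds the data quality validation configuration:

from lakehouse_engine.engine import execute_dq_validation

# Hypothetical acon file with the data quality validation configuration.
execute_dq_validation(acon_path="s3://my-bucket/acons/dq_validate_orders.json")
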
def manage_table( acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Manipulate tables/views using the Table Manager algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
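
A minimal usage sketch; the acon path and its contents are hypothetical, and the concrete table operation is whatever the acon configures for the Table Manager algorithm:

from lakehouse_engine.engine import manage_table

# Hypothetical acon file describing the table/view operation to execute.
manage_table(acon_path="s3://my-bucket/acons/create_orders_table.json")
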
def manage_files( acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> None:

Manipulate S3 files using the File Manager algorithm.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
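
A minimal usage sketch; the acon path is hypothetical, and the concrete file operation is whatever the acon configures for the File Manager algorithm:

from lakehouse_engine.engine import manage_files

# Hypothetical acon file describing the S3 file operation to execute.
manage_files(acon_path="s3://my-bucket/acons/archive_landing_files.json")
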
def execute_sensor( acon_path: Optional[str] = None, acon: Optional[dict] = None, collect_engine_usage: str = 'prod_only', spark_confs: dict = None) -> bool:

Execute a sensor based on a Sensor Algorithm Configuration.

A sensor is useful to check if an upstream system has new data.

Arguments:
  • acon_path: path of the acon (algorithm configuration) file.
  • acon: acon provided directly through python code (e.g., notebooks or other apps).
  • collect_engine_usage: Lakehouse usage statistics collection strategy.
  • spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
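
Because execute_sensor returns a boolean, it can gate downstream work. A sketch with hypothetical acon paths, assuming the return value signals that new upstream data was detected:

from lakehouse_engine.engine import execute_sensor, load_data

# Hypothetical acon files; run the load only when the sensor detects new data.
if execute_sensor(acon_path="s3://my-bucket/acons/sensor_sales_upstream.json"):
    load_data(acon_path="s3://my-bucket/acons/load_sales.json")
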
def update_sensor_status( sensor_id: str, control_db_table_name: str, status: str = 'PROCESSED_NEW_DATA', assets: List[str] = None) -> None:

Update internal sensor status.

Update the sensor status in the control table. It should be used to tell the system that the sensor has processed all the new data that was previously identified, hence updating the shifted sensor status. It is usually used to move from SensorStatus.ACQUIRED_NEW_DATA to SensorStatus.PROCESSED_NEW_DATA, but there might be scenarios - still to be identified - where the sensor status can be updated from/to different statuses.

Arguments:
  • sensor_id: sensor id.
  • control_db_table_name: db.table to store sensor checkpoints.
  • status: status of the sensor.
  • assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.
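
A sketch of the typical transition described above; the sensor id, control table, and asset names are hypothetical:

from lakehouse_engine.algorithms.sensor import SensorStatus
from lakehouse_engine.engine import update_sensor_status

# Mark the previously acquired data as processed in the (hypothetical) control table.
update_sensor_status(
    sensor_id="sensor_sales_upstream",
    control_db_table_name="control_db.sensors",
    status=SensorStatus.PROCESSED_NEW_DATA.value,
    assets=["sales_bronze"],
)
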
def generate_sensor_query( sensor_id: str, filter_exp: str = None, control_db_table_name: str = None, upstream_key: str = None, upstream_value: str = None, upstream_table_name: str = None) -> str:

Generates a preprocess query to be used in a sensor configuration.

Arguments:
  • sensor_id: sensor id.
  • filter_exp: expression to filter incoming new data. You can use the placeholders ?default_upstream_key and ?default_upstream_value, so that they are replaced by the respective values in the control_db_table_name for this specific sensor_id.
  • control_db_table_name: db.table to retrieve the last status change timestamp. This is only relevant for the jdbc sensor.
  • upstream_key: the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream).
  • upstream_value: the upstream value to identify new data from the upstream (e.g., the value of a time present in the upstream).
  • upstream_table_name: value for the custom sensor to query new data from the upstream. If None, the default value, our sensor_new_data view, is used.
Returns:

The query string.
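
A sketch of both branches, with a hypothetical sensor id, tables, and column names; the filter expression uses the documented placeholders:

from lakehouse_engine.engine import generate_sensor_query

# With a filter expression: builds a query over the upstream table, replacing the
# placeholders with the values stored in the control table for this sensor_id.
query = generate_sensor_query(
    sensor_id="sensor_sales_upstream",
    filter_exp="?default_upstream_key > ?default_upstream_value",
    control_db_table_name="control_db.sensors",
    upstream_key="load_timestamp",
    upstream_table_name="upstream_db.sales_changes",
)

# Without a filter expression: falls back to the default preprocess query
# generated from the sensor_id alone.
default_query = generate_sensor_query(sensor_id="sensor_sales_upstream")
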

def generate_sensor_sap_logchain_query( chain_id: str, dbtable: str = 'SAPPHA.RSPCLOGCHAIN', status: str = 'G', engine_table_name: str = 'sensor_new_data') -> str:

Generates a sensor query based on the SAP Logchain table.

Arguments:
  • chain_id: chain id to query the status on SAP.
  • dbtable: db.table to retrieve the data to check if the sap chain is already finished.
  • status: status value that marks the SAP chain run as successfully finished (the green status, by default).
  • engine_table_name: table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query.
Returns:

The query string.
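
A sketch with a hypothetical chain id; the remaining arguments keep their documented defaults (SAP Logchain table, green status, engine table name):

from lakehouse_engine.engine import generate_sensor_sap_logchain_query

# Query to check, over jdbc, whether the (hypothetical) SAP chain has finished.
query = generate_sensor_sap_logchain_query(chain_id="SALES_DAILY_CHAIN")
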

def send_notification(args: dict) -> None:

Send a notification using a notifier.

Arguments:
  • args: arguments for the notifier.
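
A sketch of a call; the args keys shown here are hypothetical, since the accepted arguments depend on the notifier implementation resolved by the NotifierFactory:

from lakehouse_engine.engine import send_notification

# Hypothetical notifier arguments.
send_notification(
    args={
        "type": "email",
        "message": "Data load for customer_orders finished successfully.",
    }
)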