lakehouse_engine.core.sensor_manager

Module to define Sensor Manager classes.

  1"""Module to define Sensor Manager classes."""
  2
  3from datetime import datetime
  4from typing import List, Optional, Union
  5
  6from delta.tables import DeltaTable
  7from pyspark.sql import DataFrame, Row
  8from pyspark.sql.functions import array, col, lit
  9
 10from lakehouse_engine.core.definitions import (
 11    SENSOR_SCHEMA,
 12    SENSOR_UPDATE_SET,
 13    SAPLogchain,
 14    SensorSpec,
 15    SensorStatus,
 16)
 17from lakehouse_engine.core.exec_env import ExecEnv
 18from lakehouse_engine.io.reader_factory import ReaderFactory
 19from lakehouse_engine.utils.logging_handler import LoggingHandler
 20
 21
 22class SensorControlTableManager(object):
 23    """Class to control the Sensor execution."""
 24
 25    _LOGGER = LoggingHandler(__name__).get_logger()
 26
 27    @classmethod
 28    def check_if_sensor_has_acquired_data(
 29        cls,
 30        sensor_id: str,
 31        control_db_table_name: str,
 32    ) -> bool:
 33        """Check if sensor has acquired new data.
 34
 35        Args:
 36            sensor_id: sensor id.
 37            control_db_table_name: `db.table` to control sensor runs.
 38
 39        Returns:
 40            True if acquired new data, otherwise False
 41        """
 42        sensor_table_data = cls.read_sensor_table_data(
 43            sensor_id=sensor_id, control_db_table_name=control_db_table_name
 44        )
 45        cls._LOGGER.info(f"sensor_table_data = {sensor_table_data}")
 46
 47        return (
 48            sensor_table_data is not None
 49            and sensor_table_data.status == SensorStatus.ACQUIRED_NEW_DATA.value
 50        )
 51
 52    @classmethod
 53    def update_sensor_status(
 54        cls,
 55        sensor_spec: SensorSpec,
 56        status: str,
 57        upstream_key: str = None,
 58        upstream_value: str = None,
 59    ) -> None:
 60        """Control sensor execution storing the execution data in a delta table.
 61
 62        Args:
 63            sensor_spec: sensor spec containing all sensor
 64                information we need to update the control status.
 65            status: status of the sensor.
 66            upstream_key: upstream key (e.g., used to store an attribute
 67                name from the upstream so that new data can be detected
 68                automatically).
 69            upstream_value: upstream value (e.g., used to store the max
 70                attribute value from the upstream so that new data can be
 71                detected automatically).
 72        """
 73        cls._LOGGER.info(
 74            f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
 75        )
 76
 77        data = cls._convert_sensor_to_data(
 78            spec=sensor_spec,
 79            status=status,
 80            upstream_key=upstream_key,
 81            upstream_value=upstream_value,
 82        )
 83
 84        sensor_update_set = cls._get_sensor_update_set(
 85            assets=sensor_spec.assets,
 86            checkpoint_location=sensor_spec.checkpoint_location,
 87            upstream_key=upstream_key,
 88            upstream_value=upstream_value,
 89        )
 90
 91        cls._update_sensor_control(
 92            data=data,
 93            sensor_update_set=sensor_update_set,
 94            sensor_control_table=sensor_spec.control_db_table_name,
 95            sensor_id=sensor_spec.sensor_id,
 96        )
 97
 98    @classmethod
 99    def _update_sensor_control(
100        cls,
101        data: List[dict],
102        sensor_update_set: dict,
103        sensor_control_table: str,
104        sensor_id: str,
105    ) -> None:
106        """Update sensor control delta table.
107
108        Args:
109            data: to be updated.
110            sensor_update_set: columns which we had update.
111            sensor_control_table: control table name.
112            sensor_id: sensor_id to be updated.
113        """
114        sensors_delta_table = DeltaTable.forName(
115            ExecEnv.SESSION,
116            sensor_control_table,
117        )
118        sensors_updates = ExecEnv.SESSION.createDataFrame(data, SENSOR_SCHEMA)
119        sensors_delta_table.alias("sensors").merge(
120            sensors_updates.alias("updates"),
121            f"sensors.sensor_id = '{sensor_id}' AND "
122            "sensors.sensor_id = updates.sensor_id",
123        ).whenMatchedUpdate(set=sensor_update_set).whenNotMatchedInsertAll().execute()
124
125    @classmethod
126    def _convert_sensor_to_data(
127        cls,
128        spec: SensorSpec,
129        status: str,
130        upstream_key: str,
131        upstream_value: str,
132        status_change_timestamp: Optional[datetime] = None,
133    ) -> List[dict]:
134        """Convert sensor data to dataframe input data.
135
136        Args:
137            spec: sensor spec containing sensor identifier data.
138            status: new sensor data status.
139            upstream_key: key used to acquired data from the upstream.
140            upstream_value: max value from the upstream_key
141                acquired from the upstream.
142            status_change_timestamp: timestamp we commit
143                this change in the sensor control table.
144
145        Return:
146            Sensor data as list[dict], used to create a
147                dataframe to store the data into the sensor_control_table.
148        """
149        status_change_timestamp = (
150            datetime.now()
151            if status_change_timestamp is None
152            else status_change_timestamp
153        )
154        return [
155            {
156                "sensor_id": spec.sensor_id,
157                "assets": spec.assets,
158                "status": status,
159                "status_change_timestamp": status_change_timestamp,
160                "checkpoint_location": spec.checkpoint_location,
161                "upstream_key": str(upstream_key),
162                "upstream_value": str(upstream_value),
163            }
164        ]
165
166    @classmethod
167    def _get_sensor_update_set(cls, **kwargs: Union[Optional[str], List[str]]) -> dict:
168        """Get the sensor update set.
169
170        Args:
171            kwargs: Containing the following keys:
172            - assets
173            - checkpoint_location
174            - upstream_key
175            - upstream_value
176
177        Return:
178            A set containing the fields to update in the control_table.
179        """
180        sensor_update_set = dict(SENSOR_UPDATE_SET)
181        for key, value in kwargs.items():
182            if value:
183                sensor_update_set[f"sensors.{key}"] = f"updates.{key}"
184
185        return sensor_update_set
186
187    @classmethod
188    def read_sensor_table_data(
189        cls,
190        control_db_table_name: str,
191        sensor_id: str = None,
192        assets: list = None,
193    ) -> Optional[Row]:
194        """Read data from delta table containing sensor status info.
195
196        Args:
197            sensor_id: sensor id. If this parameter is defined search occurs
198                only considering this parameter. Otherwise, it considers sensor
199                assets and checkpoint location.
200            control_db_table_name: db.table to control sensor runs.
201            assets: list of assets that are fueled by the pipeline
202                where this sensor is.
203
204        Return:
205            Row containing the data for the provided sensor_id.
206        """
207        df = DeltaTable.forName(
208            ExecEnv.SESSION,
209            control_db_table_name,
210        ).toDF()
211
212        if sensor_id:
213            df = df.where(col("sensor_id") == sensor_id)
214        elif assets:
215            df = df.where(col("assets") == array(*[lit(asset) for asset in assets]))
216        else:
217            raise ValueError(
218                "Either sensor_id or assets need to be provided as arguments."
219            )
220
221        return df.first()
222
223
class SensorUpstreamManager(object):
    """Class to deal with Sensor Upstream data.

    Builds the SQL used to detect new upstream data and reads that data.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def generate_filter_exp_query(
        cls,
        sensor_id: str,
        filter_exp: str,
        control_db_table_name: Optional[str] = None,
        upstream_key: Optional[str] = None,
        upstream_value: Optional[str] = None,
        upstream_table_name: Optional[str] = None,
    ) -> str:
        """Generates a sensor preprocess query based on timestamp logic.

        Args:
            sensor_id: sensor id.
            filter_exp: expression to filter incoming new data.
                When control_db_table_name is defined, the placeholders
                `?upstream_key` and `?upstream_value` are replaced by the
                upstream_key and by the upstream_value stored in the
                control_db_table_name for this specific sensor_id (falling
                back to the upstream_value argument, then to `-2147483647`).
                Without control_db_table_name no replacement happens.
            control_db_table_name: db.table to retrieve the last upstream
                value stored for this sensor. This is only relevant for the
                jdbc sensor.
            upstream_key: the key of custom sensor information
                to control how to identify new data from the
                upstream (e.g., a time column in the upstream).
                Required when control_db_table_name is defined.
            upstream_value: value for custom sensor
                to identify new data from the upstream
                (e.g., the value of a time present in the upstream).
                If None, the default value `-2147483647` is used. A value
                stored in the control table for this sensor takes
                precedence. Non-string values are converted with str().
            upstream_table_name: value for custom sensor
                to query new data from the upstream.
                If none we will set the default value,
                our `sensor_new_data` view.

        Return:
            The query string. It only yields a row when at least one
            upstream row matches the filter (`HAVING COUNT(1) > 0`).

        Raises:
            ValueError: if control_db_table_name is defined without
                upstream_key.
        """
        source_table = upstream_table_name or "sensor_new_data"
        select_exp = "SELECT COUNT(1) as count"

        if control_db_table_name:
            if not upstream_key:
                raise ValueError(
                    "If control_db_table_name is defined, upstream_key should "
                    "also be defined!"
                )

            default_upstream_value: str = "-2147483647"
            trigger_name = upstream_key
            trigger_value = (
                default_upstream_value if upstream_value is None else upstream_value
            )
            sensor_table_data = SensorControlTableManager.read_sensor_table_data(
                sensor_id=sensor_id, control_db_table_name=control_db_table_name
            )

            # The value persisted in the control table for this sensor takes
            # precedence over both the argument and the default.
            if sensor_table_data and sensor_table_data.upstream_value:
                trigger_value = sensor_table_data.upstream_value

            # str() lets callers pass non-string overrides (e.g. an int),
            # which str.replace would otherwise reject with a TypeError.
            filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
                "?upstream_value", str(trigger_value)
            )
            select_exp = (
                f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
                f"max({trigger_name}) as UPSTREAM_VALUE"
            )

        query = (
            f"{select_exp} "
            f"FROM {source_table} "
            f"WHERE {filter_exp} "
            f"HAVING COUNT(1) > 0"
        )

        return query

    @classmethod
    def generate_sensor_table_preprocess_query(
        cls,
        sensor_id: str,
    ) -> str:
        """Generates a query to be used for a sensor having other sensor as upstream.

        The query selects, from the `sensor_new_data` view, the rows of
        `sensor_id` whose `_change_type` is 'insert' or 'update_postimage'
        and whose status is `SensorStatus.PROCESSED_NEW_DATA`.

        Args:
            sensor_id: sensor id to look for in the upstream data.

        Return:
            The query string.
        """
        # NOTE(review): sensor_id is interpolated into the SQL literal; it is
        # assumed to come from trusted configuration and contain no quotes.
        query = (
            f"SELECT * "  # nosec
            f"FROM sensor_new_data "
            f"WHERE"
            f" _change_type in ('insert', 'update_postimage')"
            f" and sensor_id = '{sensor_id}'"
            f" and status = '{SensorStatus.PROCESSED_NEW_DATA.value}'"
        )

        return query

    @classmethod
    def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
        """Read new data from the upstream into the sensor 'new_data_df'.

        When the spec has a preprocess_query, the raw upstream data is
        registered as the `sensor_new_data` temp view and the query result
        is returned instead.

        Args:
            sensor_spec: sensor spec containing all sensor information.

        Return:
            An empty dataframe if it doesn't have new data otherwise the new data
        """
        new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)

        if sensor_spec.preprocess_query:
            new_data_df.createOrReplaceTempView("sensor_new_data")
            new_data_df = ExecEnv.SESSION.sql(sensor_spec.preprocess_query)

        return new_data_df

    @classmethod
    def get_new_data(
        cls,
        new_data_df: DataFrame,
    ) -> Optional[Row]:
        """Get new data from upstream df if it's present.

        Args:
            new_data_df: DataFrame possibly containing new data.

        Return:
            Optional row, present if there is new data in the upstream,
            absent otherwise.
        """
        return new_data_df.first()

    @classmethod
    def generate_sensor_sap_logchain_query(
        cls,
        chain_id: str,
        dbtable: str = SAPLogchain.DBTABLE.value,
        status: str = SAPLogchain.GREEN_STATUS.value,
        engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
    ) -> str:
        """Generates a sensor query based in the SAP Logchain table.

        Args:
            chain_id: chain id to query the status on SAP.
            dbtable: db.table to retrieve the data to
                check if the sap chain is already finished.
            status: status value that ANALYZED_STATUS must match
                (case-insensitively). Defaults to SAPLogchain.GREEN_STATUS.
            engine_table_name: table name exposed with the SAP LOGCHAIN data.
                This table will be used in the jdbc query.

        Return:
            The query string: a `WITH <engine_table_name> AS (...)` clause
            selecting CHAIN_ID, LOAD_DATE (DATUM concatenated with ZEIT) and
            ANALYZED_STATUS from dbtable.

        Raises:
            ValueError: if chain_id is empty or None.
        """
        if not chain_id:
            raise ValueError(
                "To query on log chain SAP table the chain id should be defined!"
            )

        select_exp = (
            "SELECT CHAIN_ID, CONCAT(DATUM, ZEIT) AS LOAD_DATE, ANALYZED_STATUS"
        )
        filter_exp = (
            f"UPPER(CHAIN_ID) = UPPER('{chain_id}') "
            f"AND UPPER(ANALYZED_STATUS) = UPPER('{status}')"
        )

        query = (
            f"WITH {engine_table_name} AS ("
            f"{select_exp} "
            f"FROM {dbtable} "
            f"WHERE {filter_exp}"
            ")"
        )

        return query
class SensorControlTableManager:
 23class SensorControlTableManager(object):
 24    """Class to control the Sensor execution."""
 25
 26    _LOGGER = LoggingHandler(__name__).get_logger()
 27
 28    @classmethod
 29    def check_if_sensor_has_acquired_data(
 30        cls,
 31        sensor_id: str,
 32        control_db_table_name: str,
 33    ) -> bool:
 34        """Check if sensor has acquired new data.
 35
 36        Args:
 37            sensor_id: sensor id.
 38            control_db_table_name: `db.table` to control sensor runs.
 39
 40        Returns:
 41            True if acquired new data, otherwise False
 42        """
 43        sensor_table_data = cls.read_sensor_table_data(
 44            sensor_id=sensor_id, control_db_table_name=control_db_table_name
 45        )
 46        cls._LOGGER.info(f"sensor_table_data = {sensor_table_data}")
 47
 48        return (
 49            sensor_table_data is not None
 50            and sensor_table_data.status == SensorStatus.ACQUIRED_NEW_DATA.value
 51        )
 52
 53    @classmethod
 54    def update_sensor_status(
 55        cls,
 56        sensor_spec: SensorSpec,
 57        status: str,
 58        upstream_key: str = None,
 59        upstream_value: str = None,
 60    ) -> None:
 61        """Control sensor execution storing the execution data in a delta table.
 62
 63        Args:
 64            sensor_spec: sensor spec containing all sensor
 65                information we need to update the control status.
 66            status: status of the sensor.
 67            upstream_key: upstream key (e.g., used to store an attribute
 68                name from the upstream so that new data can be detected
 69                automatically).
 70            upstream_value: upstream value (e.g., used to store the max
 71                attribute value from the upstream so that new data can be
 72                detected automatically).
 73        """
 74        cls._LOGGER.info(
 75            f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
 76        )
 77
 78        data = cls._convert_sensor_to_data(
 79            spec=sensor_spec,
 80            status=status,
 81            upstream_key=upstream_key,
 82            upstream_value=upstream_value,
 83        )
 84
 85        sensor_update_set = cls._get_sensor_update_set(
 86            assets=sensor_spec.assets,
 87            checkpoint_location=sensor_spec.checkpoint_location,
 88            upstream_key=upstream_key,
 89            upstream_value=upstream_value,
 90        )
 91
 92        cls._update_sensor_control(
 93            data=data,
 94            sensor_update_set=sensor_update_set,
 95            sensor_control_table=sensor_spec.control_db_table_name,
 96            sensor_id=sensor_spec.sensor_id,
 97        )
 98
 99    @classmethod
100    def _update_sensor_control(
101        cls,
102        data: List[dict],
103        sensor_update_set: dict,
104        sensor_control_table: str,
105        sensor_id: str,
106    ) -> None:
107        """Update sensor control delta table.
108
109        Args:
110            data: to be updated.
111            sensor_update_set: columns which we had update.
112            sensor_control_table: control table name.
113            sensor_id: sensor_id to be updated.
114        """
115        sensors_delta_table = DeltaTable.forName(
116            ExecEnv.SESSION,
117            sensor_control_table,
118        )
119        sensors_updates = ExecEnv.SESSION.createDataFrame(data, SENSOR_SCHEMA)
120        sensors_delta_table.alias("sensors").merge(
121            sensors_updates.alias("updates"),
122            f"sensors.sensor_id = '{sensor_id}' AND "
123            "sensors.sensor_id = updates.sensor_id",
124        ).whenMatchedUpdate(set=sensor_update_set).whenNotMatchedInsertAll().execute()
125
126    @classmethod
127    def _convert_sensor_to_data(
128        cls,
129        spec: SensorSpec,
130        status: str,
131        upstream_key: str,
132        upstream_value: str,
133        status_change_timestamp: Optional[datetime] = None,
134    ) -> List[dict]:
135        """Convert sensor data to dataframe input data.
136
137        Args:
138            spec: sensor spec containing sensor identifier data.
139            status: new sensor data status.
140            upstream_key: key used to acquired data from the upstream.
141            upstream_value: max value from the upstream_key
142                acquired from the upstream.
143            status_change_timestamp: timestamp we commit
144                this change in the sensor control table.
145
146        Return:
147            Sensor data as list[dict], used to create a
148                dataframe to store the data into the sensor_control_table.
149        """
150        status_change_timestamp = (
151            datetime.now()
152            if status_change_timestamp is None
153            else status_change_timestamp
154        )
155        return [
156            {
157                "sensor_id": spec.sensor_id,
158                "assets": spec.assets,
159                "status": status,
160                "status_change_timestamp": status_change_timestamp,
161                "checkpoint_location": spec.checkpoint_location,
162                "upstream_key": str(upstream_key),
163                "upstream_value": str(upstream_value),
164            }
165        ]
166
167    @classmethod
168    def _get_sensor_update_set(cls, **kwargs: Union[Optional[str], List[str]]) -> dict:
169        """Get the sensor update set.
170
171        Args:
172            kwargs: Containing the following keys:
173            - assets
174            - checkpoint_location
175            - upstream_key
176            - upstream_value
177
178        Return:
179            A set containing the fields to update in the control_table.
180        """
181        sensor_update_set = dict(SENSOR_UPDATE_SET)
182        for key, value in kwargs.items():
183            if value:
184                sensor_update_set[f"sensors.{key}"] = f"updates.{key}"
185
186        return sensor_update_set
187
188    @classmethod
189    def read_sensor_table_data(
190        cls,
191        control_db_table_name: str,
192        sensor_id: str = None,
193        assets: list = None,
194    ) -> Optional[Row]:
195        """Read data from delta table containing sensor status info.
196
197        Args:
198            sensor_id: sensor id. If this parameter is defined search occurs
199                only considering this parameter. Otherwise, it considers sensor
200                assets and checkpoint location.
201            control_db_table_name: db.table to control sensor runs.
202            assets: list of assets that are fueled by the pipeline
203                where this sensor is.
204
205        Return:
206            Row containing the data for the provided sensor_id.
207        """
208        df = DeltaTable.forName(
209            ExecEnv.SESSION,
210            control_db_table_name,
211        ).toDF()
212
213        if sensor_id:
214            df = df.where(col("sensor_id") == sensor_id)
215        elif assets:
216            df = df.where(col("assets") == array(*[lit(asset) for asset in assets]))
217        else:
218            raise ValueError(
219                "Either sensor_id or assets need to be provided as arguments."
220            )
221
222        return df.first()

Class to control the Sensor execution.

@classmethod
def check_if_sensor_has_acquired_data(cls, sensor_id: str, control_db_table_name: str) -> bool:
28    @classmethod
29    def check_if_sensor_has_acquired_data(
30        cls,
31        sensor_id: str,
32        control_db_table_name: str,
33    ) -> bool:
34        """Check if sensor has acquired new data.
35
36        Args:
37            sensor_id: sensor id.
38            control_db_table_name: `db.table` to control sensor runs.
39
40        Returns:
41            True if acquired new data, otherwise False
42        """
43        sensor_table_data = cls.read_sensor_table_data(
44            sensor_id=sensor_id, control_db_table_name=control_db_table_name
45        )
46        cls._LOGGER.info(f"sensor_table_data = {sensor_table_data}")
47
48        return (
49            sensor_table_data is not None
50            and sensor_table_data.status == SensorStatus.ACQUIRED_NEW_DATA.value
51        )

Check if sensor has acquired new data.

Arguments:
  • sensor_id: sensor id.
  • control_db_table_name: db.table to control sensor runs.
Returns:

True if acquired new data, otherwise False

@classmethod
def update_sensor_status( cls, sensor_spec: lakehouse_engine.core.definitions.SensorSpec, status: str, upstream_key: str = None, upstream_value: str = None) -> None:
53    @classmethod
54    def update_sensor_status(
55        cls,
56        sensor_spec: SensorSpec,
57        status: str,
58        upstream_key: str = None,
59        upstream_value: str = None,
60    ) -> None:
61        """Control sensor execution storing the execution data in a delta table.
62
63        Args:
64            sensor_spec: sensor spec containing all sensor
65                information we need to update the control status.
66            status: status of the sensor.
67            upstream_key: upstream key (e.g., used to store an attribute
68                name from the upstream so that new data can be detected
69                automatically).
70            upstream_value: upstream value (e.g., used to store the max
71                attribute value from the upstream so that new data can be
72                detected automatically).
73        """
74        cls._LOGGER.info(
75            f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
76        )
77
78        data = cls._convert_sensor_to_data(
79            spec=sensor_spec,
80            status=status,
81            upstream_key=upstream_key,
82            upstream_value=upstream_value,
83        )
84
85        sensor_update_set = cls._get_sensor_update_set(
86            assets=sensor_spec.assets,
87            checkpoint_location=sensor_spec.checkpoint_location,
88            upstream_key=upstream_key,
89            upstream_value=upstream_value,
90        )
91
92        cls._update_sensor_control(
93            data=data,
94            sensor_update_set=sensor_update_set,
95            sensor_control_table=sensor_spec.control_db_table_name,
96            sensor_id=sensor_spec.sensor_id,
97        )

Control sensor execution storing the execution data in a delta table.

Arguments:
  • sensor_spec: sensor spec containing all sensor information we need to update the control status.
  • status: status of the sensor.
  • upstream_key: upstream key (e.g., used to store an attribute name from the upstream so that new data can be detected automatically).
  • upstream_value: upstream value (e.g., used to store the max attribute value from the upstream so that new data can be detected automatically).
@classmethod
def read_sensor_table_data( cls, control_db_table_name: str, sensor_id: str = None, assets: list = None) -> Optional[pyspark.sql.types.Row]:
188    @classmethod
189    def read_sensor_table_data(
190        cls,
191        control_db_table_name: str,
192        sensor_id: str = None,
193        assets: list = None,
194    ) -> Optional[Row]:
195        """Read data from delta table containing sensor status info.
196
197        Args:
198            sensor_id: sensor id. If this parameter is defined search occurs
199                only considering this parameter. Otherwise, it considers sensor
200                assets and checkpoint location.
201            control_db_table_name: db.table to control sensor runs.
202            assets: list of assets that are fueled by the pipeline
203                where this sensor is.
204
205        Return:
206            Row containing the data for the provided sensor_id.
207        """
208        df = DeltaTable.forName(
209            ExecEnv.SESSION,
210            control_db_table_name,
211        ).toDF()
212
213        if sensor_id:
214            df = df.where(col("sensor_id") == sensor_id)
215        elif assets:
216            df = df.where(col("assets") == array(*[lit(asset) for asset in assets]))
217        else:
218            raise ValueError(
219                "Either sensor_id or assets need to be provided as arguments."
220            )
221
222        return df.first()

Read data from delta table containing sensor status info.

Arguments:
  • sensor_id: sensor id. If this parameter is defined, the search considers only this parameter. Otherwise, it matches rows whose assets column equals the provided list of assets.
  • control_db_table_name: db.table to control sensor runs.
  • assets: list of assets that are fueled by the pipeline where this sensor is.
Return:

Row containing the data for the matching sensor (the first match), or None if no row matches.

class SensorUpstreamManager:
class SensorUpstreamManager(object):
    """Class to deal with Sensor Upstream data.

    Builds the SQL used to detect new upstream data and reads that data.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def generate_filter_exp_query(
        cls,
        sensor_id: str,
        filter_exp: str,
        control_db_table_name: Optional[str] = None,
        upstream_key: Optional[str] = None,
        upstream_value: Optional[str] = None,
        upstream_table_name: Optional[str] = None,
    ) -> str:
        """Generates a sensor preprocess query based on timestamp logic.

        Args:
            sensor_id: sensor id.
            filter_exp: expression to filter incoming new data.
                When control_db_table_name is defined, the placeholders
                `?upstream_key` and `?upstream_value` are replaced by the
                upstream_key and by the upstream_value stored in the
                control_db_table_name for this specific sensor_id (falling
                back to the upstream_value argument, then to `-2147483647`).
                Without control_db_table_name no replacement happens.
            control_db_table_name: db.table to retrieve the last upstream
                value stored for this sensor. This is only relevant for the
                jdbc sensor.
            upstream_key: the key of custom sensor information
                to control how to identify new data from the
                upstream (e.g., a time column in the upstream).
                Required when control_db_table_name is defined.
            upstream_value: value for custom sensor
                to identify new data from the upstream
                (e.g., the value of a time present in the upstream).
                If None, the default value `-2147483647` is used. A value
                stored in the control table for this sensor takes
                precedence. Non-string values are converted with str().
            upstream_table_name: value for custom sensor
                to query new data from the upstream.
                If none we will set the default value,
                our `sensor_new_data` view.

        Return:
            The query string. It only yields a row when at least one
            upstream row matches the filter (`HAVING COUNT(1) > 0`).

        Raises:
            ValueError: if control_db_table_name is defined without
                upstream_key.
        """
        source_table = upstream_table_name or "sensor_new_data"
        select_exp = "SELECT COUNT(1) as count"

        if control_db_table_name:
            if not upstream_key:
                raise ValueError(
                    "If control_db_table_name is defined, upstream_key should "
                    "also be defined!"
                )

            default_upstream_value: str = "-2147483647"
            trigger_name = upstream_key
            trigger_value = (
                default_upstream_value if upstream_value is None else upstream_value
            )
            sensor_table_data = SensorControlTableManager.read_sensor_table_data(
                sensor_id=sensor_id, control_db_table_name=control_db_table_name
            )

            # The value persisted in the control table for this sensor takes
            # precedence over both the argument and the default.
            if sensor_table_data and sensor_table_data.upstream_value:
                trigger_value = sensor_table_data.upstream_value

            # str() lets callers pass non-string overrides (e.g. an int),
            # which str.replace would otherwise reject with a TypeError.
            filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
                "?upstream_value", str(trigger_value)
            )
            select_exp = (
                f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
                f"max({trigger_name}) as UPSTREAM_VALUE"
            )

        query = (
            f"{select_exp} "
            f"FROM {source_table} "
            f"WHERE {filter_exp} "
            f"HAVING COUNT(1) > 0"
        )

        return query

    @classmethod
    def generate_sensor_table_preprocess_query(
        cls,
        sensor_id: str,
    ) -> str:
        """Generates a query to be used for a sensor having other sensor as upstream.

        The query selects, from the `sensor_new_data` view, the rows of
        `sensor_id` whose `_change_type` is 'insert' or 'update_postimage'
        and whose status is `SensorStatus.PROCESSED_NEW_DATA`.

        Args:
            sensor_id: sensor id to look for in the upstream data.

        Return:
            The query string.
        """
        # NOTE(review): sensor_id is interpolated into the SQL literal; it is
        # assumed to come from trusted configuration and contain no quotes.
        query = (
            f"SELECT * "  # nosec
            f"FROM sensor_new_data "
            f"WHERE"
            f" _change_type in ('insert', 'update_postimage')"
            f" and sensor_id = '{sensor_id}'"
            f" and status = '{SensorStatus.PROCESSED_NEW_DATA.value}'"
        )

        return query

    @classmethod
    def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
        """Read new data from the upstream into the sensor 'new_data_df'.

        When the spec has a preprocess_query, the raw upstream data is
        registered as the `sensor_new_data` temp view and the query result
        is returned instead.

        Args:
            sensor_spec: sensor spec containing all sensor information.

        Return:
            An empty dataframe if it doesn't have new data otherwise the new data
        """
        new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)

        if sensor_spec.preprocess_query:
            new_data_df.createOrReplaceTempView("sensor_new_data")
            new_data_df = ExecEnv.SESSION.sql(sensor_spec.preprocess_query)

        return new_data_df

    @classmethod
    def get_new_data(
        cls,
        new_data_df: DataFrame,
    ) -> Optional[Row]:
        """Get new data from upstream df if it's present.

        Args:
            new_data_df: DataFrame possibly containing new data.

        Return:
            Optional row, present if there is new data in the upstream,
            absent otherwise.
        """
        return new_data_df.first()

    @classmethod
    def generate_sensor_sap_logchain_query(
        cls,
        chain_id: str,
        dbtable: str = SAPLogchain.DBTABLE.value,
        status: str = SAPLogchain.GREEN_STATUS.value,
        engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
    ) -> str:
        """Generates a sensor query based in the SAP Logchain table.

        Args:
            chain_id: chain id to query the status on SAP.
            dbtable: db.table to retrieve the data to
                check if the sap chain is already finished.
            status: status value that ANALYZED_STATUS must match
                (case-insensitively). Defaults to SAPLogchain.GREEN_STATUS.
            engine_table_name: table name exposed with the SAP LOGCHAIN data.
                This table will be used in the jdbc query.

        Return:
            The query string: a `WITH <engine_table_name> AS (...)` clause
            selecting CHAIN_ID, LOAD_DATE (DATUM concatenated with ZEIT) and
            ANALYZED_STATUS from dbtable.

        Raises:
            ValueError: if chain_id is empty or None.
        """
        if not chain_id:
            raise ValueError(
                "To query on log chain SAP table the chain id should be defined!"
            )

        select_exp = (
            "SELECT CHAIN_ID, CONCAT(DATUM, ZEIT) AS LOAD_DATE, ANALYZED_STATUS"
        )
        filter_exp = (
            f"UPPER(CHAIN_ID) = UPPER('{chain_id}') "
            f"AND UPPER(ANALYZED_STATUS) = UPPER('{status}')"
        )

        query = (
            f"WITH {engine_table_name} AS ("
            f"{select_exp} "
            f"FROM {dbtable} "
            f"WHERE {filter_exp}"
            ")"
        )

        return query

Class to deal with Sensor Upstream data.

@classmethod
def generate_filter_exp_query( cls, sensor_id: str, filter_exp: str, control_db_table_name: str = None, upstream_key: str = None, upstream_value: str = None, upstream_table_name: str = None) -> str:
230    @classmethod
231    def generate_filter_exp_query(
232        cls,
233        sensor_id: str,
234        filter_exp: str,
235        control_db_table_name: str = None,
236        upstream_key: str = None,
237        upstream_value: str = None,
238        upstream_table_name: str = None,
239    ) -> str:
240        """Generates a sensor preprocess query based on timestamp logic.
241
242        Args:
243            sensor_id: sensor id.
244            filter_exp: expression to filter incoming new data.
245                You can use the placeholder `?upstream_value` so that
246                it can be replaced by the upstream_value in the
247                control_db_table_name for this specific sensor_id.
248            control_db_table_name: db.table to retrieve the last status change
249                timestamp. This is only relevant for the jdbc sensor.
250            upstream_key: the key of custom sensor information
251                to control how to identify new data from the
252                upstream (e.g., a time column in the upstream).
253            upstream_value: value for custom sensor
254                to identify new data from the upstream
255                (e.g., the value of a time present in the upstream)
256                If none we will set the default value.
257                Note: This parameter is used just to override the
258                default value `-2147483647`.
259            upstream_table_name: value for custom sensor
260                to query new data from the upstream.
261                If none we will set the default value,
262                our `sensor_new_data` view.
263
264        Return:
265            The query string.
266        """
267        source_table = upstream_table_name if upstream_table_name else "sensor_new_data"
268        select_exp = "SELECT COUNT(1) as count"
269        if control_db_table_name:
270            if not upstream_key:
271                raise ValueError(
272                    "If control_db_table_name is defined, upstream_key should "
273                    "also be defined!"
274                )
275
276            default_upstream_value: str = "-2147483647"
277            trigger_name = upstream_key
278            trigger_value = (
279                default_upstream_value if upstream_value is None else upstream_value
280            )
281            sensor_table_data = SensorControlTableManager.read_sensor_table_data(
282                sensor_id=sensor_id, control_db_table_name=control_db_table_name
283            )
284
285            if sensor_table_data and sensor_table_data.upstream_value:
286                trigger_value = sensor_table_data.upstream_value
287
288            filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
289                "?upstream_value", trigger_value
290            )
291            select_exp = (
292                f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
293                f"max({trigger_name}) as UPSTREAM_VALUE"
294            )
295
296        query = (
297            f"{select_exp} "
298            f"FROM {source_table} "
299            f"WHERE {filter_exp} "
300            f"HAVING COUNT(1) > 0"
301        )
302
303        return query

Generates a sensor preprocess query based on timestamp logic.

Arguments:
  • sensor_id: sensor id.
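  • filter_exp: expression to filter incoming new data. You can use the placeholders ?upstream_key and ?upstream_value. They are replaced by upstream_key and by the upstream_value stored in control_db_table_name for this specific sensor_id. If no value is stored, the upstream_value argument or its default is used instead. The placeholders are only replaced when control_db_table_name is defined.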
  • control_db_table_name: db.table of the sensor control table, from which the last upstream_value stored for this sensor_id is retrieved. This is only relevant for the jdbc sensor.
  • upstream_key: the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream). Required when control_db_table_name is defined; otherwise a ValueError is raised.
  • upstream_value: value for custom sensor to identify new data from the upstream (e.g., the value of a time column in the upstream). If None, the default value -2147483647 is used. Note: this parameter only overrides that default; an upstream_value already stored in control_db_table_name for this sensor_id takes precedence over it.
  • upstream_table_name: table or view from which the custom sensor queries new data from the upstream. If None, the default sensor_new_data view is used.
Return:

The query string.

@classmethod
def generate_sensor_table_preprocess_query(cls, sensor_id: str) -> str:
305    @classmethod
306    def generate_sensor_table_preprocess_query(
307        cls,
308        sensor_id: str,
309    ) -> str:
310        """Generates a query to be used for a sensor having other sensor as upstream.
311
312        Args:
313            sensor_id: sensor id.
314
315        Return:
316            The query string.
317        """
318        query = (
319            f"SELECT * "  # nosec
320            f"FROM sensor_new_data "
321            f"WHERE"
322            f" _change_type in ('insert', 'update_postimage')"
323            f" and sensor_id = '{sensor_id}'"
324            f" and status = '{SensorStatus.PROCESSED_NEW_DATA.value}'"
325        )
326
327        return query

Generates a query to be used for a sensor that has another sensor as its upstream.

Arguments:
  • sensor_id: sensor id.
Return:

The query string.

@classmethod
def read_new_data( cls, sensor_spec: lakehouse_engine.core.definitions.SensorSpec) -> pyspark.sql.dataframe.DataFrame:
329    @classmethod
330    def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
331        """Read new data from the upstream into the sensor 'new_data_df'.
332
333        Args:
334            sensor_spec: sensor spec containing all sensor information.
335
336        Return:
337            An empty dataframe if it doesn't have new data otherwise the new data
338        """
339        new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)
340
341        if sensor_spec.preprocess_query:
342            new_data_df.createOrReplaceTempView("sensor_new_data")
343            new_data_df = ExecEnv.SESSION.sql(sensor_spec.preprocess_query)
344
345        return new_data_df

Read new data from the upstream into the sensor 'new_data_df'.

Arguments:
  • sensor_spec: sensor spec containing all sensor information.
Return:

An empty dataframe if there is no new data; otherwise, the new data.

@classmethod
def get_new_data( cls, new_data_df: pyspark.sql.dataframe.DataFrame) -> Optional[pyspark.sql.types.Row]:
347    @classmethod
348    def get_new_data(
349        cls,
350        new_data_df: DataFrame,
351    ) -> Optional[Row]:
352        """Get new data from upstream df if it's present.
353
354        Args:
355            new_data_df: DataFrame possibly containing new data.
356
357        Return:
358            Optional row, present if there is new data in the upstream,
359            absent otherwise.
360        """
361        return new_data_df.first()

Get new data from upstream df if it's present.

Arguments:
  • new_data_df: DataFrame possibly containing new data.
Return:

Optional row, present if there is new data in the upstream, absent otherwise.

@classmethod
def generate_sensor_sap_logchain_query( cls, chain_id: str, dbtable: str = 'SAPPHA.RSPCLOGCHAIN', status: str = 'G', engine_table_name: str = 'sensor_new_data') -> str:
363    @classmethod
364    def generate_sensor_sap_logchain_query(
365        cls,
366        chain_id: str,
367        dbtable: str = SAPLogchain.DBTABLE.value,
368        status: str = SAPLogchain.GREEN_STATUS.value,
369        engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
370    ) -> str:
371        """Generates a sensor query based in the SAP Logchain table.
372
373        Args:
374            chain_id: chain id to query the status on SAP.
375            dbtable: db.table to retrieve the data to
376                check if the sap chain is already finished.
377            status: db.table to retrieve the last status change
378                timestamp.
379            engine_table_name: table name exposed with the SAP LOGCHAIN data.
380                This table will be used in the jdbc query.
381
382        Return:
383            The query string.
384        """
385        if not chain_id:
386            raise ValueError(
387                "To query on log chain SAP table the chain id should be defined!"
388            )
389
390        select_exp = (
391            "SELECT CHAIN_ID, CONCAT(DATUM, ZEIT) AS LOAD_DATE, ANALYZED_STATUS"
392        )
393        filter_exp = (
394            f"UPPER(CHAIN_ID) = UPPER('{chain_id}') "
395            f"AND UPPER(ANALYZED_STATUS) = UPPER('{status}')"
396        )
397
398        query = (
399            f"WITH {engine_table_name} AS ("
400            f"{select_exp} "
401            f"FROM {dbtable} "
402            f"WHERE {filter_exp}"
403            ")"
404        )
405
406        return query

Generates a sensor query based on the SAP Logchain table.

Arguments:
  • chain_id: chain id to query the status on SAP.
  • dbtable: db.table from which to retrieve the data used to check whether the SAP chain has already finished.
  • status: status of the SAP log chain to filter on. It is compared case-insensitively with the ANALYZED_STATUS column (default: 'G').
  • engine_table_name: table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query.
Return:

The query string.