
Module to define Sensor Manager classes.

  1"""Module to define Sensor Manager classes."""
  3from datetime import datetime
  4from typing import List, Optional, Union
  6from delta.tables import DeltaTable
  7from pyspark.sql import DataFrame, Row
  8from pyspark.sql.functions import array, col, lit
 10from lakehouse_engine.core.definitions import (
 13    SAPLogchain,
 14    SensorSpec,
 15    SensorStatus,
 17from lakehouse_engine.core.exec_env import ExecEnv
 18from import ReaderFactory
 19from lakehouse_engine.utils.logging_handler import LoggingHandler
 22class SensorControlTableManager(object):
 23    """Class to control the Sensor execution."""
 25    _LOGGER = LoggingHandler(__name__).get_logger()
 27    @classmethod
 28    def check_if_sensor_has_acquired_data(
 29        cls,
 30        sensor_id: str,
 31        control_db_table_name: str,
 32    ) -> bool:
 33        """Check if sensor has acquired new data.
 35        Args:
 36            sensor_id: sensor id.
 37            control_db_table_name: `db.table` to control sensor runs.
 39        Returns:
 40            True if acquired new data, otherwise False
 41        """
 42        sensor_table_data = cls.read_sensor_table_data(
 43            sensor_id=sensor_id, control_db_table_name=control_db_table_name
 44        )
 45"sensor_table_data = {sensor_table_data}")
 47        return (
 48            sensor_table_data is not None
 49            and sensor_table_data.status == SensorStatus.ACQUIRED_NEW_DATA.value
 50        )
 52    @classmethod
 53    def update_sensor_status(
 54        cls,
 55        sensor_spec: SensorSpec,
 56        status: str,
 57        upstream_key: str = None,
 58        upstream_value: str = None,
 59    ) -> None:
 60        """Control sensor execution storing the execution data in a delta table.
 62        Args:
 63            sensor_spec: sensor spec containing all sensor
 64                information we need to update the control status.
 65            status: status of the sensor.
 66            upstream_key: upstream key (e.g., used to store an attribute
 67                name from the upstream so that new data can be detected
 68                automatically).
 69            upstream_value: upstream value (e.g., used to store the max
 70                attribute value from the upstream so that new data can be
 71                detected automatically).
 72        """
 74            f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
 75        )
 77        data = cls._convert_sensor_to_data(
 78            spec=sensor_spec,
 79            status=status,
 80            upstream_key=upstream_key,
 81            upstream_value=upstream_value,
 82        )
 84        sensor_update_set = cls._get_sensor_update_set(
 85            assets=sensor_spec.assets,
 86            checkpoint_location=sensor_spec.checkpoint_location,
 87            upstream_key=upstream_key,
 88            upstream_value=upstream_value,
 89        )
 91        cls._update_sensor_control(
 92            data=data,
 93            sensor_update_set=sensor_update_set,
 94            sensor_control_table=sensor_spec.control_db_table_name,
 95            sensor_id=sensor_spec.sensor_id,
 96        )
 98    @classmethod
 99    def _update_sensor_control(
100        cls,
101        data: List[dict],
102        sensor_update_set: dict,
103        sensor_control_table: str,
104        sensor_id: str,
105    ) -> None:
106        """Update sensor control delta table.
108        Args:
109            data: to be updated.
110            sensor_update_set: columns which we had update.
111            sensor_control_table: control table name.
112            sensor_id: sensor_id to be updated.
113        """
114        sensors_delta_table = DeltaTable.forName(
115            ExecEnv.SESSION,
116            sensor_control_table,
117        )
118        sensors_updates = ExecEnv.SESSION.createDataFrame(data, SENSOR_SCHEMA)
119        sensors_delta_table.alias("sensors").merge(
120            sensors_updates.alias("updates"),
121            f"sensors.sensor_id = '{sensor_id}' AND "
122            "sensors.sensor_id = updates.sensor_id",
123        ).whenMatchedUpdate(set=sensor_update_set).whenNotMatchedInsertAll().execute()
125    @classmethod
126    def _convert_sensor_to_data(
127        cls,
128        spec: SensorSpec,
129        status: str,
130        upstream_key: str,
131        upstream_value: str,
132        status_change_timestamp: Optional[datetime] = None,
133    ) -> List[dict]:
134        """Convert sensor data to dataframe input data.
136        Args:
137            spec: sensor spec containing sensor identifier data.
138            status: new sensor data status.
139            upstream_key: key used to acquired data from the upstream.
140            upstream_value: max value from the upstream_key
141                acquired from the upstream.
142            status_change_timestamp: timestamp we commit
143                this change in the sensor control table.
145        Return:
146            Sensor data as list[dict], used to create a
147                dataframe to store the data into the sensor_control_table.
148        """
149        status_change_timestamp = (
151            if status_change_timestamp is None
152            else status_change_timestamp
153        )
154        return [
155            {
156                "sensor_id": spec.sensor_id,
157                "assets": spec.assets,
158                "status": status,
159                "status_change_timestamp": status_change_timestamp,
160                "checkpoint_location": spec.checkpoint_location,
161                "upstream_key": str(upstream_key),
162                "upstream_value": str(upstream_value),
163            }
164        ]
166    @classmethod
167    def _get_sensor_update_set(cls, **kwargs: Union[Optional[str], List[str]]) -> dict:
168        """Get the sensor update set.
170        Args:
171            kwargs: Containing the following keys:
172            - assets
173            - checkpoint_location
174            - upstream_key
175            - upstream_value
177        Return:
178            A set containing the fields to update in the control_table.
179        """
180        sensor_update_set = dict(SENSOR_UPDATE_SET)
181        for key, value in kwargs.items():
182            if value:
183                sensor_update_set[f"sensors.{key}"] = f"updates.{key}"
185        return sensor_update_set
187    @classmethod
188    def read_sensor_table_data(
189        cls,
190        control_db_table_name: str,
191        sensor_id: str = None,
192        assets: list = None,
193    ) -> Optional[Row]:
194        """Read data from delta table containing sensor status info.
196        Args:
197            sensor_id: sensor id. If this parameter is defined search occurs
198                only considering this parameter. Otherwise, it considers sensor
199                assets and checkpoint location.
200            control_db_table_name: db.table to control sensor runs.
201            assets: list of assets that are fueled by the pipeline
202                where this sensor is.
204        Return:
205            Row containing the data for the provided sensor_id.
206        """
207        df = DeltaTable.forName(
208            ExecEnv.SESSION,
209            control_db_table_name,
210        ).toDF()
212        if sensor_id:
213            df = df.where(col("sensor_id") == sensor_id)
214        elif assets:
215            df = df.where(col("assets") == array(*[lit(asset) for asset in assets]))
216        else:
217            raise ValueError(
218                "Either sensor_id or assets need to be provided as arguments."
219            )
221        return df.first()
224class SensorUpstreamManager(object):
225    """Class to deal with Sensor Upstream data."""
227    _LOGGER = LoggingHandler(__name__).get_logger()
229    @classmethod
230    def generate_filter_exp_query(
231        cls,
232        sensor_id: str,
233        filter_exp: str,
234        control_db_table_name: str = None,
235        upstream_key: str = None,
236        upstream_value: str = None,
237        upstream_table_name: str = None,
238    ) -> str:
239        """Generates a sensor preprocess query based on timestamp logic.
241        Args:
242            sensor_id: sensor id.
243            filter_exp: expression to filter incoming new data.
244                You can use the placeholder `?upstream_value` so that
245                it can be replaced by the upstream_value in the
246                control_db_table_name for this specific sensor_id.
247            control_db_table_name: db.table to retrieve the last status change
248                timestamp. This is only relevant for the jdbc sensor.
249            upstream_key: the key of custom sensor information
250                to control how to identify new data from the
251                upstream (e.g., a time column in the upstream).
252            upstream_value: value for custom sensor
253                to identify new data from the upstream
254                (e.g., the value of a time present in the upstream)
255                If none we will set the default value.
256                Note: This parameter is used just to override the
257                default value `-2147483647`.
258            upstream_table_name: value for custom sensor
259                to query new data from the upstream.
260                If none we will set the default value,
261                our `sensor_new_data` view.
263        Return:
264            The query string.
265        """
266        source_table = upstream_table_name if upstream_table_name else "sensor_new_data"
267        select_exp = "SELECT COUNT(1) as count"
268        if control_db_table_name:
269            if not upstream_key:
270                raise ValueError(
271                    "If control_db_table_name is defined, upstream_key should "
272                    "also be defined!"
273                )
275            default_upstream_value: str = "-2147483647"
276            trigger_name = upstream_key
277            trigger_value = (
278                default_upstream_value if upstream_value is None else upstream_value
279            )
280            sensor_table_data = SensorControlTableManager.read_sensor_table_data(
281                sensor_id=sensor_id, control_db_table_name=control_db_table_name
282            )
284            if sensor_table_data and sensor_table_data.upstream_value:
285                trigger_value = sensor_table_data.upstream_value
287            filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
288                "?upstream_value", trigger_value
289            )
290            select_exp = (
291                f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
292                f"max({trigger_name}) as UPSTREAM_VALUE"
293            )
295        query = (
296            f"{select_exp} "
297            f"FROM {source_table} "
298            f"WHERE {filter_exp} "
299            f"HAVING COUNT(1) > 0"
300        )
302        return query
304    @classmethod
305    def generate_sensor_table_preprocess_query(
306        cls,
307        sensor_id: str,
308    ) -> str:
309        """Generates a query to be used for a sensor having other sensor as upstream.
311        Args:
312            sensor_id: sensor id.
314        Return:
315            The query string.
316        """
317        query = (
318            f"SELECT * "  # nosec
319            f"FROM sensor_new_data "
320            f"WHERE"
321            f" _change_type in ('insert', 'update_postimage')"
322            f" and sensor_id = '{sensor_id}'"
323            f" and status = '{SensorStatus.PROCESSED_NEW_DATA.value}'"
324        )
326        return query
328    @classmethod
329    def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
330        """Read new data from the upstream into the sensor 'new_data_df'.
332        Args:
333            sensor_spec: sensor spec containing all sensor information.
335        Return:
336            An empty dataframe if it doesn't have new data otherwise the new data
337        """
338        new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)
340        if sensor_spec.preprocess_query:
341            new_data_df.createOrReplaceTempView("sensor_new_data")
342            new_data_df = ExecEnv.SESSION.sql(sensor_spec.preprocess_query)
344        return new_data_df
346    @classmethod
347    def get_new_data(
348        cls,
349        new_data_df: DataFrame,
350    ) -> Optional[Row]:
351        """Get new data from upstream df if it's present.
353        Args:
354            new_data_df: DataFrame possibly containing new data.
356        Return:
357            Optional row, present if there is new data in the upstream,
358            absent otherwise.
359        """
360        return new_data_df.first()
362    @classmethod
363    def generate_sensor_sap_logchain_query(
364        cls,
365        chain_id: str,
366        dbtable: str = SAPLogchain.DBTABLE.value,
367        status: str = SAPLogchain.GREEN_STATUS.value,
368        engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
369    ) -> str:
370        """Generates a sensor query based in the SAP Logchain table.
372        Args:
373            chain_id: chain id to query the status on SAP.
374            dbtable: db.table to retrieve the data to
375                check if the sap chain is already finished.
376            status: db.table to retrieve the last status change
377                timestamp.
378            engine_table_name: table name exposed with the SAP LOGCHAIN data.
379                This table will be used in the jdbc query.
381        Return:
382            The query string.
383        """
384        if not chain_id:
385            raise ValueError(
386                "To query on log chain SAP table the chain id should be defined!"
387            )
389        select_exp = (
391        )
392        filter_exp = (
393            f"UPPER(CHAIN_ID) = UPPER('{chain_id}') "
394            f"AND UPPER(ANALYZED_STATUS) = UPPER('{status}')"
395        )
397        query = (
398            f"WITH {engine_table_name} AS ("
399            f"{select_exp} "
400            f"FROM {dbtable} "
401            f"WHERE {filter_exp}"
402            ")"
403        )
405        return query
class SensorControlTableManager:
 23class SensorControlTableManager(object):
 24    """Class to control the Sensor execution."""
 26    _LOGGER = LoggingHandler(__name__).get_logger()
 28    @classmethod
 29    def check_if_sensor_has_acquired_data(
 30        cls,
 31        sensor_id: str,
 32        control_db_table_name: str,
 33    ) -> bool:
 34        """Check if sensor has acquired new data.
 36        Args:
 37            sensor_id: sensor id.
 38            control_db_table_name: `db.table` to control sensor runs.
 40        Returns:
 41            True if acquired new data, otherwise False
 42        """
 43        sensor_table_data = cls.read_sensor_table_data(
 44            sensor_id=sensor_id, control_db_table_name=control_db_table_name
 45        )
 46"sensor_table_data = {sensor_table_data}")
 48        return (
 49            sensor_table_data is not None
 50            and sensor_table_data.status == SensorStatus.ACQUIRED_NEW_DATA.value
 51        )
 53    @classmethod
 54    def update_sensor_status(
 55        cls,
 56        sensor_spec: SensorSpec,
 57        status: str,
 58        upstream_key: str = None,
 59        upstream_value: str = None,
 60    ) -> None:
 61        """Control sensor execution storing the execution data in a delta table.
 63        Args:
 64            sensor_spec: sensor spec containing all sensor
 65                information we need to update the control status.
 66            status: status of the sensor.
 67            upstream_key: upstream key (e.g., used to store an attribute
 68                name from the upstream so that new data can be detected
 69                automatically).
 70            upstream_value: upstream value (e.g., used to store the max
 71                attribute value from the upstream so that new data can be
 72                detected automatically).
 73        """
 75            f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
 76        )
 78        data = cls._convert_sensor_to_data(
 79            spec=sensor_spec,
 80            status=status,
 81            upstream_key=upstream_key,
 82            upstream_value=upstream_value,
 83        )
 85        sensor_update_set = cls._get_sensor_update_set(
 86            assets=sensor_spec.assets,
 87            checkpoint_location=sensor_spec.checkpoint_location,
 88            upstream_key=upstream_key,
 89            upstream_value=upstream_value,
 90        )
 92        cls._update_sensor_control(
 93            data=data,
 94            sensor_update_set=sensor_update_set,
 95            sensor_control_table=sensor_spec.control_db_table_name,
 96            sensor_id=sensor_spec.sensor_id,
 97        )
 99    @classmethod
100    def _update_sensor_control(
101        cls,
102        data: List[dict],
103        sensor_update_set: dict,
104        sensor_control_table: str,
105        sensor_id: str,
106    ) -> None:
107        """Update sensor control delta table.
109        Args:
110            data: to be updated.
111            sensor_update_set: columns which we had update.
112            sensor_control_table: control table name.
113            sensor_id: sensor_id to be updated.
114        """
115        sensors_delta_table = DeltaTable.forName(
116            ExecEnv.SESSION,
117            sensor_control_table,
118        )
119        sensors_updates = ExecEnv.SESSION.createDataFrame(data, SENSOR_SCHEMA)
120        sensors_delta_table.alias("sensors").merge(
121            sensors_updates.alias("updates"),
122            f"sensors.sensor_id = '{sensor_id}' AND "
123            "sensors.sensor_id = updates.sensor_id",
124        ).whenMatchedUpdate(set=sensor_update_set).whenNotMatchedInsertAll().execute()
126    @classmethod
127    def _convert_sensor_to_data(
128        cls,
129        spec: SensorSpec,
130        status: str,
131        upstream_key: str,
132        upstream_value: str,
133        status_change_timestamp: Optional[datetime] = None,
134    ) -> List[dict]:
135        """Convert sensor data to dataframe input data.
137        Args:
138            spec: sensor spec containing sensor identifier data.
139            status: new sensor data status.
140            upstream_key: key used to acquired data from the upstream.
141            upstream_value: max value from the upstream_key
142                acquired from the upstream.
143            status_change_timestamp: timestamp we commit
144                this change in the sensor control table.
146        Return:
147            Sensor data as list[dict], used to create a
148                dataframe to store the data into the sensor_control_table.
149        """
150        status_change_timestamp = (
152            if status_change_timestamp is None
153            else status_change_timestamp
154        )
155        return [
156            {
157                "sensor_id": spec.sensor_id,
158                "assets": spec.assets,
159                "status": status,
160                "status_change_timestamp": status_change_timestamp,
161                "checkpoint_location": spec.checkpoint_location,
162                "upstream_key": str(upstream_key),
163                "upstream_value": str(upstream_value),
164            }
165        ]
167    @classmethod
168    def _get_sensor_update_set(cls, **kwargs: Union[Optional[str], List[str]]) -> dict:
169        """Get the sensor update set.
171        Args:
172            kwargs: Containing the following keys:
173            - assets
174            - checkpoint_location
175            - upstream_key
176            - upstream_value
178        Return:
179            A set containing the fields to update in the control_table.
180        """
181        sensor_update_set = dict(SENSOR_UPDATE_SET)
182        for key, value in kwargs.items():
183            if value:
184                sensor_update_set[f"sensors.{key}"] = f"updates.{key}"
186        return sensor_update_set
188    @classmethod
189    def read_sensor_table_data(
190        cls,
191        control_db_table_name: str,
192        sensor_id: str = None,
193        assets: list = None,
194    ) -> Optional[Row]:
195        """Read data from delta table containing sensor status info.
197        Args:
198            sensor_id: sensor id. If this parameter is defined search occurs
199                only considering this parameter. Otherwise, it considers sensor
200                assets and checkpoint location.
201            control_db_table_name: db.table to control sensor runs.
202            assets: list of assets that are fueled by the pipeline
203                where this sensor is.
205        Return:
206            Row containing the data for the provided sensor_id.
207        """
208        df = DeltaTable.forName(
209            ExecEnv.SESSION,
210            control_db_table_name,
211        ).toDF()
213        if sensor_id:
214            df = df.where(col("sensor_id") == sensor_id)
215        elif assets:
216            df = df.where(col("assets") == array(*[lit(asset) for asset in assets]))
217        else:
218            raise ValueError(
219                "Either sensor_id or assets need to be provided as arguments."
220            )
222        return df.first()

Class to control the Sensor execution.

def check_if_sensor_has_acquired_data(cls, sensor_id: str, control_db_table_name: str) -> bool:
28    @classmethod
29    def check_if_sensor_has_acquired_data(
30        cls,
31        sensor_id: str,
32        control_db_table_name: str,
33    ) -> bool:
34        """Check if sensor has acquired new data.
36        Args:
37            sensor_id: sensor id.
38            control_db_table_name: `db.table` to control sensor runs.
40        Returns:
41            True if acquired new data, otherwise False
42        """
43        sensor_table_data = cls.read_sensor_table_data(
44            sensor_id=sensor_id, control_db_table_name=control_db_table_name
45        )
46"sensor_table_data = {sensor_table_data}")
48        return (
49            sensor_table_data is not None
50            and sensor_table_data.status == SensorStatus.ACQUIRED_NEW_DATA.value
51        )

Check if sensor has acquired new data.

  • sensor_id: sensor id.
  • control_db_table_name: db.table to control sensor runs.

True if acquired new data, otherwise False

def update_sensor_status( cls, sensor_spec: lakehouse_engine.core.definitions.SensorSpec, status: str, upstream_key: str = None, upstream_value: str = None) -> None:
53    @classmethod
54    def update_sensor_status(
55        cls,
56        sensor_spec: SensorSpec,
57        status: str,
58        upstream_key: str = None,
59        upstream_value: str = None,
60    ) -> None:
61        """Control sensor execution storing the execution data in a delta table.
63        Args:
64            sensor_spec: sensor spec containing all sensor
65                information we need to update the control status.
66            status: status of the sensor.
67            upstream_key: upstream key (e.g., used to store an attribute
68                name from the upstream so that new data can be detected
69                automatically).
70            upstream_value: upstream value (e.g., used to store the max
71                attribute value from the upstream so that new data can be
72                detected automatically).
73        """
75            f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
76        )
78        data = cls._convert_sensor_to_data(
79            spec=sensor_spec,
80            status=status,
81            upstream_key=upstream_key,
82            upstream_value=upstream_value,
83        )
85        sensor_update_set = cls._get_sensor_update_set(
86            assets=sensor_spec.assets,
87            checkpoint_location=sensor_spec.checkpoint_location,
88            upstream_key=upstream_key,
89            upstream_value=upstream_value,
90        )
92        cls._update_sensor_control(
93            data=data,
94            sensor_update_set=sensor_update_set,
95            sensor_control_table=sensor_spec.control_db_table_name,
96            sensor_id=sensor_spec.sensor_id,
97        )

Control sensor execution storing the execution data in a delta table.

  • sensor_spec: sensor spec containing all sensor information we need to update the control status.
  • status: status of the sensor.
  • upstream_key: upstream key (e.g., used to store an attribute name from the upstream so that new data can be detected automatically).
  • upstream_value: upstream value (e.g., used to store the max attribute value from the upstream so that new data can be detected automatically).
def read_sensor_table_data( cls, control_db_table_name: str, sensor_id: str = None, assets: list = None) -> Optional[pyspark.sql.types.Row]:
188    @classmethod
189    def read_sensor_table_data(
190        cls,
191        control_db_table_name: str,
192        sensor_id: str = None,
193        assets: list = None,
194    ) -> Optional[Row]:
195        """Read data from delta table containing sensor status info.
197        Args:
198            sensor_id: sensor id. If this parameter is defined search occurs
199                only considering this parameter. Otherwise, it considers sensor
200                assets and checkpoint location.
201            control_db_table_name: db.table to control sensor runs.
202            assets: list of assets that are fueled by the pipeline
203                where this sensor is.
205        Return:
206            Row containing the data for the provided sensor_id.
207        """
208        df = DeltaTable.forName(
209            ExecEnv.SESSION,
210            control_db_table_name,
211        ).toDF()
213        if sensor_id:
214            df = df.where(col("sensor_id") == sensor_id)
215        elif assets:
216            df = df.where(col("assets") == array(*[lit(asset) for asset in assets]))
217        else:
218            raise ValueError(
219                "Either sensor_id or assets need to be provided as arguments."
220            )
222        return df.first()

Read data from delta table containing sensor status info.

  • sensor_id: sensor id. If this parameter is defined, the search considers only this parameter. Otherwise, the search is done by the sensor assets.
  • control_db_table_name: db.table to control sensor runs.
  • assets: list of assets that are fueled by the pipeline where this sensor is.

Row containing the data for the provided sensor_id (or assets), or None when no row matches.

class SensorUpstreamManager:
225class SensorUpstreamManager(object):
226    """Class to deal with Sensor Upstream data."""
228    _LOGGER = LoggingHandler(__name__).get_logger()
230    @classmethod
231    def generate_filter_exp_query(
232        cls,
233        sensor_id: str,
234        filter_exp: str,
235        control_db_table_name: str = None,
236        upstream_key: str = None,
237        upstream_value: str = None,
238        upstream_table_name: str = None,
239    ) -> str:
240        """Generates a sensor preprocess query based on timestamp logic.
242        Args:
243            sensor_id: sensor id.
244            filter_exp: expression to filter incoming new data.
245                You can use the placeholder `?upstream_value` so that
246                it can be replaced by the upstream_value in the
247                control_db_table_name for this specific sensor_id.
248            control_db_table_name: db.table to retrieve the last status change
249                timestamp. This is only relevant for the jdbc sensor.
250            upstream_key: the key of custom sensor information
251                to control how to identify new data from the
252                upstream (e.g., a time column in the upstream).
253            upstream_value: value for custom sensor
254                to identify new data from the upstream
255                (e.g., the value of a time present in the upstream)
256                If none we will set the default value.
257                Note: This parameter is used just to override the
258                default value `-2147483647`.
259            upstream_table_name: value for custom sensor
260                to query new data from the upstream.
261                If none we will set the default value,
262                our `sensor_new_data` view.
264        Return:
265            The query string.
266        """
267        source_table = upstream_table_name if upstream_table_name else "sensor_new_data"
268        select_exp = "SELECT COUNT(1) as count"
269        if control_db_table_name:
270            if not upstream_key:
271                raise ValueError(
272                    "If control_db_table_name is defined, upstream_key should "
273                    "also be defined!"
274                )
276            default_upstream_value: str = "-2147483647"
277            trigger_name = upstream_key
278            trigger_value = (
279                default_upstream_value if upstream_value is None else upstream_value
280            )
281            sensor_table_data = SensorControlTableManager.read_sensor_table_data(
282                sensor_id=sensor_id, control_db_table_name=control_db_table_name
283            )
285            if sensor_table_data and sensor_table_data.upstream_value:
286                trigger_value = sensor_table_data.upstream_value
288            filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
289                "?upstream_value", trigger_value
290            )
291            select_exp = (
292                f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
293                f"max({trigger_name}) as UPSTREAM_VALUE"
294            )
296        query = (
297            f"{select_exp} "
298            f"FROM {source_table} "
299            f"WHERE {filter_exp} "
300            f"HAVING COUNT(1) > 0"
301        )
303        return query
305    @classmethod
306    def generate_sensor_table_preprocess_query(
307        cls,
308        sensor_id: str,
309    ) -> str:
310        """Generates a query to be used for a sensor having other sensor as upstream.
312        Args:
313            sensor_id: sensor id.
315        Return:
316            The query string.
317        """
318        query = (
319            f"SELECT * "  # nosec
320            f"FROM sensor_new_data "
321            f"WHERE"
322            f" _change_type in ('insert', 'update_postimage')"
323            f" and sensor_id = '{sensor_id}'"
324            f" and status = '{SensorStatus.PROCESSED_NEW_DATA.value}'"
325        )
327        return query
329    @classmethod
330    def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
331        """Read new data from the upstream into the sensor 'new_data_df'.
333        Args:
334            sensor_spec: sensor spec containing all sensor information.
336        Return:
337            An empty dataframe if it doesn't have new data otherwise the new data
338        """
339        new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)
341        if sensor_spec.preprocess_query:
342            new_data_df.createOrReplaceTempView("sensor_new_data")
343            new_data_df = ExecEnv.SESSION.sql(sensor_spec.preprocess_query)
345        return new_data_df
347    @classmethod
348    def get_new_data(
349        cls,
350        new_data_df: DataFrame,
351    ) -> Optional[Row]:
352        """Get new data from upstream df if it's present.
354        Args:
355            new_data_df: DataFrame possibly containing new data.
357        Return:
358            Optional row, present if there is new data in the upstream,
359            absent otherwise.
360        """
361        return new_data_df.first()
363    @classmethod
364    def generate_sensor_sap_logchain_query(
365        cls,
366        chain_id: str,
367        dbtable: str = SAPLogchain.DBTABLE.value,
368        status: str = SAPLogchain.GREEN_STATUS.value,
369        engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
370    ) -> str:
371        """Generates a sensor query based in the SAP Logchain table.
373        Args:
374            chain_id: chain id to query the status on SAP.
375            dbtable: db.table to retrieve the data to
376                check if the sap chain is already finished.
377            status: db.table to retrieve the last status change
378                timestamp.
379            engine_table_name: table name exposed with the SAP LOGCHAIN data.
380                This table will be used in the jdbc query.
382        Return:
383            The query string.
384        """
385        if not chain_id:
386            raise ValueError(
387                "To query on log chain SAP table the chain id should be defined!"
388            )
390        select_exp = (
392        )
393        filter_exp = (
394            f"UPPER(CHAIN_ID) = UPPER('{chain_id}') "
395            f"AND UPPER(ANALYZED_STATUS) = UPPER('{status}')"
396        )
398        query = (
399            f"WITH {engine_table_name} AS ("
400            f"{select_exp} "
401            f"FROM {dbtable} "
402            f"WHERE {filter_exp}"
403            ")"
404        )
406        return query

Class to deal with Sensor Upstream data.

def generate_filter_exp_query( cls, sensor_id: str, filter_exp: str, control_db_table_name: str = None, upstream_key: str = None, upstream_value: str = None, upstream_table_name: str = None) -> str:
230    @classmethod
231    def generate_filter_exp_query(
232        cls,
233        sensor_id: str,
234        filter_exp: str,
235        control_db_table_name: str = None,
236        upstream_key: str = None,
237        upstream_value: str = None,
238        upstream_table_name: str = None,
239    ) -> str:
240        """Generates a sensor preprocess query based on timestamp logic.
242        Args:
243            sensor_id: sensor id.
244            filter_exp: expression to filter incoming new data.
245                You can use the placeholder `?upstream_value` so that
246                it can be replaced by the upstream_value in the
247                control_db_table_name for this specific sensor_id.
248            control_db_table_name: db.table to retrieve the last status change
249                timestamp. This is only relevant for the jdbc sensor.
250            upstream_key: the key of custom sensor information
251                to control how to identify new data from the
252                upstream (e.g., a time column in the upstream).
253            upstream_value: value for custom sensor
254                to identify new data from the upstream
255                (e.g., the value of a time present in the upstream)
256                If none we will set the default value.
257                Note: This parameter is used just to override the
258                default value `-2147483647`.
259            upstream_table_name: value for custom sensor
260                to query new data from the upstream.
261                If none we will set the default value,
262                our `sensor_new_data` view.
264        Return:
265            The query string.
266        """
267        source_table = upstream_table_name if upstream_table_name else "sensor_new_data"
268        select_exp = "SELECT COUNT(1) as count"
269        if control_db_table_name:
270            if not upstream_key:
271                raise ValueError(
272                    "If control_db_table_name is defined, upstream_key should "
273                    "also be defined!"
274                )
276            default_upstream_value: str = "-2147483647"
277            trigger_name = upstream_key
278            trigger_value = (
279                default_upstream_value if upstream_value is None else upstream_value
280            )
281            sensor_table_data = SensorControlTableManager.read_sensor_table_data(
282                sensor_id=sensor_id, control_db_table_name=control_db_table_name
283            )
285            if sensor_table_data and sensor_table_data.upstream_value:
286                trigger_value = sensor_table_data.upstream_value
288            filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
289                "?upstream_value", trigger_value
290            )
291            select_exp = (
292                f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
293                f"max({trigger_name}) as UPSTREAM_VALUE"
294            )
296        query = (
297            f"{select_exp} "
298            f"FROM {source_table} "
299            f"WHERE {filter_exp} "
300            f"HAVING COUNT(1) > 0"
301        )
303        return query

Generates a sensor preprocess query based on timestamp logic.

  • sensor_id: sensor id.
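  • filter_exp: expression to filter incoming new data. You can use the placeholders ?upstream_key and ?upstream_value so that they are replaced by the upstream_key and by the upstream_value in the control_db_table_name for this specific sensor_id. The placeholders are only replaced when control_db_table_name is defined.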
  • control_db_table_name: db.table to retrieve the last status change timestamp. This is only relevant for the jdbc sensor.
  • upstream_key: the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream).
  • upstream_value: value for custom sensor to identify new data from the upstream (e.g., the value of a time present in the upstream). If none, we will set the default value. Note: This parameter is used just to override the default value -2147483647.
  • upstream_table_name: value for custom sensor to query new data from the upstream. If none we will set the default value, our sensor_new_data view.

The query string.

def generate_sensor_table_preprocess_query(cls, sensor_id: str) -> str:
305    @classmethod
306    def generate_sensor_table_preprocess_query(
307        cls,
308        sensor_id: str,
309    ) -> str:
310        """Generates a query to be used for a sensor having other sensor as upstream.
312        Args:
313            sensor_id: sensor id.
315        Return:
316            The query string.
317        """
318        query = (
319            f"SELECT * "  # nosec
320            f"FROM sensor_new_data "
321            f"WHERE"
322            f" _change_type in ('insert', 'update_postimage')"
323            f" and sensor_id = '{sensor_id}'"
324            f" and status = '{SensorStatus.PROCESSED_NEW_DATA.value}'"
325        )
327        return query

Generates a query to be used for a sensor having another sensor as upstream.

  • sensor_id: sensor id.

The query string.

def read_new_data( cls, sensor_spec: lakehouse_engine.core.definitions.SensorSpec) -> pyspark.sql.dataframe.DataFrame:
329    @classmethod
330    def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
331        """Read new data from the upstream into the sensor 'new_data_df'.
333        Args:
334            sensor_spec: sensor spec containing all sensor information.
336        Return:
337            An empty dataframe if it doesn't have new data otherwise the new data
338        """
339        new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)
341        if sensor_spec.preprocess_query:
342            new_data_df.createOrReplaceTempView("sensor_new_data")
343            new_data_df = ExecEnv.SESSION.sql(sensor_spec.preprocess_query)
345        return new_data_df

Read new data from the upstream into the sensor 'new_data_df'.

  • sensor_spec: sensor spec containing all sensor information.

An empty dataframe if it doesn't have new data, otherwise the new data.

def get_new_data( cls, new_data_df: pyspark.sql.dataframe.DataFrame) -> Optional[pyspark.sql.types.Row]:
347    @classmethod
348    def get_new_data(
349        cls,
350        new_data_df: DataFrame,
351    ) -> Optional[Row]:
352        """Get new data from upstream df if it's present.
354        Args:
355            new_data_df: DataFrame possibly containing new data.
357        Return:
358            Optional row, present if there is new data in the upstream,
359            absent otherwise.
360        """
361        return new_data_df.first()

Get new data from upstream df if it's present.

  • new_data_df: DataFrame possibly containing new data.

Optional row, present if there is new data in the upstream, absent otherwise.

def generate_sensor_sap_logchain_query( cls, chain_id: str, dbtable: str = 'SAPPHA.RSPCLOGCHAIN', status: str = 'G', engine_table_name: str = 'sensor_new_data') -> str:
363    @classmethod
364    def generate_sensor_sap_logchain_query(
365        cls,
366        chain_id: str,
367        dbtable: str = SAPLogchain.DBTABLE.value,
368        status: str = SAPLogchain.GREEN_STATUS.value,
369        engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
370    ) -> str:
371        """Generates a sensor query based in the SAP Logchain table.
373        Args:
374            chain_id: chain id to query the status on SAP.
375            dbtable: db.table to retrieve the data to
376                check if the sap chain is already finished.
377            status: db.table to retrieve the last status change
378                timestamp.
379            engine_table_name: table name exposed with the SAP LOGCHAIN data.
380                This table will be used in the jdbc query.
382        Return:
383            The query string.
384        """
385        if not chain_id:
386            raise ValueError(
387                "To query on log chain SAP table the chain id should be defined!"
388            )
390        select_exp = (
392        )
393        filter_exp = (
394            f"UPPER(CHAIN_ID) = UPPER('{chain_id}') "
395            f"AND UPPER(ANALYZED_STATUS) = UPPER('{status}')"
396        )
398        query = (
399            f"WITH {engine_table_name} AS ("
400            f"{select_exp} "
401            f"FROM {dbtable} "
402            f"WHERE {filter_exp}"
403            ")"
404        )
406        return query

Generates a sensor query based on the SAP Logchain table.

  • chain_id: chain id to query the status on SAP.
  • dbtable: db.table to retrieve the data to check if the sap chain is already finished.
  • status: status of the SAP chain to filter on, matched against the ANALYZED_STATUS column (defaults to 'G').
  • engine_table_name: table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query.

The query string.