lakehouse_engine.core.sensor_manager
Module to define Sensor Manager classes.
"""Module to define Sensor Manager classes."""

from datetime import datetime
from typing import List, Optional, Union

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import array, col, lit

from lakehouse_engine.core.definitions import (
    SENSOR_SCHEMA,
    SENSOR_UPDATE_SET,
    SAPLogchain,
    SensorSpec,
    SensorStatus,
)
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.io.reader_factory import ReaderFactory
from lakehouse_engine.utils.logging_handler import LoggingHandler


class SensorControlTableManager(object):
    """Class to control the Sensor execution."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def check_if_sensor_has_acquired_data(
        cls,
        sensor_id: str,
        control_db_table_name: str,
    ) -> bool:
        """Check if sensor has acquired new data.

        Args:
            sensor_id: sensor id.
            control_db_table_name: `db.table` to control sensor runs.

        Returns:
            True if a control row exists for the sensor and its status is
            ACQUIRED_NEW_DATA, otherwise False.
        """
        sensor_table_data = cls.read_sensor_table_data(
            sensor_id=sensor_id, control_db_table_name=control_db_table_name
        )
        cls._LOGGER.info(f"sensor_table_data = {sensor_table_data}")

        return (
            sensor_table_data is not None
            and sensor_table_data.status == SensorStatus.ACQUIRED_NEW_DATA.value
        )

    @classmethod
    def update_sensor_status(
        cls,
        sensor_spec: SensorSpec,
        status: str,
        upstream_key: str = None,
        upstream_value: str = None,
    ) -> None:
        """Control sensor execution storing the execution data in a delta table.

        Args:
            sensor_spec: sensor spec containing all sensor
                information we need to update the control status.
            status: status of the sensor.
            upstream_key: upstream key (e.g., used to store an attribute
                name from the upstream so that new data can be detected
                automatically).
            upstream_value: upstream value (e.g., used to store the max
                attribute value from the upstream so that new data can be
                detected automatically).
        """
        cls._LOGGER.info(
            f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
        )

        data = cls._convert_sensor_to_data(
            spec=sensor_spec,
            status=status,
            upstream_key=upstream_key,
            upstream_value=upstream_value,
        )

        sensor_update_set = cls._get_sensor_update_set(
            assets=sensor_spec.assets,
            checkpoint_location=sensor_spec.checkpoint_location,
            upstream_key=upstream_key,
            upstream_value=upstream_value,
        )

        cls._update_sensor_control(
            data=data,
            sensor_update_set=sensor_update_set,
            sensor_control_table=sensor_spec.control_db_table_name,
            sensor_id=sensor_spec.sensor_id,
        )

    @classmethod
    def _update_sensor_control(
        cls,
        data: List[dict],
        sensor_update_set: dict,
        sensor_control_table: str,
        sensor_id: str,
    ) -> None:
        """Update sensor control delta table.

        The row of the given sensor is updated when it already exists and
        inserted otherwise.

        Args:
            data: rows (as dicts following SENSOR_SCHEMA) to be merged.
            sensor_update_set: columns to update when the sensor row exists.
            sensor_control_table: control table name.
            sensor_id: sensor_id to be updated.
        """
        sensors_delta_table = DeltaTable.forName(
            ExecEnv.SESSION,
            sensor_control_table,
        )
        sensors_updates = ExecEnv.SESSION.createDataFrame(data, SENSOR_SCHEMA)
        sensors_delta_table.alias("sensors").merge(
            sensors_updates.alias("updates"),
            f"sensors.sensor_id = '{sensor_id}' AND "
            "sensors.sensor_id = updates.sensor_id",
        ).whenMatchedUpdate(set=sensor_update_set).whenNotMatchedInsertAll().execute()

    @classmethod
    def _convert_sensor_to_data(
        cls,
        spec: SensorSpec,
        status: str,
        upstream_key: str,
        upstream_value: str,
        status_change_timestamp: Optional[datetime] = None,
    ) -> List[dict]:
        """Convert sensor data to dataframe input data.

        Args:
            spec: sensor spec containing sensor identifier data.
            status: new sensor data status.
            upstream_key: key used to acquired data from the upstream.
            upstream_value: max value from the upstream_key
                acquired from the upstream.
            status_change_timestamp: timestamp we commit
                this change in the sensor control table. Defaults to the
                current time when not provided.

        Return:
            Sensor data as list[dict], used to create a
            dataframe to store the data into the sensor_control_table.
        """
        status_change_timestamp = (
            datetime.now()
            if status_change_timestamp is None
            else status_change_timestamp
        )
        return [
            {
                "sensor_id": spec.sensor_id,
                "assets": spec.assets,
                "status": status,
                "status_change_timestamp": status_change_timestamp,
                "checkpoint_location": spec.checkpoint_location,
                # NOTE(review): str(None) yields the literal string "None";
                # confirm that storing "None" for absent values is intended.
                "upstream_key": str(upstream_key),
                "upstream_value": str(upstream_value),
            }
        ]

    @classmethod
    def _get_sensor_update_set(cls, **kwargs: Union[Optional[str], List[str]]) -> dict:
        """Get the sensor update set.

        Args:
            kwargs: Containing the following keys:
                - assets
                - checkpoint_location
                - upstream_key
                - upstream_value

        Return:
            A dict with the fields to update in the control_table: a copy of
            SENSOR_UPDATE_SET plus one `sensors.<key>` -> `updates.<key>`
            entry for every kwarg holding a truthy value.
        """
        sensor_update_set = dict(SENSOR_UPDATE_SET)
        for key, value in kwargs.items():
            if value:
                sensor_update_set[f"sensors.{key}"] = f"updates.{key}"

        return sensor_update_set

    @classmethod
    def read_sensor_table_data(
        cls,
        control_db_table_name: str,
        sensor_id: str = None,
        assets: list = None,
    ) -> Optional[Row]:
        """Read data from delta table containing sensor status info.

        Args:
            sensor_id: sensor id. If this parameter is defined search occurs
                only considering this parameter. Otherwise, the search is
                done by the sensor assets.
            control_db_table_name: db.table to control sensor runs.
            assets: list of assets that are fueled by the pipeline
                where this sensor is.

        Return:
            First row matching the provided sensor_id (or assets), or None
            when there is no match.

        Raises:
            ValueError: if neither sensor_id nor assets are provided.
        """
        df = DeltaTable.forName(
            ExecEnv.SESSION,
            control_db_table_name,
        ).toDF()

        if sensor_id:
            df = df.where(col("sensor_id") == sensor_id)
        elif assets:
            df = df.where(col("assets") == array(*[lit(asset) for asset in assets]))
        else:
            raise ValueError(
                "Either sensor_id or assets need to be provided as arguments."
            )

        return df.first()


class SensorUpstreamManager(object):
    """Class to deal with Sensor Upstream data."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def generate_filter_exp_query(
        cls,
        sensor_id: str,
        filter_exp: str,
        control_db_table_name: str = None,
        upstream_key: str = None,
        upstream_value: str = None,
        upstream_table_name: str = None,
    ) -> str:
        """Generates a sensor preprocess query that counts new upstream data.

        Args:
            sensor_id: sensor id.
            filter_exp: expression to filter incoming new data.
                When control_db_table_name is defined, the placeholders
                `?upstream_key` and `?upstream_value` are replaced by the
                upstream_key and by the resolved upstream value.
            control_db_table_name: db.table to retrieve the last
                upstream_value stored for this specific sensor_id.
                This is only relevant for the jdbc sensor.
            upstream_key: the key of custom sensor information
                to control how to identify new data from the
                upstream (e.g., a time column in the upstream).
            upstream_value: value for custom sensor
                to identify new data from the upstream
                (e.g., the value of a time present in the upstream).
                It is only used when the control table holds no
                upstream_value for the sensor, overriding the
                default value `-2147483647`.
            upstream_table_name: value for custom sensor
                to query new data from the upstream.
                If none we will set the default value,
                our `sensor_new_data` view.

        Return:
            The query string.

        Raises:
            ValueError: if control_db_table_name is defined without
                an upstream_key.
        """
        source_table = upstream_table_name if upstream_table_name else "sensor_new_data"
        select_exp = "SELECT COUNT(1) as count"
        if control_db_table_name:
            if not upstream_key:
                raise ValueError(
                    "If control_db_table_name is defined, upstream_key should "
                    "also be defined!"
                )

            default_upstream_value: str = "-2147483647"
            trigger_name = upstream_key
            trigger_value = (
                default_upstream_value if upstream_value is None else upstream_value
            )
            sensor_table_data = SensorControlTableManager.read_sensor_table_data(
                sensor_id=sensor_id, control_db_table_name=control_db_table_name
            )

            # A value already stored in the control table takes precedence.
            if sensor_table_data and sensor_table_data.upstream_value:
                trigger_value = sensor_table_data.upstream_value

            # str() guards against non-string values (e.g. a numeric
            # upstream_value), which str.replace would reject with TypeError.
            filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
                "?upstream_value", str(trigger_value)
            )
            select_exp = (
                f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
                f"max({trigger_name}) as UPSTREAM_VALUE"
            )

        query = (
            f"{select_exp} "
            f"FROM {source_table} "
            f"WHERE {filter_exp} "
            f"HAVING COUNT(1) > 0"
        )

        return query

    @classmethod
    def generate_sensor_table_preprocess_query(
        cls,
        sensor_id: str,
    ) -> str:
        """Generates a query to be used for a sensor having another sensor as upstream.

        Args:
            sensor_id: sensor id.

        Return:
            The query string.
        """
        query = (
            f"SELECT * "  # nosec
            f"FROM sensor_new_data "
            f"WHERE"
            f" _change_type in ('insert', 'update_postimage')"
            f" and sensor_id = '{sensor_id}'"
            f" and status = '{SensorStatus.PROCESSED_NEW_DATA.value}'"
        )

        return query

    @classmethod
    def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
        """Read new data from the upstream into the sensor 'new_data_df'.

        When the sensor spec has a preprocess query, the upstream data is
        exposed as the `sensor_new_data` temp view and the query result is
        returned instead.

        Args:
            sensor_spec: sensor spec containing all sensor information.

        Return:
            An empty dataframe if it doesn't have new data, otherwise the new data.
        """
        new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)

        if sensor_spec.preprocess_query:
            new_data_df.createOrReplaceTempView("sensor_new_data")
            new_data_df = ExecEnv.SESSION.sql(sensor_spec.preprocess_query)

        return new_data_df

    @classmethod
    def get_new_data(
        cls,
        new_data_df: DataFrame,
    ) -> Optional[Row]:
        """Get new data from upstream df if it's present.

        Args:
            new_data_df: DataFrame possibly containing new data.

        Return:
            Optional row, present if there is new data in the upstream,
            absent otherwise.
        """
        return new_data_df.first()

    @classmethod
    def generate_sensor_sap_logchain_query(
        cls,
        chain_id: str,
        dbtable: str = SAPLogchain.DBTABLE.value,
        status: str = SAPLogchain.GREEN_STATUS.value,
        engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
    ) -> str:
        """Generates a sensor query based on the SAP Logchain table.

        Args:
            chain_id: chain id to query the status on SAP.
            dbtable: db.table to retrieve the data to
                check if the sap chain is already finished.
            status: value of ANALYZED_STATUS the chain must have to be
                selected (compared case-insensitively).
            engine_table_name: table name exposed with the SAP LOGCHAIN data.
                This table will be used in the jdbc query.

        Return:
            The query string, a `WITH <engine_table_name> AS (...)` clause.

        Raises:
            ValueError: if chain_id is not defined.
        """
        if not chain_id:
            raise ValueError(
                "To query on log chain SAP table the chain id should be defined!"
            )

        select_exp = (
            "SELECT CHAIN_ID, CONCAT(DATUM, ZEIT) AS LOAD_DATE, ANALYZED_STATUS"
        )
        filter_exp = (
            f"UPPER(CHAIN_ID) = UPPER('{chain_id}') "
            f"AND UPPER(ANALYZED_STATUS) = UPPER('{status}')"
        )

        query = (
            f"WITH {engine_table_name} AS ("
            f"{select_exp} "
            f"FROM {dbtable} "
            f"WHERE {filter_exp}"
            ")"
        )

        return query
class SensorControlTableManager(object):
    """Read and update the delta table that controls sensor executions."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def check_if_sensor_has_acquired_data(
        cls,
        sensor_id: str,
        control_db_table_name: str,
    ) -> bool:
        """Tell whether the sensor is flagged as having acquired new data.

        Args:
            sensor_id: sensor id.
            control_db_table_name: `db.table` to control sensor runs.

        Returns:
            True if a control row exists for the sensor and its status is
            ACQUIRED_NEW_DATA, otherwise False.
        """
        control_row = cls.read_sensor_table_data(
            control_db_table_name=control_db_table_name, sensor_id=sensor_id
        )
        cls._LOGGER.info(f"sensor_table_data = {control_row}")

        if control_row is None:
            return False
        return control_row.status == SensorStatus.ACQUIRED_NEW_DATA.value

    @classmethod
    def update_sensor_status(
        cls,
        sensor_spec: SensorSpec,
        status: str,
        upstream_key: str = None,
        upstream_value: str = None,
    ) -> None:
        """Persist the given sensor status in the sensor control delta table.

        Args:
            sensor_spec: sensor spec containing all sensor
                information we need to update the control status.
            status: status of the sensor.
            upstream_key: upstream key (e.g., used to store an attribute
                name from the upstream so that new data can be detected
                automatically).
            upstream_value: upstream value (e.g., used to store the max
                attribute value from the upstream so that new data can be
                detected automatically).
        """
        cls._LOGGER.info(
            f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
        )

        upstream_info = {
            "upstream_key": upstream_key,
            "upstream_value": upstream_value,
        }
        control_rows = cls._convert_sensor_to_data(
            spec=sensor_spec, status=status, **upstream_info
        )
        columns_to_update = cls._get_sensor_update_set(
            assets=sensor_spec.assets,
            checkpoint_location=sensor_spec.checkpoint_location,
            **upstream_info,
        )

        cls._update_sensor_control(
            data=control_rows,
            sensor_update_set=columns_to_update,
            sensor_control_table=sensor_spec.control_db_table_name,
            sensor_id=sensor_spec.sensor_id,
        )

    @classmethod
    def _update_sensor_control(
        cls,
        data: List[dict],
        sensor_update_set: dict,
        sensor_control_table: str,
        sensor_id: str,
    ) -> None:
        """Merge the sensor data into the sensor control delta table.

        The row of the given sensor is updated when it already exists and
        inserted otherwise.

        Args:
            data: rows (as dicts following SENSOR_SCHEMA) to be merged.
            sensor_update_set: columns to update when the sensor row exists.
            sensor_control_table: control table name.
            sensor_id: sensor_id to be updated.
        """
        merge_condition = (
            f"sensors.sensor_id = '{sensor_id}' AND "
            "sensors.sensor_id = updates.sensor_id"
        )
        control_table = DeltaTable.forName(ExecEnv.SESSION, sensor_control_table)
        updates_df = ExecEnv.SESSION.createDataFrame(data, SENSOR_SCHEMA)

        merge_builder = control_table.alias("sensors").merge(
            updates_df.alias("updates"), merge_condition
        )
        merge_builder = merge_builder.whenMatchedUpdate(set=sensor_update_set)
        merge_builder.whenNotMatchedInsertAll().execute()

    @classmethod
    def _convert_sensor_to_data(
        cls,
        spec: SensorSpec,
        status: str,
        upstream_key: str,
        upstream_value: str,
        status_change_timestamp: Optional[datetime] = None,
    ) -> List[dict]:
        """Build the input data used to create the sensor updates dataframe.

        Args:
            spec: sensor spec containing sensor identifier data.
            status: new sensor data status.
            upstream_key: key used to acquire data from the upstream.
            upstream_value: max value from the upstream_key
                acquired from the upstream.
            status_change_timestamp: timestamp we commit
                this change in the sensor control table. Defaults to the
                current time when not provided.

        Return:
            Sensor data as list[dict], used to create a
            dataframe to store the data into the sensor_control_table.
        """
        if status_change_timestamp is None:
            status_change_timestamp = datetime.now()

        record = dict(
            sensor_id=spec.sensor_id,
            assets=spec.assets,
            status=status,
            status_change_timestamp=status_change_timestamp,
            checkpoint_location=spec.checkpoint_location,
            # NOTE(review): str(None) yields the literal string "None";
            # confirm that storing "None" for absent values is intended.
            upstream_key=str(upstream_key),
            upstream_value=str(upstream_value),
        )
        return [record]

    @classmethod
    def _get_sensor_update_set(cls, **kwargs: Union[Optional[str], List[str]]) -> dict:
        """Build the set of columns to update in the control table.

        Args:
            kwargs: Containing the following keys:
                - assets
                - checkpoint_location
                - upstream_key
                - upstream_value

        Return:
            A dict with the fields to update in the control_table: a copy of
            SENSOR_UPDATE_SET plus one `sensors.<key>` -> `updates.<key>`
            entry for every kwarg holding a truthy value.
        """
        update_set = dict(SENSOR_UPDATE_SET)
        update_set.update(
            {
                f"sensors.{name}": f"updates.{name}"
                for name, provided in kwargs.items()
                if provided
            }
        )
        return update_set

    @classmethod
    def read_sensor_table_data(
        cls,
        control_db_table_name: str,
        sensor_id: str = None,
        assets: list = None,
    ) -> Optional[Row]:
        """Fetch the control table row holding the status info of a sensor.

        Args:
            sensor_id: sensor id. If this parameter is defined search occurs
                only considering this parameter. Otherwise, the search is
                done by the sensor assets.
            control_db_table_name: db.table to control sensor runs.
            assets: list of assets that are fueled by the pipeline
                where this sensor is.

        Return:
            First row matching the provided sensor_id (or assets), or None
            when there is no match.

        Raises:
            ValueError: if neither sensor_id nor assets are provided.
        """
        control_df = DeltaTable.forName(ExecEnv.SESSION, control_db_table_name).toDF()

        if sensor_id:
            row_filter = col("sensor_id") == sensor_id
        elif assets:
            row_filter = col("assets") == array(*map(lit, assets))
        else:
            raise ValueError(
                "Either sensor_id or assets need to be provided as arguments."
            )

        return control_df.where(row_filter).first()
Class to control the Sensor execution.
@classmethod
def check_if_sensor_has_acquired_data(
    cls,
    sensor_id: str,
    control_db_table_name: str,
) -> bool:
    """Tell whether the sensor is flagged as having acquired new data.

    Args:
        sensor_id: sensor id.
        control_db_table_name: `db.table` to control sensor runs.

    Returns:
        True if a control row exists for the sensor and its status is
        ACQUIRED_NEW_DATA, otherwise False.
    """
    control_row = cls.read_sensor_table_data(
        control_db_table_name=control_db_table_name, sensor_id=sensor_id
    )
    cls._LOGGER.info(f"sensor_table_data = {control_row}")

    # No control row means the sensor never registered any status.
    if control_row is None:
        return False
    return control_row.status == SensorStatus.ACQUIRED_NEW_DATA.value
Check if sensor has acquired new data.
Arguments:
- sensor_id: sensor id.
- control_db_table_name: `db.table` to control sensor runs.
Returns:
True if acquired new data, otherwise False
@classmethod
def update_sensor_status(
    cls,
    sensor_spec: SensorSpec,
    status: str,
    upstream_key: str = None,
    upstream_value: str = None,
) -> None:
    """Persist the given sensor status in the sensor control delta table.

    Args:
        sensor_spec: sensor spec containing all sensor
            information we need to update the control status.
        status: status of the sensor.
        upstream_key: upstream key (e.g., used to store an attribute
            name from the upstream so that new data can be detected
            automatically).
        upstream_value: upstream value (e.g., used to store the max
            attribute value from the upstream so that new data can be
            detected automatically).
    """
    cls._LOGGER.info(
        f"Updating sensor status for sensor {sensor_spec.sensor_id}..."
    )

    # Both helpers receive the very same upstream key/value pair.
    upstream_info = {
        "upstream_key": upstream_key,
        "upstream_value": upstream_value,
    }
    control_rows = cls._convert_sensor_to_data(
        spec=sensor_spec, status=status, **upstream_info
    )
    columns_to_update = cls._get_sensor_update_set(
        assets=sensor_spec.assets,
        checkpoint_location=sensor_spec.checkpoint_location,
        **upstream_info,
    )

    cls._update_sensor_control(
        data=control_rows,
        sensor_update_set=columns_to_update,
        sensor_control_table=sensor_spec.control_db_table_name,
        sensor_id=sensor_spec.sensor_id,
    )
Control sensor execution storing the execution data in a delta table.
Arguments:
- sensor_spec: sensor spec containing all sensor information we need to update the control status.
- status: status of the sensor.
- upstream_key: upstream key (e.g., used to store an attribute name from the upstream so that new data can be detected automatically).
- upstream_value: upstream value (e.g., used to store the max attribute value from the upstream so that new data can be detected automatically).
@classmethod
def read_sensor_table_data(
    cls,
    control_db_table_name: str,
    sensor_id: str = None,
    assets: list = None,
) -> Optional[Row]:
    """Fetch the control table row holding the status info of a sensor.

    Args:
        sensor_id: sensor id. If this parameter is defined search occurs
            only considering this parameter. Otherwise, the search is
            done by the sensor assets.
        control_db_table_name: db.table to control sensor runs.
        assets: list of assets that are fueled by the pipeline
            where this sensor is.

    Return:
        First row matching the provided sensor_id (or assets), or None
        when there is no match.

    Raises:
        ValueError: if neither sensor_id nor assets are provided.
    """
    control_df = DeltaTable.forName(ExecEnv.SESSION, control_db_table_name).toDF()

    # sensor_id wins over assets when both are given.
    if sensor_id:
        row_filter = col("sensor_id") == sensor_id
    elif assets:
        row_filter = col("assets") == array(*map(lit, assets))
    else:
        raise ValueError(
            "Either sensor_id or assets need to be provided as arguments."
        )

    return control_df.where(row_filter).first()
Read data from delta table containing sensor status info.
Arguments:
- sensor_id: sensor id. If this parameter is defined search occurs only considering this parameter. Otherwise, the search is done by the sensor assets.
- control_db_table_name: db.table to control sensor runs.
- assets: list of assets that are fueled by the pipeline where this sensor is.
Return:
Row containing the data for the provided sensor_id.
class SensorUpstreamManager(object):
    """Class to deal with Sensor Upstream data."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def generate_filter_exp_query(
        cls,
        sensor_id: str,
        filter_exp: str,
        control_db_table_name: str = None,
        upstream_key: str = None,
        upstream_value: str = None,
        upstream_table_name: str = None,
    ) -> str:
        """Generates a sensor preprocess query that counts new upstream data.

        Args:
            sensor_id: sensor id.
            filter_exp: expression to filter incoming new data.
                When control_db_table_name is defined, the placeholders
                `?upstream_key` and `?upstream_value` are replaced by the
                upstream_key and by the resolved upstream value.
            control_db_table_name: db.table to retrieve the last
                upstream_value stored for this specific sensor_id.
                This is only relevant for the jdbc sensor.
            upstream_key: the key of custom sensor information
                to control how to identify new data from the
                upstream (e.g., a time column in the upstream).
            upstream_value: value for custom sensor
                to identify new data from the upstream
                (e.g., the value of a time present in the upstream).
                It is only used when the control table holds no
                upstream_value for the sensor, overriding the
                default value `-2147483647`.
            upstream_table_name: value for custom sensor
                to query new data from the upstream.
                If none we will set the default value,
                our `sensor_new_data` view.

        Return:
            The query string.

        Raises:
            ValueError: if control_db_table_name is defined without
                an upstream_key.
        """
        source_table = upstream_table_name if upstream_table_name else "sensor_new_data"
        select_exp = "SELECT COUNT(1) as count"
        if control_db_table_name:
            if not upstream_key:
                raise ValueError(
                    "If control_db_table_name is defined, upstream_key should "
                    "also be defined!"
                )

            default_upstream_value: str = "-2147483647"
            trigger_name = upstream_key
            trigger_value = (
                default_upstream_value if upstream_value is None else upstream_value
            )
            sensor_table_data = SensorControlTableManager.read_sensor_table_data(
                sensor_id=sensor_id, control_db_table_name=control_db_table_name
            )

            # A value already stored in the control table takes precedence.
            if sensor_table_data and sensor_table_data.upstream_value:
                trigger_value = sensor_table_data.upstream_value

            # str() guards against non-string values (e.g. a numeric
            # upstream_value), which str.replace would reject with TypeError.
            filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
                "?upstream_value", str(trigger_value)
            )
            select_exp = (
                f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
                f"max({trigger_name}) as UPSTREAM_VALUE"
            )

        query = (
            f"{select_exp} "
            f"FROM {source_table} "
            f"WHERE {filter_exp} "
            f"HAVING COUNT(1) > 0"
        )

        return query

    @classmethod
    def generate_sensor_table_preprocess_query(
        cls,
        sensor_id: str,
    ) -> str:
        """Generates a query to be used for a sensor having another sensor as upstream.

        Args:
            sensor_id: sensor id.

        Return:
            The query string.
        """
        query = (
            f"SELECT * "  # nosec
            f"FROM sensor_new_data "
            f"WHERE"
            f" _change_type in ('insert', 'update_postimage')"
            f" and sensor_id = '{sensor_id}'"
            f" and status = '{SensorStatus.PROCESSED_NEW_DATA.value}'"
        )

        return query

    @classmethod
    def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
        """Read new data from the upstream into the sensor 'new_data_df'.

        When the sensor spec has a preprocess query, the upstream data is
        exposed as the `sensor_new_data` temp view and the query result is
        returned instead.

        Args:
            sensor_spec: sensor spec containing all sensor information.

        Return:
            An empty dataframe if it doesn't have new data, otherwise the new data.
        """
        new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)

        if sensor_spec.preprocess_query:
            new_data_df.createOrReplaceTempView("sensor_new_data")
            new_data_df = ExecEnv.SESSION.sql(sensor_spec.preprocess_query)

        return new_data_df

    @classmethod
    def get_new_data(
        cls,
        new_data_df: DataFrame,
    ) -> Optional[Row]:
        """Get new data from upstream df if it's present.

        Args:
            new_data_df: DataFrame possibly containing new data.

        Return:
            Optional row, present if there is new data in the upstream,
            absent otherwise.
        """
        return new_data_df.first()

    @classmethod
    def generate_sensor_sap_logchain_query(
        cls,
        chain_id: str,
        dbtable: str = SAPLogchain.DBTABLE.value,
        status: str = SAPLogchain.GREEN_STATUS.value,
        engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
    ) -> str:
        """Generates a sensor query based on the SAP Logchain table.

        Args:
            chain_id: chain id to query the status on SAP.
            dbtable: db.table to retrieve the data to
                check if the sap chain is already finished.
            status: value of ANALYZED_STATUS the chain must have to be
                selected (compared case-insensitively).
            engine_table_name: table name exposed with the SAP LOGCHAIN data.
                This table will be used in the jdbc query.

        Return:
            The query string, a `WITH <engine_table_name> AS (...)` clause.

        Raises:
            ValueError: if chain_id is not defined.
        """
        if not chain_id:
            raise ValueError(
                "To query on log chain SAP table the chain id should be defined!"
            )

        select_exp = (
            "SELECT CHAIN_ID, CONCAT(DATUM, ZEIT) AS LOAD_DATE, ANALYZED_STATUS"
        )
        filter_exp = (
            f"UPPER(CHAIN_ID) = UPPER('{chain_id}') "
            f"AND UPPER(ANALYZED_STATUS) = UPPER('{status}')"
        )

        query = (
            f"WITH {engine_table_name} AS ("
            f"{select_exp} "
            f"FROM {dbtable} "
            f"WHERE {filter_exp}"
            ")"
        )

        return query
Class to deal with Sensor Upstream data.
@classmethod
def generate_filter_exp_query(
    cls,
    sensor_id: str,
    filter_exp: str,
    control_db_table_name: str = None,
    upstream_key: str = None,
    upstream_value: str = None,
    upstream_table_name: str = None,
) -> str:
    """Generates a sensor preprocess query that counts new upstream data.

    Args:
        sensor_id: sensor id.
        filter_exp: expression to filter incoming new data.
            When control_db_table_name is defined, the placeholders
            `?upstream_key` and `?upstream_value` are replaced by the
            upstream_key and by the resolved upstream value.
        control_db_table_name: db.table to retrieve the last
            upstream_value stored for this specific sensor_id.
            This is only relevant for the jdbc sensor.
        upstream_key: the key of custom sensor information
            to control how to identify new data from the
            upstream (e.g., a time column in the upstream).
        upstream_value: value for custom sensor
            to identify new data from the upstream
            (e.g., the value of a time present in the upstream).
            It is only used when the control table holds no
            upstream_value for the sensor, overriding the
            default value `-2147483647`.
        upstream_table_name: value for custom sensor
            to query new data from the upstream.
            If none we will set the default value,
            our `sensor_new_data` view.

    Return:
        The query string.

    Raises:
        ValueError: if control_db_table_name is defined without
            an upstream_key.
    """
    source_table = upstream_table_name if upstream_table_name else "sensor_new_data"
    select_exp = "SELECT COUNT(1) as count"
    if control_db_table_name:
        if not upstream_key:
            raise ValueError(
                "If control_db_table_name is defined, upstream_key should "
                "also be defined!"
            )

        default_upstream_value: str = "-2147483647"
        trigger_name = upstream_key
        trigger_value = (
            default_upstream_value if upstream_value is None else upstream_value
        )
        sensor_table_data = SensorControlTableManager.read_sensor_table_data(
            sensor_id=sensor_id, control_db_table_name=control_db_table_name
        )

        # A value already stored in the control table takes precedence.
        if sensor_table_data and sensor_table_data.upstream_value:
            trigger_value = sensor_table_data.upstream_value

        # str() guards against non-string values (e.g. a numeric
        # upstream_value), which str.replace would reject with TypeError.
        filter_exp = filter_exp.replace("?upstream_key", trigger_name).replace(
            "?upstream_value", str(trigger_value)
        )
        select_exp = (
            f"SELECT COUNT(1) as count, '{trigger_name}' as UPSTREAM_KEY, "
            f"max({trigger_name}) as UPSTREAM_VALUE"
        )

    query = (
        f"{select_exp} "
        f"FROM {source_table} "
        f"WHERE {filter_exp} "
        f"HAVING COUNT(1) > 0"
    )

    return query
Generates a sensor preprocess query based on timestamp logic.
Arguments:
- sensor_id: sensor id.
- filter_exp: expression to filter incoming new data.
You can use the placeholder `?upstream_value` so that it can be replaced by the upstream_value in the control_db_table_name for this specific sensor_id.
- control_db_table_name: db.table to retrieve the last status change timestamp. This is only relevant for the jdbc sensor.
- upstream_key: the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream).
- upstream_value: value for custom sensor to identify new data from the upstream (e.g., the value of a time present in the upstream). If none we will set the default value. Note: This parameter is used just to override the default value `-2147483647`.
- upstream_table_name: value for custom sensor to query new data from the upstream. If none we will set the default value, our `sensor_new_data` view.
Return:
The query string.
@classmethod
def generate_sensor_table_preprocess_query(
    cls,
    sensor_id: str,
) -> str:
    """Build the preprocess query for a sensor fed by another sensor.

    Args:
        sensor_id: sensor id.

    Return:
        The query string, selecting from the `sensor_new_data` view the
        inserted/updated rows of the given sensor whose status is
        PROCESSED_NEW_DATA.
    """
    conditions = [
        "_change_type in ('insert', 'update_postimage')",
        f"sensor_id = '{sensor_id}'",
        f"status = '{SensorStatus.PROCESSED_NEW_DATA.value}'",
    ]
    where_clause = " and ".join(conditions)

    return "SELECT * FROM sensor_new_data WHERE " + where_clause  # nosec
Generates a query to be used for a sensor having another sensor as upstream.
Arguments:
- sensor_id: sensor id.
Return:
The query string.
@classmethod
def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
    """Load the upstream data of the sensor, preprocessed when configured.

    Args:
        sensor_spec: sensor spec containing all sensor information.

    Return:
        An empty dataframe if it doesn't have new data, otherwise the new data.
    """
    upstream_df = ReaderFactory.get_data(sensor_spec.input_spec)

    # Without a preprocess query the upstream data is returned untouched.
    if not sensor_spec.preprocess_query:
        return upstream_df

    # The preprocess query reads the upstream data through this temp view.
    upstream_df.createOrReplaceTempView("sensor_new_data")
    return ExecEnv.SESSION.sql(sensor_spec.preprocess_query)
Read new data from the upstream into the sensor 'new_data_df'.
Arguments:
- sensor_spec: sensor spec containing all sensor information.
Return:
An empty dataframe if it doesn't have new data otherwise the new data
@classmethod
def get_new_data(
    cls,
    new_data_df: DataFrame,
) -> Optional[Row]:
    """Return the first row of the upstream dataframe, when there is one.

    Args:
        new_data_df: DataFrame possibly containing new data.

    Return:
        Optional row, present if there is new data in the upstream,
        absent otherwise.
    """
    first_new_row = new_data_df.first()
    return first_new_row
Get new data from upstream df if it's present.
Arguments:
- new_data_df: DataFrame possibly containing new data.
Return:
Optional row, present if there is new data in the upstream, absent otherwise.
@classmethod
def generate_sensor_sap_logchain_query(
    cls,
    chain_id: str,
    dbtable: str = SAPLogchain.DBTABLE.value,
    status: str = SAPLogchain.GREEN_STATUS.value,
    engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
) -> str:
    """Build the sensor query over the SAP Logchain table.

    Args:
        chain_id: chain id to query the status on SAP.
        dbtable: db.table to retrieve the data to
            check if the sap chain is already finished.
        status: value of ANALYZED_STATUS the chain must have to be
            selected (compared case-insensitively).
        engine_table_name: table name exposed with the SAP LOGCHAIN data.
            This table will be used in the jdbc query.

    Return:
        The query string, a `WITH <engine_table_name> AS (...)` clause.

    Raises:
        ValueError: if chain_id is not defined.
    """
    if not chain_id:
        raise ValueError(
            "To query on log chain SAP table the chain id should be defined!"
        )

    columns = "CHAIN_ID, CONCAT(DATUM, ZEIT) AS LOAD_DATE, ANALYZED_STATUS"
    predicates = " AND ".join(
        [
            f"UPPER(CHAIN_ID) = UPPER('{chain_id}')",
            f"UPPER(ANALYZED_STATUS) = UPPER('{status}')",
        ]
    )
    inner_select = f"SELECT {columns} FROM {dbtable} WHERE {predicates}"

    return f"WITH {engine_table_name} AS ({inner_select})"
Generates a sensor query based on the SAP Logchain table.
Arguments:
- chain_id: chain id to query the status on SAP.
- dbtable: db.table to retrieve the data to check if the sap chain is already finished.
- status: value of ANALYZED_STATUS the chain must have to be selected (compared case-insensitively).
- engine_table_name: table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query.
Return:
The query string.