lakehouse_engine.terminators.sensor_terminator
Module with sensor terminator.
"""Module with sensor terminator."""

from typing import List

from lakehouse_engine.core.definitions import SensorSpec, SensorStatus
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.core.sensor_manager import SensorControlTableManager
from lakehouse_engine.utils.logging_handler import LoggingHandler


class SensorTerminator:
    """Sensor Terminator class.

    Exposes a class-level entry point to update the status of a sensor in
    the sensor control table.
    """

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def update_sensor_status(
        cls,
        sensor_id: str,
        control_db_table_name: str,
        status: str = SensorStatus.PROCESSED_NEW_DATA.value,
        assets: List[str] = None,
    ) -> None:
        """Update internal sensor status.

        Update the sensor status in the control table. It should be used to
        tell the system that the sensor has processed all new data that was
        previously identified, hence updating the shifted sensor status.
        The usual transition is from `SensorStatus.ACQUIRED_NEW_DATA` to
        `SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios - still
        to identify - where the status is updated from/to different statuses.

        Args:
            sensor_id: sensor id.
            control_db_table_name: `db.table` to store sensor checkpoints.
            status: status of the sensor. Must be the value of one of the
                `SensorStatus` members.
            assets: a list of assets that are considered as available to
                consume downstream after this sensor has status
                PROCESSED_NEW_DATA.

        Raises:
            NotImplementedError: if `status` is not the value of any
                `SensorStatus` member.
        """
        known_statuses = tuple(member.value for member in SensorStatus)
        if status not in known_statuses:
            raise NotImplementedError(f"Status {status} not accepted in sensor.")

        # Get or create the execution environment before the spec is built
        # and the control table manager is called.
        ExecEnv.get_or_create(app_name="update_sensor_status")

        # Only the id, control table and assets are supplied here; the
        # remaining spec fields are explicitly set to None.
        terminated_sensor = SensorSpec(
            sensor_id=sensor_id,
            assets=assets,
            control_db_table_name=control_db_table_name,
            input_spec=None,
            preprocess_query=None,
            checkpoint_location=None,
        )
        SensorControlTableManager.update_sensor_status(
            sensor_spec=terminated_sensor,
            status=status,
        )
class
SensorTerminator:
class SensorTerminator:
    """Sensor Terminator class.

    Exposes a class-level entry point to update the status of a sensor in
    the sensor control table.
    """

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def update_sensor_status(
        cls,
        sensor_id: str,
        control_db_table_name: str,
        status: str = SensorStatus.PROCESSED_NEW_DATA.value,
        assets: List[str] = None,
    ) -> None:
        """Update internal sensor status.

        Update the sensor status in the control table. It should be used to
        tell the system that the sensor has processed all new data that was
        previously identified, hence updating the shifted sensor status.
        The usual transition is from `SensorStatus.ACQUIRED_NEW_DATA` to
        `SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios - still
        to identify - where the status is updated from/to different statuses.

        Args:
            sensor_id: sensor id.
            control_db_table_name: `db.table` to store sensor checkpoints.
            status: status of the sensor. Must be the value of one of the
                `SensorStatus` members.
            assets: a list of assets that are considered as available to
                consume downstream after this sensor has status
                PROCESSED_NEW_DATA.

        Raises:
            NotImplementedError: if `status` is not the value of any
                `SensorStatus` member.
        """
        known_statuses = tuple(member.value for member in SensorStatus)
        if status not in known_statuses:
            raise NotImplementedError(f"Status {status} not accepted in sensor.")

        # Get or create the execution environment before the spec is built
        # and the control table manager is called.
        ExecEnv.get_or_create(app_name="update_sensor_status")

        # Only the id, control table and assets are supplied here; the
        # remaining spec fields are explicitly set to None.
        terminated_sensor = SensorSpec(
            sensor_id=sensor_id,
            assets=assets,
            control_db_table_name=control_db_table_name,
            input_spec=None,
            preprocess_query=None,
            checkpoint_location=None,
        )
        SensorControlTableManager.update_sensor_status(
            sensor_spec=terminated_sensor,
            status=status,
        )
Sensor Terminator class.
@classmethod
def
update_sensor_status( cls, sensor_id: str, control_db_table_name: str, status: str = 'PROCESSED_NEW_DATA', assets: List[str] = None) -> None:
@classmethod
def update_sensor_status(
    cls,
    sensor_id: str,
    control_db_table_name: str,
    status: str = SensorStatus.PROCESSED_NEW_DATA.value,
    assets: List[str] = None,
) -> None:
    """Update internal sensor status.

    Update the sensor status in the control table. It should be used to
    tell the system that the sensor has processed all new data that was
    previously identified, hence updating the shifted sensor status.
    The usual transition is from `SensorStatus.ACQUIRED_NEW_DATA` to
    `SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios - still
    to identify - where the status is updated from/to different statuses.

    Args:
        sensor_id: sensor id.
        control_db_table_name: `db.table` to store sensor checkpoints.
        status: status of the sensor. Must be the value of one of the
            `SensorStatus` members.
        assets: a list of assets that are considered as available to
            consume downstream after this sensor has status
            PROCESSED_NEW_DATA.

    Raises:
        NotImplementedError: if `status` is not the value of any
            `SensorStatus` member.
    """
    known_statuses = tuple(member.value for member in SensorStatus)
    if status not in known_statuses:
        raise NotImplementedError(f"Status {status} not accepted in sensor.")

    # Get or create the execution environment before the spec is built
    # and the control table manager is called.
    ExecEnv.get_or_create(app_name="update_sensor_status")

    # Only the id, control table and assets are supplied here; the
    # remaining spec fields are explicitly set to None.
    terminated_sensor = SensorSpec(
        sensor_id=sensor_id,
        assets=assets,
        control_db_table_name=control_db_table_name,
        input_spec=None,
        preprocess_query=None,
        checkpoint_location=None,
    )
    SensorControlTableManager.update_sensor_status(
        sensor_spec=terminated_sensor,
        status=status,
    )
Update internal sensor status.
Update the sensor status in the control table. It should be used to tell the
system that the sensor has processed all new data that was previously
identified, hence updating the shifted sensor status.
Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to
`SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios - still
to identify - where we can update the sensor status from/to different statuses.
Arguments:
- sensor_id: sensor id.
- control_db_table_name: `db.table` to store sensor checkpoints.
- status: status of the sensor.
- assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.