lakehouse_engine.terminators.sensor_terminator

Module with sensor terminator.

 1"""Module with sensor terminator."""
 2
from typing import List, Optional

from lakehouse_engine.core.definitions import SensorSpec, SensorStatus
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.core.sensor_manager import SensorControlTableManager
from lakehouse_engine.utils.logging_handler import LoggingHandler


class SensorTerminator:
    """Sensor Terminator class.

    Exposes `update_sensor_status`, which records a sensor's status in the
    sensor control table.
    """

    # Class-level logger; not used by `update_sensor_status` itself.
    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def update_sensor_status(
        cls,
        sensor_id: str,
        control_db_table_name: str,
        status: str = SensorStatus.PROCESSED_NEW_DATA.value,
        assets: Optional[List[str]] = None,
    ) -> None:
        """Update internal sensor status.

        Update the sensor status in the control table. It should be used to
        tell the system that the sensor has processed all new data that was
        previously identified, hence updating the shifted sensor status.
        Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to
        `SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios - still
        to identify - where we can update the sensor status from/to different
        statuses.

        Args:
            sensor_id: sensor id.
            control_db_table_name: `db.table` to store sensor checkpoints.
            status: status to set for the sensor. Must be the value of one of
                the `SensorStatus` members; defaults to
                `SensorStatus.PROCESSED_NEW_DATA.value`.
            assets: a list of assets that are considered as available to
                consume downstream after this sensor has status
                PROCESSED_NEW_DATA. Defaults to None.

        Raises:
            NotImplementedError: if `status` is not the value of a
                `SensorStatus` member.
        """
        # Reject unknown statuses before anything touches the control table.
        valid_statuses = {s.value for s in SensorStatus}
        if status not in valid_statuses:
            raise NotImplementedError(f"Status {status} not accepted in sensor.")

        # NOTE(review): presumably initialises the shared execution environment
        # the control table manager relies on - confirm against ExecEnv.
        ExecEnv.get_or_create(app_name="update_sensor_status")
        # Only id, control table and assets are supplied here; input spec,
        # preprocess query and checkpoint location are left as None.
        SensorControlTableManager.update_sensor_status(
            sensor_spec=SensorSpec(
                sensor_id=sensor_id,
                control_db_table_name=control_db_table_name,
                assets=assets,
                input_spec=None,
                preprocess_query=None,
                checkpoint_location=None,
            ),
            status=status,
        )
class SensorTerminator:
class SensorTerminator:
    """Sensor Terminator class.

    Exposes `update_sensor_status`, which records a sensor's status in the
    sensor control table.
    """

    # Class-level logger; not used by `update_sensor_status` itself.
    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def update_sensor_status(
        cls,
        sensor_id: str,
        control_db_table_name: str,
        status: str = SensorStatus.PROCESSED_NEW_DATA.value,
        assets: Optional[List[str]] = None,
    ) -> None:
        """Update internal sensor status.

        Update the sensor status in the control table. It should be used to
        tell the system that the sensor has processed all new data that was
        previously identified, hence updating the shifted sensor status.
        Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to
        `SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios - still
        to identify - where we can update the sensor status from/to different
        statuses.

        Args:
            sensor_id: sensor id.
            control_db_table_name: `db.table` to store sensor checkpoints.
            status: status to set for the sensor. Must be the value of one of
                the `SensorStatus` members; defaults to
                `SensorStatus.PROCESSED_NEW_DATA.value`.
            assets: a list of assets that are considered as available to
                consume downstream after this sensor has status
                PROCESSED_NEW_DATA. Defaults to None.

        Raises:
            NotImplementedError: if `status` is not the value of a
                `SensorStatus` member.
        """
        # Reject unknown statuses before anything touches the control table.
        valid_statuses = {s.value for s in SensorStatus}
        if status not in valid_statuses:
            raise NotImplementedError(f"Status {status} not accepted in sensor.")

        # NOTE(review): presumably initialises the shared execution environment
        # the control table manager relies on - confirm against ExecEnv.
        ExecEnv.get_or_create(app_name="update_sensor_status")
        # Only id, control table and assets are supplied here; input spec,
        # preprocess query and checkpoint location are left as None.
        SensorControlTableManager.update_sensor_status(
            sensor_spec=SensorSpec(
                sensor_id=sensor_id,
                control_db_table_name=control_db_table_name,
                assets=assets,
                input_spec=None,
                preprocess_query=None,
                checkpoint_location=None,
            ),
            status=status,
        )

Sensor Terminator class.

@classmethod
def update_sensor_status( cls, sensor_id: str, control_db_table_name: str, status: str = 'PROCESSED_NEW_DATA', assets: List[str] = None) -> None:
@classmethod
def update_sensor_status(
    cls,
    sensor_id: str,
    control_db_table_name: str,
    status: str = SensorStatus.PROCESSED_NEW_DATA.value,
    assets: Optional[List[str]] = None,
) -> None:
    """Update internal sensor status.

    Update the sensor status in the control table. It should be used to tell
    the system that the sensor has processed all new data that was previously
    identified, hence updating the shifted sensor status.
    Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to
    `SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios - still
    to identify - where we can update the sensor status from/to different
    statuses.

    Args:
        sensor_id: sensor id.
        control_db_table_name: `db.table` to store sensor checkpoints.
        status: status to set for the sensor. Must be the value of one of the
            `SensorStatus` members; defaults to
            `SensorStatus.PROCESSED_NEW_DATA.value`.
        assets: a list of assets that are considered as available to
            consume downstream after this sensor has status
            PROCESSED_NEW_DATA. Defaults to None.

    Raises:
        NotImplementedError: if `status` is not the value of a `SensorStatus`
            member.
    """
    # Reject unknown statuses before anything touches the control table.
    valid_statuses = {s.value for s in SensorStatus}
    if status not in valid_statuses:
        raise NotImplementedError(f"Status {status} not accepted in sensor.")

    # NOTE(review): presumably initialises the shared execution environment
    # the control table manager relies on - confirm against ExecEnv.
    ExecEnv.get_or_create(app_name="update_sensor_status")
    # Only id, control table and assets are supplied here; input spec,
    # preprocess query and checkpoint location are left as None.
    SensorControlTableManager.update_sensor_status(
        sensor_spec=SensorSpec(
            sensor_id=sensor_id,
            control_db_table_name=control_db_table_name,
            assets=assets,
            input_spec=None,
            preprocess_query=None,
            checkpoint_location=None,
        ),
        status=status,
    )

Update internal sensor status.

Update the sensor status in the control table. It should be used to tell the system that the sensor has processed all new data that was previously identified, hence updating the shifted sensor status. It is usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to `SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios (still to be identified) where the sensor status is updated from/to other statuses.

Arguments:
  • sensor_id: sensor id.
  • control_db_table_name: db.table to store sensor checkpoints.
  • status: status to set for the sensor; must be a valid SensorStatus value (defaults to PROCESSED_NEW_DATA).
  • assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.