lakehouse_engine.algorithms.sensor

Module to define Sensor algorithm behavior.

  1"""Module to define Sensor algorithm behavior."""
  2
  3from pyspark.sql import DataFrame
  4
  5from lakehouse_engine.algorithms.algorithm import Algorithm
  6from lakehouse_engine.algorithms.exceptions import (
  7    NoNewDataException,
  8    SensorAlreadyExistsException,
  9)
 10from lakehouse_engine.core.definitions import (
 11    SENSOR_ALLOWED_DATA_FORMATS,
 12    InputFormat,
 13    ReadType,
 14    SensorSpec,
 15    SensorStatus,
 16)
 17from lakehouse_engine.core.exec_env import ExecEnv
 18from lakehouse_engine.core.sensor_manager import (
 19    SensorControlTableManager,
 20    SensorUpstreamManager,
 21)
 22from lakehouse_engine.utils.logging_handler import LoggingHandler
 23
 24
 class Sensor(Algorithm):
     """Class representing a sensor to check if the upstream has new data.

     The sensor reads the upstream through ``SensorUpstreamManager``, records
     newly found data in the sensor control table through
     ``SensorControlTableManager`` and reports whether the sensor has acquired
     data.
     """

     _LOGGER = LoggingHandler(__name__).get_logger()

     def __init__(self, acon: dict):
         """Construct Sensor instances.

         Args:
             acon: algorithm configuration.

         Raises:
             NotImplementedError: if the input data format is unknown or is not
                 allowed for the configured read type.
             SensorAlreadyExistsException: if the control table already holds a
                 sensor with the same id but different assets, or with the same
                 assets but a different id.
         """
         self.spec: SensorSpec = SensorSpec.create_from_acon(acon=acon)
         self._validate_sensor_spec()

         if self._check_if_sensor_already_exists():
             raise SensorAlreadyExistsException(
                 "There's already a sensor registered with same id or assets!"
             )

     def execute(self) -> bool:
         """Execute the sensor.

         Reads new data from the upstream, checks it in streaming or batch mode
         (updating the control table when new data is found) and then asks the
         control table whether this sensor has acquired data.

         Returns:
             The result of
             ``SensorControlTableManager.check_if_sensor_has_acquired_data``
             for this sensor.

         Raises:
             NoNewDataException: if ``fail_on_empty_result`` is set and the
                 sensor has not acquired data.
         """
         self._LOGGER.info(f"Starting {self.spec.input_spec.data_format} sensor...")

         new_data_df = SensorUpstreamManager.read_new_data(sensor_spec=self.spec)
         if self.spec.input_spec.read_type == ReadType.STREAMING.value:
             Sensor._run_streaming_sensor(sensor_spec=self.spec, new_data_df=new_data_df)
         elif self.spec.input_spec.read_type == ReadType.BATCH.value:
             Sensor._run_batch_sensor(
                 sensor_spec=self.spec,
                 new_data_df=new_data_df,
             )

         has_new_data = SensorControlTableManager.check_if_sensor_has_acquired_data(
             self.spec.sensor_id,
             self.spec.control_db_table_name,
         )

         self._LOGGER.info(
             f"Sensor {self.spec.sensor_id} has previously "
             f"acquired data? {has_new_data}"
         )

         if self.spec.fail_on_empty_result and not has_new_data:
             raise NoNewDataException(
                 f"No data was acquired by {self.spec.sensor_id} sensor."
             )

         return has_new_data

     def _check_if_sensor_already_exists(self) -> bool:
         """Check if sensor already exists in the table to avoid duplicates.

         Returns:
             True if the control table has a row with this sensor id but
             different assets, or a row with these assets but a different
             sensor id; False otherwise.
         """
         row = SensorControlTableManager.read_sensor_table_data(
             sensor_id=self.spec.sensor_id,
             control_db_table_name=self.spec.control_db_table_name,
         )
         # Same id already registered for a different set of assets.
         if row is not None and row.assets != self.spec.assets:
             return True

         row = SensorControlTableManager.read_sensor_table_data(
             assets=self.spec.assets,
             control_db_table_name=self.spec.control_db_table_name,
         )
         # Same assets already registered under a different id.
         return row is not None and row.sensor_id != self.spec.sensor_id

     @classmethod
     def _run_streaming_sensor(
         cls, sensor_spec: SensorSpec, new_data_df: DataFrame
     ) -> None:
         """Run sensor in streaming mode (internally runs in batch mode).

         Every micro-batch is checked with ``_run_batch_sensor``. The query is
         started with an ``availableNow`` trigger and this call blocks on
         ``awaitTermination()`` until the query stops.

         Args:
             sensor_spec: sensor spec containing all sensor information.
             new_data_df: streaming DataFrame possibly containing new data.
         """

         def foreach_batch_check_new_data(df: DataFrame, batch_id: int) -> None:
             # forcing session to be available inside forEachBatch on
             # Spark Connect
             ExecEnv.get_or_create()

             # batch_id is required by the foreachBatch callback signature.
             Sensor._run_batch_sensor(
                 sensor_spec=sensor_spec,
                 new_data_df=df,
             )

         new_data_df.writeStream.trigger(availableNow=True).option(
             "checkpointLocation", sensor_spec.checkpoint_location
         ).foreachBatch(foreach_batch_check_new_data).start().awaitTermination()

     @classmethod
     def _run_batch_sensor(
         cls,
         sensor_spec: SensorSpec,
         new_data_df: DataFrame,
     ) -> None:
         """Run sensor in batch mode.

         When ``SensorUpstreamManager.get_new_data`` returns a row, the sensor
         status is set to ``ACQUIRED_NEW_DATA`` in the control table, together
         with the row's ``UPSTREAM_KEY``/``UPSTREAM_VALUE`` when those columns
         exist in ``new_data_df``.

         Args:
             sensor_spec: sensor spec containing all sensor information.
             new_data_df: DataFrame possibly containing new data.
         """
         new_data_first_row = SensorUpstreamManager.get_new_data(new_data_df)
         # Use one explicit None check for both the log and the branch. A Row is
         # a tuple, so plain truthiness could disagree with what is logged.
         has_new_data = new_data_first_row is not None

         cls._LOGGER.info(
             f"Sensor {sensor_spec.sensor_id} has new data from upstream? "
             f"{has_new_data}"
         )

         if has_new_data:
             columns = new_data_df.columns
             SensorControlTableManager.update_sensor_status(
                 sensor_spec=sensor_spec,
                 status=SensorStatus.ACQUIRED_NEW_DATA.value,
                 upstream_key=(
                     new_data_first_row.UPSTREAM_KEY
                     if "UPSTREAM_KEY" in columns
                     else None
                 ),
                 upstream_value=(
                     new_data_first_row.UPSTREAM_VALUE
                     if "UPSTREAM_VALUE" in columns
                     else None
                 ),
             )
             cls._LOGGER.info(
                 f"Successfully updated sensor status for sensor "
                 f"{sensor_spec.sensor_id}..."
             )

     def _validate_sensor_spec(self) -> None:
         """Validate if sensor spec Read Type is allowed for the selected Data Format.

         Raises:
             NotImplementedError: if the data format is unknown, or if it is not
                 allowed for the configured read type (including read types with
                 no entry in ``SENSOR_ALLOWED_DATA_FORMATS``).
         """
         data_format = self.spec.input_spec.data_format
         read_type = self.spec.input_spec.read_type

         if not InputFormat.exists(data_format):
             raise NotImplementedError(
                 f"Data format {data_format} isn't implemented yet."
             )

         # .get(): an unrecognised read_type must produce the descriptive error
         # below instead of a bare KeyError.
         if data_format not in SENSOR_ALLOWED_DATA_FORMATS.get(read_type, ()):
             raise NotImplementedError(
                 f"A sensor has not been implemented yet for this data format or, "
                 f"this data format is not available for the read_type"
                 f" {read_type}. "
                 f"Check the allowed combinations of read_type and data_formats:"
                 f" {SENSOR_ALLOWED_DATA_FORMATS}"
             )
 class Sensor(Algorithm):
     """Class representing a sensor to check if the upstream has new data.

     The sensor reads the upstream through ``SensorUpstreamManager``, records
     newly found data in the sensor control table through
     ``SensorControlTableManager`` and reports whether the sensor has acquired
     data.
     """

     _LOGGER = LoggingHandler(__name__).get_logger()

     def __init__(self, acon: dict):
         """Construct Sensor instances.

         Args:
             acon: algorithm configuration.

         Raises:
             NotImplementedError: if the input data format is unknown or is not
                 allowed for the configured read type.
             SensorAlreadyExistsException: if the control table already holds a
                 sensor with the same id but different assets, or with the same
                 assets but a different id.
         """
         self.spec: SensorSpec = SensorSpec.create_from_acon(acon=acon)
         self._validate_sensor_spec()

         if self._check_if_sensor_already_exists():
             raise SensorAlreadyExistsException(
                 "There's already a sensor registered with same id or assets!"
             )

     def execute(self) -> bool:
         """Execute the sensor.

         Reads new data from the upstream, checks it in streaming or batch mode
         (updating the control table when new data is found) and then asks the
         control table whether this sensor has acquired data.

         Returns:
             The result of
             ``SensorControlTableManager.check_if_sensor_has_acquired_data``
             for this sensor.

         Raises:
             NoNewDataException: if ``fail_on_empty_result`` is set and the
                 sensor has not acquired data.
         """
         self._LOGGER.info(f"Starting {self.spec.input_spec.data_format} sensor...")

         new_data_df = SensorUpstreamManager.read_new_data(sensor_spec=self.spec)
         if self.spec.input_spec.read_type == ReadType.STREAMING.value:
             Sensor._run_streaming_sensor(sensor_spec=self.spec, new_data_df=new_data_df)
         elif self.spec.input_spec.read_type == ReadType.BATCH.value:
             Sensor._run_batch_sensor(
                 sensor_spec=self.spec,
                 new_data_df=new_data_df,
             )

         has_new_data = SensorControlTableManager.check_if_sensor_has_acquired_data(
             self.spec.sensor_id,
             self.spec.control_db_table_name,
         )

         self._LOGGER.info(
             f"Sensor {self.spec.sensor_id} has previously "
             f"acquired data? {has_new_data}"
         )

         if self.spec.fail_on_empty_result and not has_new_data:
             raise NoNewDataException(
                 f"No data was acquired by {self.spec.sensor_id} sensor."
             )

         return has_new_data

     def _check_if_sensor_already_exists(self) -> bool:
         """Check if sensor already exists in the table to avoid duplicates.

         Returns:
             True if the control table has a row with this sensor id but
             different assets, or a row with these assets but a different
             sensor id; False otherwise.
         """
         row = SensorControlTableManager.read_sensor_table_data(
             sensor_id=self.spec.sensor_id,
             control_db_table_name=self.spec.control_db_table_name,
         )
         # Same id already registered for a different set of assets.
         if row is not None and row.assets != self.spec.assets:
             return True

         row = SensorControlTableManager.read_sensor_table_data(
             assets=self.spec.assets,
             control_db_table_name=self.spec.control_db_table_name,
         )
         # Same assets already registered under a different id.
         return row is not None and row.sensor_id != self.spec.sensor_id

     @classmethod
     def _run_streaming_sensor(
         cls, sensor_spec: SensorSpec, new_data_df: DataFrame
     ) -> None:
         """Run sensor in streaming mode (internally runs in batch mode).

         Every micro-batch is checked with ``_run_batch_sensor``. The query is
         started with an ``availableNow`` trigger and this call blocks on
         ``awaitTermination()`` until the query stops.

         Args:
             sensor_spec: sensor spec containing all sensor information.
             new_data_df: streaming DataFrame possibly containing new data.
         """

         def foreach_batch_check_new_data(df: DataFrame, batch_id: int) -> None:
             # forcing session to be available inside forEachBatch on
             # Spark Connect
             ExecEnv.get_or_create()

             # batch_id is required by the foreachBatch callback signature.
             Sensor._run_batch_sensor(
                 sensor_spec=sensor_spec,
                 new_data_df=df,
             )

         new_data_df.writeStream.trigger(availableNow=True).option(
             "checkpointLocation", sensor_spec.checkpoint_location
         ).foreachBatch(foreach_batch_check_new_data).start().awaitTermination()

     @classmethod
     def _run_batch_sensor(
         cls,
         sensor_spec: SensorSpec,
         new_data_df: DataFrame,
     ) -> None:
         """Run sensor in batch mode.

         When ``SensorUpstreamManager.get_new_data`` returns a row, the sensor
         status is set to ``ACQUIRED_NEW_DATA`` in the control table, together
         with the row's ``UPSTREAM_KEY``/``UPSTREAM_VALUE`` when those columns
         exist in ``new_data_df``.

         Args:
             sensor_spec: sensor spec containing all sensor information.
             new_data_df: DataFrame possibly containing new data.
         """
         new_data_first_row = SensorUpstreamManager.get_new_data(new_data_df)
         # Use one explicit None check for both the log and the branch. A Row is
         # a tuple, so plain truthiness could disagree with what is logged.
         has_new_data = new_data_first_row is not None

         cls._LOGGER.info(
             f"Sensor {sensor_spec.sensor_id} has new data from upstream? "
             f"{has_new_data}"
         )

         if has_new_data:
             columns = new_data_df.columns
             SensorControlTableManager.update_sensor_status(
                 sensor_spec=sensor_spec,
                 status=SensorStatus.ACQUIRED_NEW_DATA.value,
                 upstream_key=(
                     new_data_first_row.UPSTREAM_KEY
                     if "UPSTREAM_KEY" in columns
                     else None
                 ),
                 upstream_value=(
                     new_data_first_row.UPSTREAM_VALUE
                     if "UPSTREAM_VALUE" in columns
                     else None
                 ),
             )
             cls._LOGGER.info(
                 f"Successfully updated sensor status for sensor "
                 f"{sensor_spec.sensor_id}..."
             )

     def _validate_sensor_spec(self) -> None:
         """Validate if sensor spec Read Type is allowed for the selected Data Format.

         Raises:
             NotImplementedError: if the data format is unknown, or if it is not
                 allowed for the configured read type (including read types with
                 no entry in ``SENSOR_ALLOWED_DATA_FORMATS``).
         """
         data_format = self.spec.input_spec.data_format
         read_type = self.spec.input_spec.read_type

         if not InputFormat.exists(data_format):
             raise NotImplementedError(
                 f"Data format {data_format} isn't implemented yet."
             )

         # .get(): an unrecognised read_type must produce the descriptive error
         # below instead of a bare KeyError.
         if data_format not in SENSOR_ALLOWED_DATA_FORMATS.get(read_type, ()):
             raise NotImplementedError(
                 f"A sensor has not been implemented yet for this data format or, "
                 f"this data format is not available for the read_type"
                 f" {read_type}. "
                 f"Check the allowed combinations of read_type and data_formats:"
                 f" {SENSOR_ALLOWED_DATA_FORMATS}"
             )

Class representing a sensor that checks whether the upstream has new data.

Sensor(acon: dict)
def __init__(self, acon: dict):
    """Build a Sensor from its algorithm configuration.

    Args:
        acon: algorithm configuration.

    Raises:
        SensorAlreadyExistsException: if ``_check_if_sensor_already_exists``
            reports a conflicting sensor registration.
    """
    # Parse the configuration, then validate the spec before the duplicate check.
    built_spec: SensorSpec = SensorSpec.create_from_acon(acon=acon)
    self.spec = built_spec
    self._validate_sensor_spec()

    is_duplicate = self._check_if_sensor_already_exists()
    if is_duplicate:
        raise SensorAlreadyExistsException(
            "There's already a sensor registered with same id or assets!"
        )

Construct Sensor instances.

Arguments:
  • acon: algorithm configuration.
def execute(self) -> bool:
def execute(self) -> bool:
    """Run the sensor once and report whether it has acquired data.

    The upstream is read through ``SensorUpstreamManager``. The resulting
    DataFrame is checked in streaming or batch mode, depending on the input
    spec's read type, and then the control table is queried for the sensor's
    acquisition state.

    Returns:
        The flag returned by
        ``SensorControlTableManager.check_if_sensor_has_acquired_data``.

    Raises:
        NoNewDataException: when ``fail_on_empty_result`` is enabled and the
            sensor has not acquired data.
    """
    spec = self.spec
    self._LOGGER.info(f"Starting {spec.input_spec.data_format} sensor...")

    upstream_df = SensorUpstreamManager.read_new_data(sensor_spec=spec)
    mode = spec.input_spec.read_type
    if mode == ReadType.STREAMING.value:
        Sensor._run_streaming_sensor(sensor_spec=spec, new_data_df=upstream_df)
    elif mode == ReadType.BATCH.value:
        Sensor._run_batch_sensor(sensor_spec=spec, new_data_df=upstream_df)
    # Any other read type performs no upstream check here.

    acquired = SensorControlTableManager.check_if_sensor_has_acquired_data(
        spec.sensor_id,
        spec.control_db_table_name,
    )
    self._LOGGER.info(
        f"Sensor {spec.sensor_id} has previously acquired data? {acquired}"
    )

    should_fail = spec.fail_on_empty_result and not acquired
    if should_fail:
        raise NoNewDataException(
            f"No data was acquired by {spec.sensor_id} sensor."
        )
    return acquired

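Execute the sensor and return whether it has acquired data, raising NoNewDataException when `fail_on_empty_result` is set and no data was acquired.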