lakehouse_engine.algorithms.sensor
Module to define Sensor algorithm behavior.
1"""Module to define Sensor algorithm behavior.""" 2 3from pyspark.sql import DataFrame 4 5from lakehouse_engine.algorithms.algorithm import Algorithm 6from lakehouse_engine.algorithms.exceptions import ( 7 NoNewDataException, 8 SensorAlreadyExistsException, 9) 10from lakehouse_engine.core.definitions import ( 11 SENSOR_ALLOWED_DATA_FORMATS, 12 InputFormat, 13 ReadType, 14 SensorSpec, 15 SensorStatus, 16) 17from lakehouse_engine.core.exec_env import ExecEnv 18from lakehouse_engine.core.sensor_manager import ( 19 SensorControlTableManager, 20 SensorUpstreamManager, 21) 22from lakehouse_engine.utils.logging_handler import LoggingHandler 23 24 25class Sensor(Algorithm): 26 """Class representing a sensor to check if the upstream has new data.""" 27 28 _LOGGER = LoggingHandler(__name__).get_logger() 29 30 def __init__(self, acon: dict): 31 """Construct Sensor instances. 32 33 Args: 34 acon: algorithm configuration. 35 """ 36 self.spec: SensorSpec = SensorSpec.create_from_acon(acon=acon) 37 self._validate_sensor_spec() 38 39 if self._check_if_sensor_already_exists(): 40 raise SensorAlreadyExistsException( 41 "There's already a sensor registered with same id or assets!" 42 ) 43 44 def execute(self) -> bool: 45 """Execute the sensor.""" 46 self._LOGGER.info(f"Starting {self.spec.input_spec.data_format} sensor...") 47 48 new_data_df = SensorUpstreamManager.read_new_data(sensor_spec=self.spec) 49 if self.spec.input_spec.read_type == ReadType.STREAMING.value: 50 Sensor._run_streaming_sensor(sensor_spec=self.spec, new_data_df=new_data_df) 51 elif self.spec.input_spec.read_type == ReadType.BATCH.value: 52 Sensor._run_batch_sensor( 53 sensor_spec=self.spec, 54 new_data_df=new_data_df, 55 ) 56 57 has_new_data = SensorControlTableManager.check_if_sensor_has_acquired_data( 58 self.spec.sensor_id, 59 self.spec.control_db_table_name, 60 ) 61 62 self._LOGGER.info( 63 f"Sensor {self.spec.sensor_id} has previously " 64 f"acquired data? 
{has_new_data}" 65 ) 66 67 if self.spec.fail_on_empty_result and not has_new_data: 68 raise NoNewDataException( 69 f"No data was acquired by {self.spec.sensor_id} sensor." 70 ) 71 72 return has_new_data 73 74 def _check_if_sensor_already_exists(self) -> bool: 75 """Check if sensor already exists in the table to avoid duplicates.""" 76 row = SensorControlTableManager.read_sensor_table_data( 77 sensor_id=self.spec.sensor_id, 78 control_db_table_name=self.spec.control_db_table_name, 79 ) 80 81 if row and row.assets != self.spec.assets: 82 return True 83 else: 84 row = SensorControlTableManager.read_sensor_table_data( 85 assets=self.spec.assets, 86 control_db_table_name=self.spec.control_db_table_name, 87 ) 88 return row is not None and row.sensor_id != self.spec.sensor_id 89 90 @classmethod 91 def _run_streaming_sensor( 92 cls, sensor_spec: SensorSpec, new_data_df: DataFrame 93 ) -> None: 94 """Run sensor in streaming mode (internally runs in batch mode).""" 95 96 def foreach_batch_check_new_data(df: DataFrame, batch_id: int) -> None: 97 # forcing session to be available inside forEachBatch on 98 # Spark Connect 99 ExecEnv.get_or_create() 100 101 Sensor._run_batch_sensor( 102 sensor_spec=sensor_spec, 103 new_data_df=df, 104 ) 105 106 new_data_df.writeStream.trigger(availableNow=True).option( 107 "checkpointLocation", sensor_spec.checkpoint_location 108 ).foreachBatch(foreach_batch_check_new_data).start().awaitTermination() 109 110 @classmethod 111 def _run_batch_sensor( 112 cls, 113 sensor_spec: SensorSpec, 114 new_data_df: DataFrame, 115 ) -> None: 116 """Run sensor in batch mode. 117 118 Args: 119 sensor_spec: sensor spec containing all sensor information. 120 new_data_df: DataFrame possibly containing new data. 121 """ 122 new_data_first_row = SensorUpstreamManager.get_new_data(new_data_df) 123 124 cls._LOGGER.info( 125 f"Sensor {sensor_spec.sensor_id} has new data from upstream? 
" 126 f"{new_data_first_row is not None}" 127 ) 128 129 if new_data_first_row: 130 SensorControlTableManager.update_sensor_status( 131 sensor_spec=sensor_spec, 132 status=SensorStatus.ACQUIRED_NEW_DATA.value, 133 upstream_key=( 134 new_data_first_row.UPSTREAM_KEY 135 if "UPSTREAM_KEY" in new_data_df.columns 136 else None 137 ), 138 upstream_value=( 139 new_data_first_row.UPSTREAM_VALUE 140 if "UPSTREAM_VALUE" in new_data_df.columns 141 else None 142 ), 143 ) 144 cls._LOGGER.info( 145 f"Successfully updated sensor status for sensor " 146 f"{sensor_spec.sensor_id}..." 147 ) 148 149 def _validate_sensor_spec(self) -> None: 150 """Validate if sensor spec Read Type is allowed for the selected Data Format.""" 151 if InputFormat.exists(self.spec.input_spec.data_format): 152 if ( 153 self.spec.input_spec.data_format 154 not in SENSOR_ALLOWED_DATA_FORMATS[self.spec.input_spec.read_type] 155 ): 156 raise NotImplementedError( 157 f"A sensor has not been implemented yet for this data format or, " 158 f"this data format is not available for the read_type" 159 f" {self.spec.input_spec.read_type}. " 160 f"Check the allowed combinations of read_type and data_formats:" 161 f" {SENSOR_ALLOWED_DATA_FORMATS}" 162 ) 163 else: 164 raise NotImplementedError( 165 f"Data format {self.spec.input_spec.data_format} isn't implemented yet." 166 )
class Sensor(Algorithm):
    """Sensor that checks whether the upstream has produced new data."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(self, acon: dict):
        """Construct Sensor instances.

        Args:
            acon: algorithm configuration.

        Raises:
            SensorAlreadyExistsException: when a conflicting sensor (same id
                with different assets, or same assets with a different id) is
                already registered in the control table.
        """
        self.spec: SensorSpec = SensorSpec.create_from_acon(acon=acon)
        self._validate_sensor_spec()

        if self._check_if_sensor_already_exists():
            raise SensorAlreadyExistsException(
                "There's already a sensor registered with same id or assets!"
            )

    def execute(self) -> bool:
        """Execute the sensor.

        Returns:
            True when the sensor has (now or previously) acquired new data.

        Raises:
            NoNewDataException: when nothing was acquired and the spec asks
                to fail on an empty result.
        """
        spec = self.spec
        self._LOGGER.info(f"Starting {spec.input_spec.data_format} sensor...")

        upstream_df = SensorUpstreamManager.read_new_data(sensor_spec=spec)

        read_type = spec.input_spec.read_type
        if read_type == ReadType.STREAMING.value:
            self._run_streaming_sensor(sensor_spec=spec, new_data_df=upstream_df)
        elif read_type == ReadType.BATCH.value:
            self._run_batch_sensor(sensor_spec=spec, new_data_df=upstream_df)

        has_new_data = SensorControlTableManager.check_if_sensor_has_acquired_data(
            spec.sensor_id,
            spec.control_db_table_name,
        )

        self._LOGGER.info(
            f"Sensor {spec.sensor_id} has previously "
            f"acquired data? {has_new_data}"
        )

        if spec.fail_on_empty_result and not has_new_data:
            raise NoNewDataException(
                f"No data was acquired by {spec.sensor_id} sensor."
            )

        return has_new_data

    def _check_if_sensor_already_exists(self) -> bool:
        """Check if sensor already exists in the table to avoid duplicates."""
        # Same sensor_id registered with different assets is a conflict.
        by_id = SensorControlTableManager.read_sensor_table_data(
            sensor_id=self.spec.sensor_id,
            control_db_table_name=self.spec.control_db_table_name,
        )
        if by_id and by_id.assets != self.spec.assets:
            return True

        # Same assets registered under a different sensor_id is also a conflict.
        by_assets = SensorControlTableManager.read_sensor_table_data(
            assets=self.spec.assets,
            control_db_table_name=self.spec.control_db_table_name,
        )
        return by_assets is not None and by_assets.sensor_id != self.spec.sensor_id

    @classmethod
    def _run_streaming_sensor(
        cls, sensor_spec: SensorSpec, new_data_df: DataFrame
    ) -> None:
        """Run sensor in streaming mode (internally runs in batch mode)."""

        def process_micro_batch(batch_df: DataFrame, batch_id: int) -> None:
            # forcing session to be available inside forEachBatch on
            # Spark Connect
            ExecEnv.get_or_create()

            cls._run_batch_sensor(sensor_spec=sensor_spec, new_data_df=batch_df)

        query = (
            new_data_df.writeStream.trigger(availableNow=True)
            .option("checkpointLocation", sensor_spec.checkpoint_location)
            .foreachBatch(process_micro_batch)
            .start()
        )
        query.awaitTermination()

    @classmethod
    def _run_batch_sensor(
        cls,
        sensor_spec: SensorSpec,
        new_data_df: DataFrame,
    ) -> None:
        """Run sensor in batch mode.

        Args:
            sensor_spec: sensor spec containing all sensor information.
            new_data_df: DataFrame possibly containing new data.
        """
        first_row = SensorUpstreamManager.get_new_data(new_data_df)

        cls._LOGGER.info(
            f"Sensor {sensor_spec.sensor_id} has new data from upstream? "
            f"{first_row is not None}"
        )

        if not first_row:
            return

        # Upstream key/value columns are optional in the incoming DataFrame.
        available_columns = new_data_df.columns
        SensorControlTableManager.update_sensor_status(
            sensor_spec=sensor_spec,
            status=SensorStatus.ACQUIRED_NEW_DATA.value,
            upstream_key=(
                first_row.UPSTREAM_KEY
                if "UPSTREAM_KEY" in available_columns
                else None
            ),
            upstream_value=(
                first_row.UPSTREAM_VALUE
                if "UPSTREAM_VALUE" in available_columns
                else None
            ),
        )
        cls._LOGGER.info(
            f"Successfully updated sensor status for sensor "
            f"{sensor_spec.sensor_id}..."
        )

    def _validate_sensor_spec(self) -> None:
        """Validate if sensor spec Read Type is allowed for the selected Data Format."""
        data_format = self.spec.input_spec.data_format
        read_type = self.spec.input_spec.read_type

        if not InputFormat.exists(data_format):
            raise NotImplementedError(
                f"Data format {data_format} isn't implemented yet."
            )

        if data_format not in SENSOR_ALLOWED_DATA_FORMATS[read_type]:
            raise NotImplementedError(
                f"A sensor has not been implemented yet for this data format or, "
                f"this data format is not available for the read_type"
                f" {read_type}. "
                f"Check the allowed combinations of read_type and data_formats:"
                f" {SENSOR_ALLOWED_DATA_FORMATS}"
            )
Class representing a sensor to check if the upstream has new data.
Sensor(acon: dict)
def __init__(self, acon: dict):
    """Construct Sensor instances.

    Args:
        acon: algorithm configuration.

    Raises:
        SensorAlreadyExistsException: when a conflicting sensor is already
            registered in the control table.
    """
    # Build and validate the spec before touching the control table.
    self.spec: SensorSpec = SensorSpec.create_from_acon(acon=acon)
    self._validate_sensor_spec()

    duplicated = self._check_if_sensor_already_exists()
    if duplicated:
        raise SensorAlreadyExistsException(
            "There's already a sensor registered with same id or assets!"
        )
Construct Sensor instances.
Arguments:
- acon: algorithm configuration.
def execute(self) -> bool:
def execute(self) -> bool:
    """Execute the sensor.

    Returns:
        True when the sensor has (now or previously) acquired new data.

    Raises:
        NoNewDataException: when nothing was acquired and the spec asks to
            fail on an empty result.
    """
    spec = self.spec
    self._LOGGER.info(f"Starting {spec.input_spec.data_format} sensor...")

    upstream_df = SensorUpstreamManager.read_new_data(sensor_spec=spec)

    read_type = spec.input_spec.read_type
    if read_type == ReadType.STREAMING.value:
        self._run_streaming_sensor(sensor_spec=spec, new_data_df=upstream_df)
    elif read_type == ReadType.BATCH.value:
        self._run_batch_sensor(sensor_spec=spec, new_data_df=upstream_df)

    has_new_data = SensorControlTableManager.check_if_sensor_has_acquired_data(
        spec.sensor_id,
        spec.control_db_table_name,
    )

    self._LOGGER.info(
        f"Sensor {spec.sensor_id} has previously "
        f"acquired data? {has_new_data}"
    )

    if spec.fail_on_empty_result and not has_new_data:
        raise NoNewDataException(
            f"No data was acquired by {spec.sensor_id} sensor."
        )

    return has_new_data
Execute the sensor.