Skip to content

Sensor

Module to define Sensor algorithm behavior.

Sensor

Bases: Algorithm

Class representing a sensor to check if the upstream has new data.

Source code in mkdocs/lakehouse_engine/packages/algorithms/sensor.py
class Sensor(Algorithm):
    """Class representing a sensor to check if the upstream has new data."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(self, acon: dict):
        """Construct Sensor instances.

        Args:
            acon: algorithm configuration.
        """
        self.spec: SensorSpec = SensorSpec.create_from_acon(acon=acon)
        self._validate_sensor_spec()

        if self._check_if_sensor_already_exists():
            raise SensorAlreadyExistsException(
                "There's already a sensor registered with same id or assets!"
            )

    def execute(self) -> bool:
        """Execute the sensor."""
        self._LOGGER.info(f"Starting {self.spec.input_spec.data_format} sensor...")

        new_data_df = SensorUpstreamManager.read_new_data(sensor_spec=self.spec)
        if self.spec.input_spec.read_type == ReadType.STREAMING.value:
            Sensor._run_streaming_sensor(sensor_spec=self.spec, new_data_df=new_data_df)
        elif self.spec.input_spec.read_type == ReadType.BATCH.value:
            Sensor._run_batch_sensor(
                sensor_spec=self.spec,
                new_data_df=new_data_df,
            )

        has_new_data = SensorControlTableManager.check_if_sensor_has_acquired_data(
            self.spec.sensor_id,
            self.spec.control_db_table_name,
        )

        self._LOGGER.info(
            f"Sensor {self.spec.sensor_id} has previously "
            f"acquired data? {has_new_data}"
        )

        if self.spec.fail_on_empty_result and not has_new_data:
            raise NoNewDataException(
                f"No data was acquired by {self.spec.sensor_id} sensor."
            )

        return has_new_data

    def _check_if_sensor_already_exists(self) -> bool:
        """Check if sensor already exists in the table to avoid duplicates."""
        row = SensorControlTableManager.read_sensor_table_data(
            sensor_id=self.spec.sensor_id,
            control_db_table_name=self.spec.control_db_table_name,
        )

        if row and row.assets != self.spec.assets:
            return True
        else:
            row = SensorControlTableManager.read_sensor_table_data(
                assets=self.spec.assets,
                control_db_table_name=self.spec.control_db_table_name,
            )
            return row is not None and row.sensor_id != self.spec.sensor_id

    @classmethod
    def _run_streaming_sensor(
        cls, sensor_spec: SensorSpec, new_data_df: DataFrame
    ) -> None:
        """Run sensor in streaming mode (internally runs in batch mode)."""

        def foreach_batch_check_new_data(df: DataFrame, batch_id: int) -> None:
            # forcing session to be available inside forEachBatch on
            # Spark Connect
            ExecEnv.get_or_create()

            Sensor._run_batch_sensor(
                sensor_spec=sensor_spec,
                new_data_df=df,
            )

        new_data_df.writeStream.trigger(availableNow=True).option(
            "checkpointLocation", sensor_spec.checkpoint_location
        ).foreachBatch(foreach_batch_check_new_data).start().awaitTermination()

    @classmethod
    def _run_batch_sensor(
        cls,
        sensor_spec: SensorSpec,
        new_data_df: DataFrame,
    ) -> None:
        """Run sensor in batch mode.

        Args:
            sensor_spec: sensor spec containing all sensor information.
            new_data_df: DataFrame possibly containing new data.
        """
        new_data_first_row = SensorUpstreamManager.get_new_data(new_data_df)

        cls._LOGGER.info(
            f"Sensor {sensor_spec.sensor_id} has new data from upstream? "
            f"{new_data_first_row is not None}"
        )

        if new_data_first_row:
            SensorControlTableManager.update_sensor_status(
                sensor_spec=sensor_spec,
                status=SensorStatus.ACQUIRED_NEW_DATA.value,
                upstream_key=(
                    new_data_first_row.UPSTREAM_KEY
                    if "UPSTREAM_KEY" in new_data_df.columns
                    else None
                ),
                upstream_value=(
                    new_data_first_row.UPSTREAM_VALUE
                    if "UPSTREAM_VALUE" in new_data_df.columns
                    else None
                ),
            )
            cls._LOGGER.info(
                f"Successfully updated sensor status for sensor "
                f"{sensor_spec.sensor_id}..."
            )

    def _validate_sensor_spec(self) -> None:
        """Validate if sensor spec Read Type is allowed for the selected Data Format."""
        if InputFormat.exists(self.spec.input_spec.data_format):
            if (
                self.spec.input_spec.data_format
                not in SENSOR_ALLOWED_DATA_FORMATS[self.spec.input_spec.read_type]
            ):
                raise NotImplementedError(
                    f"A sensor has not been implemented yet for this data format or, "
                    f"this data format is not available for the read_type"
                    f" {self.spec.input_spec.read_type}. "
                    f"Check the allowed combinations of read_type and data_formats:"
                    f" {SENSOR_ALLOWED_DATA_FORMATS}"
                )
        else:
            raise NotImplementedError(
                f"Data format {self.spec.input_spec.data_format} isn't implemented yet."
            )

__init__(acon)

Construct Sensor instances.

Parameters:

Name Type Description Default
acon dict

algorithm configuration.

required
Source code in mkdocs/lakehouse_engine/packages/algorithms/sensor.py
def __init__(self, acon: dict):
    """Construct Sensor instances.

    Args:
        acon: algorithm configuration.
    """
    self.spec: SensorSpec = SensorSpec.create_from_acon(acon=acon)
    self._validate_sensor_spec()

    if self._check_if_sensor_already_exists():
        raise SensorAlreadyExistsException(
            "There's already a sensor registered with same id or assets!"
        )

execute()

Execute the sensor.

Source code in mkdocs/lakehouse_engine/packages/algorithms/sensor.py
def execute(self) -> bool:
    """Execute the sensor."""
    self._LOGGER.info(f"Starting {self.spec.input_spec.data_format} sensor...")

    new_data_df = SensorUpstreamManager.read_new_data(sensor_spec=self.spec)
    if self.spec.input_spec.read_type == ReadType.STREAMING.value:
        Sensor._run_streaming_sensor(sensor_spec=self.spec, new_data_df=new_data_df)
    elif self.spec.input_spec.read_type == ReadType.BATCH.value:
        Sensor._run_batch_sensor(
            sensor_spec=self.spec,
            new_data_df=new_data_df,
        )

    has_new_data = SensorControlTableManager.check_if_sensor_has_acquired_data(
        self.spec.sensor_id,
        self.spec.control_db_table_name,
    )

    self._LOGGER.info(
        f"Sensor {self.spec.sensor_id} has previously "
        f"acquired data? {has_new_data}"
    )

    if self.spec.fail_on_empty_result and not has_new_data:
        raise NoNewDataException(
            f"No data was acquired by {self.spec.sensor_id} sensor."
        )

    return has_new_data