lakehouse_engine.algorithms.dq_validator

Module to define the Data Validator class.

  1"""Module to define Data Validator class."""
  2
  3from delta.tables import DeltaTable
  4from pyspark.sql import DataFrame
  5from pyspark.sql.utils import StreamingQueryException
  6
  7from lakehouse_engine.algorithms.algorithm import Algorithm
  8from lakehouse_engine.core.definitions import DQSpec, DQValidatorSpec, InputSpec
  9from lakehouse_engine.core.exec_env import ExecEnv
 10from lakehouse_engine.dq_processors.dq_factory import DQFactory
 11from lakehouse_engine.dq_processors.exceptions import DQValidationsFailedException
 12from lakehouse_engine.io.reader_factory import ReaderFactory
 13from lakehouse_engine.utils.logging_handler import LoggingHandler
 14
 15
 16class DQValidator(Algorithm):
 17    """Validate data using an algorithm configuration (ACON represented as dict).
 18
 19    This algorithm focuses on isolate Data Quality Validations from loading,
 20    applying a set of data quality functions to a specific input dataset,
 21    without the need to define any output specification.
 22    You can use any input specification compatible with the lakehouse engine
 23    (dataframe, table, files, etc).
 24    """
 25
 26    _LOGGER = LoggingHandler(__name__).get_logger()
 27
 28    def __init__(self, acon: dict):
 29        """Construct DQValidator algorithm instances.
 30
 31        A data quality validator needs the following specifications to work
 32        properly:
 33            - input specification (mandatory): specify how and what data to
 34            read.
 35            - data quality specification (mandatory): specify how to execute
 36            the data quality process.
 37            - restore_prev_version (optional): specify if, having
 38            delta table/files as input, they should be restored to the
 39            previous version if the data quality process fails. Note: this
 40            is only considered if fail_on_error is kept as True.
 41
 42        Args:
 43            acon: algorithm configuration.
 44        """
 45        self.spec: DQValidatorSpec = DQValidatorSpec(
 46            input_spec=InputSpec(**acon["input_spec"]),
 47            dq_spec=self._get_dq_spec(acon["dq_spec"]),
 48            restore_prev_version=acon.get("restore_prev_version", None),
 49        )
 50
 51    def read(self) -> DataFrame:
 52        """Read data from an input location into a distributed dataframe.
 53
 54        Returns:
 55             Dataframe with data that was read.
 56        """
 57        current_df = ReaderFactory.get_data(self.spec.input_spec)
 58
 59        return current_df
 60
 61    def process_dq(self, data: DataFrame) -> DataFrame:
 62        """Process the data quality tasks for the data that was read.
 63
 64        It supports a single input dataframe.
 65
 66        It is possible to use data quality validators/expectations that will validate
 67        your data and fail the process in case the expectations are not met. The DQ
 68        process also generates and keeps updating a site containing the results of the
 69        expectations that were done on your data. The location of the site is
 70        configurable and can either be on file system or S3. If you define it to be
 71        stored on S3, you can even configure your S3 bucket to serve the site so that
 72        people can easily check the quality of your data. Moreover, it is also
 73        possible to store the result of the DQ process into a defined result sink.
 74
 75        Args:
 76            data: input dataframe on which to run the DQ process.
 77
 78        Returns:
 79            Validated dataframe.
 80        """
 81        return DQFactory.run_dq_process(self.spec.dq_spec, data)
 82
 83    def execute(self) -> None:
 84        """Define the algorithm execution behaviour."""
 85        self._LOGGER.info("Starting read stage...")
 86        read_df = self.read()
 87
 88        self._LOGGER.info("Starting data quality validator...")
 89        try:
 90            if read_df.isStreaming:
 91                # To handle streaming, and although we are not interested in
 92                # writing any data, we still need to start the streaming and
 93                # execute the data quality process in micro batches of data.
 94                def write_dq_validator_micro_batch(
 95                    batch_df: DataFrame, batch_id: int
 96                ) -> None:
 97                    self.process_dq(batch_df)
 98
 99                read_df.writeStream.trigger(once=True).foreachBatch(
100                    write_dq_validator_micro_batch
101                ).start().awaitTermination()
102
103            else:
104                self.process_dq(read_df)
105        except (DQValidationsFailedException, StreamingQueryException):
106            if not self.spec.input_spec.df_name and self.spec.restore_prev_version:
107                self._LOGGER.info("Restoring delta table/files to previous version...")
108
109                self._restore_prev_version()
110
111                raise DQValidationsFailedException(
112                    "Data Quality Validations Failed! The delta "
113                    "table/files were restored to the previous version!"
114                )
115
116            elif self.spec.dq_spec.fail_on_error:
117                raise DQValidationsFailedException("Data Quality Validations Failed!")
118        else:
119            self._LOGGER.info("Execution of the algorithm has finished!")
120
121    @staticmethod
122    def _get_dq_spec(input_dq_spec: dict) -> DQSpec:
123        """Get data quality specification from acon.
124
125        Args:
126            input_dq_spec: data quality specification.
127
128        Returns:
129            Data quality spec.
130        """
131        dq_spec, dq_functions, critical_functions = Algorithm.get_dq_spec(input_dq_spec)
132
133        dq_spec.dq_functions = dq_functions
134        dq_spec.critical_functions = critical_functions
135
136        return dq_spec
137
138    def _restore_prev_version(self) -> None:
139        """Restore delta table or delta files to previous version."""
140        if self.spec.input_spec.db_table:
141            delta_table = DeltaTable.forName(
142                ExecEnv.SESSION, self.spec.input_spec.db_table
143            )
144        else:
145            delta_table = DeltaTable.forPath(
146                ExecEnv.SESSION, self.spec.input_spec.location
147            )
148
149        previous_version = (
150            delta_table.history().agg({"version": "max"}).collect()[0][0] - 1
151        )
152
153        delta_table.restoreToVersion(previous_version)
class DQValidator(lakehouse_engine.algorithms.algorithm.Algorithm):

Validate data using an algorithm configuration (ACON represented as dict).

This algorithm focuses on isolating Data Quality Validations from loading, applying a set of data quality functions to a specific input dataset, without the need to define any output specification. You can use any input specification compatible with the lakehouse engine (dataframe, table, files, etc.).
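
A minimal end-to-end sketch is shown below. The exact fields accepted by InputSpec and DQSpec (and the expectation names) depend on the engine version, so the keys used here (read_type, data_format, dq_type, dq_functions, etc.) are illustrative assumptions rather than a definitive configuration.

from lakehouse_engine.algorithms.dq_validator import DQValidator

# Illustrative ACON: key names and values are assumptions based on the specs
# referenced in this module (InputSpec, DQSpec) and may differ per version.
acon = {
    "input_spec": {
        "spec_id": "sales_source",
        "read_type": "batch",
        "data_format": "delta",
        "location": "s3://my-bucket/silver/sales/",
    },
    "dq_spec": {
        "spec_id": "sales_quality",
        "input_id": "sales_source",
        "dq_type": "validator",
        "fail_on_error": True,  # must stay True for restore_prev_version to apply
        "dq_functions": [
            {
                "function": "expect_column_values_to_not_be_null",
                "args": {"column": "order_id"},
            },
        ],
    },
    # Only honoured for delta table/files input and when fail_on_error is True.
    "restore_prev_version": True,
}

DQValidator(acon).execute()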

DQValidator(acon: dict)

Construct DQValidator algorithm instances.

A data quality validator needs the following specifications to work properly:
  • input specification (mandatory): specify how and what data to read.
  • data quality specification (mandatory): specify how to execute the data quality process.
  • restore_prev_version (optional): specify if, having delta table/files as input, they should be restored to the previous version if the data quality process fails. Note: this is only considered if fail_on_error is kept as True.

Arguments:
  • acon: algorithm configuration.
def read(self) -> pyspark.sql.dataframe.DataFrame:

Read data from an input location into a distributed dataframe.

Returns:

Dataframe with data that was read.
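
Since read() simply delegates to ReaderFactory with the configured input specification, it can also be called on its own, for example to inspect the data before running the DQ process (acon as in the illustrative example above):

validator = DQValidator(acon)

# Returns a (possibly streaming) Spark DataFrame built from input_spec.
df = validator.read()
df.printSchema()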

def process_dq(self, data: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:

Process the data quality tasks for the data that was read.

It supports a single input dataframe.

It is possible to use data quality validators/expectations that will validate your data and fail the process in case the expectations are not met. The DQ process also generates and keeps updating a site containing the results of the expectations that were run on your data. The location of the site is configurable and can either be on the file system or on S3. If you define it to be stored on S3, you can even configure your S3 bucket to serve the site so that people can easily check the quality of your data. Moreover, it is also possible to store the result of the DQ process into a defined result sink.

Arguments:
  • data: input dataframe on which to run the DQ process.
Returns:

Validated dataframe.
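
The method can also be driven manually, which is a convenient way to reason about the failure behaviour described above (again using the illustrative acon):

validator = DQValidator(acon)
df = validator.read()

# Runs the configured expectations on df and returns the validated dataframe;
# in execute(), failed validations surface as DQValidationsFailedException.
validated_df = validator.process_dq(df)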

def execute(self) -> None:

Define the algorithm execution behaviour.
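
To make the streaming branch of execute concrete, the sketch below shows the same trigger-once foreachBatch pattern in isolation. The source format, the paths and the print stand-in for process_dq are assumptions for illustration only.

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()


def validate_micro_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Stand-in for process_dq: each micro batch is validated instead of written.
    print(f"validating micro batch {batch_id} ({batch_df.count()} rows)")


# Illustrative streaming source; any input_spec-supported source works.
stream_df = spark.readStream.format("delta").load("/tmp/example/source")

(
    stream_df.writeStream.trigger(once=True)
    .option("checkpointLocation", "/tmp/example/checkpoint")  # placeholder path
    .foreachBatch(validate_micro_batch)
    .start()
    .awaitTermination()
)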