lakehouse_engine.algorithms.dq_validator
Module to define Data Validator class.
1"""Module to define Data Validator class.""" 2 3from delta.tables import DeltaTable 4from pyspark.sql import DataFrame 5from pyspark.sql.utils import StreamingQueryException 6 7from lakehouse_engine.algorithms.algorithm import Algorithm 8from lakehouse_engine.core.definitions import DQSpec, DQValidatorSpec, InputSpec 9from lakehouse_engine.core.exec_env import ExecEnv 10from lakehouse_engine.dq_processors.dq_factory import DQFactory 11from lakehouse_engine.dq_processors.exceptions import DQValidationsFailedException 12from lakehouse_engine.io.reader_factory import ReaderFactory 13from lakehouse_engine.utils.logging_handler import LoggingHandler 14 15 16class DQValidator(Algorithm): 17 """Validate data using an algorithm configuration (ACON represented as dict). 18 19 This algorithm focuses on isolate Data Quality Validations from loading, 20 applying a set of data quality functions to a specific input dataset, 21 without the need to define any output specification. 22 You can use any input specification compatible with the lakehouse engine 23 (dataframe, table, files, etc). 24 """ 25 26 _LOGGER = LoggingHandler(__name__).get_logger() 27 28 def __init__(self, acon: dict): 29 """Construct DQValidator algorithm instances. 30 31 A data quality validator needs the following specifications to work 32 properly: 33 - input specification (mandatory): specify how and what data to 34 read. 35 - data quality specification (mandatory): specify how to execute 36 the data quality process. 37 - restore_prev_version (optional): specify if, having 38 delta table/files as input, they should be restored to the 39 previous version if the data quality process fails. Note: this 40 is only considered if fail_on_error is kept as True. 41 42 Args: 43 acon: algorithm configuration. 44 """ 45 self.spec: DQValidatorSpec = DQValidatorSpec( 46 input_spec=InputSpec(**acon["input_spec"]), 47 dq_spec=self._get_dq_spec(acon["dq_spec"]), 48 restore_prev_version=acon.get("restore_prev_version", None), 49 ) 50 51 def read(self) -> DataFrame: 52 """Read data from an input location into a distributed dataframe. 53 54 Returns: 55 Dataframe with data that was read. 56 """ 57 current_df = ReaderFactory.get_data(self.spec.input_spec) 58 59 return current_df 60 61 def process_dq(self, data: DataFrame) -> DataFrame: 62 """Process the data quality tasks for the data that was read. 63 64 It supports a single input dataframe. 65 66 It is possible to use data quality validators/expectations that will validate 67 your data and fail the process in case the expectations are not met. The DQ 68 process also generates and keeps updating a site containing the results of the 69 expectations that were done on your data. The location of the site is 70 configurable and can either be on file system or S3. If you define it to be 71 stored on S3, you can even configure your S3 bucket to serve the site so that 72 people can easily check the quality of your data. Moreover, it is also 73 possible to store the result of the DQ process into a defined result sink. 74 75 Args: 76 data: input dataframe on which to run the DQ process. 77 78 Returns: 79 Validated dataframe. 
80 """ 81 return DQFactory.run_dq_process(self.spec.dq_spec, data) 82 83 def execute(self) -> None: 84 """Define the algorithm execution behaviour.""" 85 self._LOGGER.info("Starting read stage...") 86 read_df = self.read() 87 88 self._LOGGER.info("Starting data quality validator...") 89 try: 90 if read_df.isStreaming: 91 # To handle streaming, and although we are not interested in 92 # writing any data, we still need to start the streaming and 93 # execute the data quality process in micro batches of data. 94 def write_dq_validator_micro_batch( 95 batch_df: DataFrame, batch_id: int 96 ) -> None: 97 self.process_dq(batch_df) 98 99 read_df.writeStream.trigger(once=True).foreachBatch( 100 write_dq_validator_micro_batch 101 ).start().awaitTermination() 102 103 else: 104 self.process_dq(read_df) 105 except (DQValidationsFailedException, StreamingQueryException): 106 if not self.spec.input_spec.df_name and self.spec.restore_prev_version: 107 self._LOGGER.info("Restoring delta table/files to previous version...") 108 109 self._restore_prev_version() 110 111 raise DQValidationsFailedException( 112 "Data Quality Validations Failed! The delta " 113 "table/files were restored to the previous version!" 114 ) 115 116 elif self.spec.dq_spec.fail_on_error: 117 raise DQValidationsFailedException("Data Quality Validations Failed!") 118 else: 119 self._LOGGER.info("Execution of the algorithm has finished!") 120 121 @staticmethod 122 def _get_dq_spec(input_dq_spec: dict) -> DQSpec: 123 """Get data quality specification from acon. 124 125 Args: 126 input_dq_spec: data quality specification. 127 128 Returns: 129 Data quality spec. 130 """ 131 dq_spec, dq_functions, critical_functions = Algorithm.get_dq_spec(input_dq_spec) 132 133 dq_spec.dq_functions = dq_functions 134 dq_spec.critical_functions = critical_functions 135 136 return dq_spec 137 138 def _restore_prev_version(self) -> None: 139 """Restore delta table or delta files to previous version.""" 140 if self.spec.input_spec.db_table: 141 delta_table = DeltaTable.forName( 142 ExecEnv.SESSION, self.spec.input_spec.db_table 143 ) 144 else: 145 delta_table = DeltaTable.forPath( 146 ExecEnv.SESSION, self.spec.input_spec.location 147 ) 148 149 previous_version = ( 150 delta_table.history().agg({"version": "max"}).collect()[0][0] - 1 151 ) 152 153 delta_table.restoreToVersion(previous_version)
class DQValidator(Algorithm)
Validate data using an algorithm configuration (ACON represented as dict).
This algorithm focuses on isolating Data Quality Validations from loading, applying a set of data quality functions to a specific input dataset, without the need to define any output specification. You can use any input specification compatible with the lakehouse engine (dataframe, table, files, etc.).
```python
def __init__(self, acon: dict):
    """Construct DQValidator algorithm instances.

    A data quality validator needs the following specifications to work
    properly:
        - input specification (mandatory): specify how and what data to
          read.
        - data quality specification (mandatory): specify how to execute
          the data quality process.
        - restore_prev_version (optional): specify if, having
          delta table/files as input, they should be restored to the
          previous version if the data quality process fails. Note: this
          is only considered if fail_on_error is kept as True.

    Args:
        acon: algorithm configuration.
    """
    self.spec: DQValidatorSpec = DQValidatorSpec(
        input_spec=InputSpec(**acon["input_spec"]),
        dq_spec=self._get_dq_spec(acon["dq_spec"]),
        restore_prev_version=acon.get("restore_prev_version", None),
    )
```
Construct DQValidator algorithm instances.
A data quality validator needs the following specifications to work properly:

- input specification (mandatory): specify how and what data to read.
- data quality specification (mandatory): specify how to execute the data quality process.
- restore_prev_version (optional): specify if, having delta table/files as input, they should be restored to the previous version if the data quality process fails. Note: this is only considered if fail_on_error is kept as True.
Arguments:
- acon: algorithm configuration.
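As a rough sketch, an ACON for this algorithm could look like the dict below, assuming the engine's execution environment (Spark session) is already initialized. Only the three top-level keys (`input_spec`, `dq_spec`, `restore_prev_version`) are taken from the constructor; every nested field name is an assumption about the spec schema and should be checked against `InputSpec` and `DQSpec` in `lakehouse_engine.core.definitions`.

```python
from lakehouse_engine.algorithms.dq_validator import DQValidator

# Hypothetical ACON: nested keys are assumptions, only the top-level structure
# mirrors what DQValidator.__init__ actually reads.
acon = {
    "input_spec": {
        "spec_id": "sales_orders",                 # assumed InputSpec field
        "read_type": "batch",                      # assumed InputSpec field
        "db_table": "my_database.sales_orders",
    },
    "dq_spec": {
        "spec_id": "sales_orders_dq",              # assumed DQSpec field
        "input_id": "sales_orders",                # assumed DQSpec field
        "dq_type": "validator",                    # assumed DQSpec field
        "dq_functions": [
            # Assumed structure: expectation name plus its arguments.
            {
                "function": "expect_column_values_to_not_be_null",
                "args": {"column": "order_id"},
            },
        ],
    },
    # Optional: roll the delta table/files back one version if validations fail
    # (only honoured when fail_on_error stays True).
    "restore_prev_version": True,
}

DQValidator(acon).execute()
```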
```python
def read(self) -> DataFrame:
    """Read data from an input location into a distributed dataframe.

    Returns:
        Dataframe with data that was read.
    """
    current_df = ReaderFactory.get_data(self.spec.input_spec)

    return current_df
```
Read data from an input location into a distributed dataframe.
Returns:
Dataframe with data that was read.
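The shape of the input specification determines what `read` loads. The `db_table`, `location`, and `df_name` attributes are referenced elsewhere in this module's source; the remaining keys below (`spec_id`, `read_type`, `data_format`) and the paths are illustrative assumptions about `InputSpec`.

```python
# Illustrative input_spec variants (hypothetical key names and paths).
table_input = {
    "spec_id": "orders",
    "read_type": "batch",
    "db_table": "my_database.orders",
}

files_input = {
    "spec_id": "orders_files",
    "read_type": "streaming",                     # streaming inputs take the micro-batch DQ path
    "data_format": "delta",
    "location": "s3://my-bucket/bronze/orders/",  # hypothetical path
}

# An in-memory dataframe input (df_name) is also supported; note that
# restore_prev_version is skipped for dataframe inputs in execute().
```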
```python
def process_dq(self, data: DataFrame) -> DataFrame:
    """Process the data quality tasks for the data that was read.

    It supports a single input dataframe.

    It is possible to use data quality validators/expectations that will validate
    your data and fail the process in case the expectations are not met. The DQ
    process also generates and keeps updating a site containing the results of the
    expectations that were done on your data. The location of the site is
    configurable and can either be on file system or S3. If you define it to be
    stored on S3, you can even configure your S3 bucket to serve the site so that
    people can easily check the quality of your data. Moreover, it is also
    possible to store the result of the DQ process into a defined result sink.

    Args:
        data: input dataframe on which to run the DQ process.

    Returns:
        Validated dataframe.
    """
    return DQFactory.run_dq_process(self.spec.dq_spec, data)
```
Process the data quality tasks for the data that was read.
It supports a single input dataframe.
It is possible to use data quality validators/expectations that will validate your data and fail the process in case the expectations are not met. The DQ process also generates and keeps updating a site containing the results of the expectations that were run on your data. The location of the site is configurable and can be either on the file system or on S3. If you define it to be stored on S3, you can even configure your S3 bucket to serve the site, so that people can easily check the quality of your data. Moreover, it is also possible to store the result of the DQ process in a defined result sink.
Arguments:
- data: input dataframe on which to run the DQ process.
Returns:
Validated dataframe.
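To connect the prose above to configuration, a dq_spec might carry fields along the lines of the fragment below. Every key here is an assumption about `DQSpec` (only `fail_on_error`, `dq_functions`, and `critical_functions` appear in this module's source), so treat it as a sketch rather than the actual schema.

```python
# Hypothetical dq_spec fragment illustrating the site and result-sink options.
dq_spec = {
    "spec_id": "orders_dq",
    "input_id": "orders",
    "dq_type": "validator",
    "bucket": "my-dq-bucket",                         # assumed: where the DQ site/data docs are written
    "result_sink_db_table": "monitoring.dq_results",  # assumed: table sink for DQ results
    "result_sink_location": "s3://my-dq-bucket/dq_results/",
    "fail_on_error": True,                            # referenced by execute() below
    "dq_functions": [
        {
            "function": "expect_table_row_count_to_be_between",
            "args": {"min_value": 1},
        },
    ],
}
```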
```python
def execute(self) -> None:
    """Define the algorithm execution behaviour."""
    self._LOGGER.info("Starting read stage...")
    read_df = self.read()

    self._LOGGER.info("Starting data quality validator...")
    try:
        if read_df.isStreaming:
            # To handle streaming, and although we are not interested in
            # writing any data, we still need to start the streaming and
            # execute the data quality process in micro batches of data.
            def write_dq_validator_micro_batch(
                batch_df: DataFrame, batch_id: int
            ) -> None:
                self.process_dq(batch_df)

            read_df.writeStream.trigger(once=True).foreachBatch(
                write_dq_validator_micro_batch
            ).start().awaitTermination()

        else:
            self.process_dq(read_df)
    except (DQValidationsFailedException, StreamingQueryException):
        if not self.spec.input_spec.df_name and self.spec.restore_prev_version:
            self._LOGGER.info("Restoring delta table/files to previous version...")

            self._restore_prev_version()

            raise DQValidationsFailedException(
                "Data Quality Validations Failed! The delta "
                "table/files were restored to the previous version!"
            )

        elif self.spec.dq_spec.fail_on_error:
            raise DQValidationsFailedException("Data Quality Validations Failed!")
    else:
        self._LOGGER.info("Execution of the algorithm has finished!")
```
Define the algorithm execution behaviour.
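A minimal usage sketch, assuming an ACON like the one shown earlier and an already initialized Spark session/execution environment: when expectations fail and `fail_on_error` is True (or after a restore of the previous delta version), `execute` raises `DQValidationsFailedException`, which callers can handle explicitly.

```python
from lakehouse_engine.algorithms.dq_validator import DQValidator
from lakehouse_engine.dq_processors.exceptions import DQValidationsFailedException

try:
    DQValidator(acon).execute()  # `acon` is the hypothetical dict sketched earlier
except DQValidationsFailedException:
    # Validations failed; for delta inputs with restore_prev_version=True the
    # table/files have already been rolled back to the previous version.
    print("DQ validations failed for sales_orders")  # replace with real alerting
```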