lakehouse_engine.utils.dq_utils
Module containing utils for DQ processing.
1"""Module containing utils for DQ processing.""" 2 3from json import loads 4 5from pyspark.sql.functions import col, from_json, schema_of_json, struct 6 7from lakehouse_engine.core.definitions import DQTableBaseParameters 8from lakehouse_engine.core.exec_env import ExecEnv 9from lakehouse_engine.dq_processors.exceptions import DQSpecMalformedException 10from lakehouse_engine.utils.logging_handler import LoggingHandler 11 12_LOGGER = LoggingHandler(__name__).get_logger() 13 14 15class DQUtils: 16 """Utils related to the data quality process.""" 17 18 @staticmethod 19 def import_dq_rules_from_table( 20 spec: dict, 21 execution_point: str, 22 base_expectation_arguments: list, 23 extra_meta_arguments: list, 24 ) -> dict: 25 """Import dq rules from a table. 26 27 Args: 28 spec: data quality specification. 29 execution_point: if the execution is in_motion or at_rest. 30 base_expectation_arguments: base arguments for dq functions. 31 extra_meta_arguments: extra meta arguments for dq functions. 32 33 Returns: 34 The dictionary containing the dq spec with dq functions defined. 35 """ 36 dq_db_table = spec["dq_db_table"] 37 dq_functions = [] 38 39 if spec.get("dq_table_table_filter"): 40 dq_table_table_filter = spec["dq_table_table_filter"] 41 else: 42 raise DQSpecMalformedException( 43 "When importing rules from a table " 44 "dq_table_table_filter must be defined." 45 ) 46 47 extra_filters_query = ( 48 f""" and {spec["dq_table_extra_filters"]}""" 49 if spec.get("dq_table_extra_filters") 50 else "" 51 ) 52 53 fields = base_expectation_arguments + extra_meta_arguments 54 55 dq_functions_query = f""" 56 SELECT {", ".join(fields)} 57 FROM {dq_db_table} 58 WHERE 59 execution_point='{execution_point}' and table = '{dq_table_table_filter}' 60 {extra_filters_query}""" # nosec: B608 61 62 raw_dq_functions = ExecEnv.SESSION.sql(dq_functions_query) 63 64 arguments = raw_dq_functions.select("arguments").collect() 65 parsed_arguments = [loads(argument.arguments) for argument in arguments] 66 combined_dict: dict = {} 67 68 for argument in parsed_arguments: 69 combined_dict = {**combined_dict, **argument} 70 71 dq_function_arguments_schema = schema_of_json(str(combined_dict)) 72 73 processed_dq_functions = ( 74 raw_dq_functions.withColumn( 75 "json_data", from_json(col("arguments"), dq_function_arguments_schema) 76 ) 77 .withColumn( 78 "parsed_arguments", 79 struct( 80 col("json_data.*"), 81 struct(extra_meta_arguments).alias("meta"), 82 ), 83 ) 84 .drop(col("json_data")) 85 ) 86 87 unique_dq_functions = processed_dq_functions.drop_duplicates( 88 ["dq_tech_function", "arguments"] 89 ) 90 91 duplicated_rows = processed_dq_functions.subtract(unique_dq_functions) 92 93 if duplicated_rows.count() > 0: 94 _LOGGER.warn("Found Duplicates Rows:") 95 duplicated_rows.show(truncate=False) 96 97 processed_dq_functions_list = unique_dq_functions.collect() 98 for processed_dq_function in processed_dq_functions_list: 99 dq_functions.append( 100 { 101 "function": f"{processed_dq_function.dq_tech_function}", 102 "args": { 103 k: v 104 for k, v in processed_dq_function.parsed_arguments.asDict( 105 recursive=True 106 ).items() 107 if v is not None 108 }, 109 } 110 ) 111 112 spec["dq_functions"] = dq_functions 113 114 return spec 115 116 @staticmethod 117 def validate_dq_functions( 118 spec: dict, execution_point: str = "", extra_meta_arguments: list = None 119 ) -> None: 120 """Function to validate the dq functions defined in the dq_spec. 
121 122 This function validates that the defined dq_functions contain all 123 the fields defined in the extra_meta_arguments parameter. 124 125 Args: 126 spec: data quality specification. 127 execution_point: if the execution is in_motion or at_rest. 128 extra_meta_arguments: extra meta arguments for dq functions. 129 130 Raises: 131 DQSpecMalformedException: If the dq spec is malformed. 132 """ 133 dq_functions = spec["dq_functions"] 134 if not extra_meta_arguments: 135 _LOGGER.info( 136 "No extra meta parameters defined. " 137 "Skipping validation of imported dq rule." 138 ) 139 return 140 141 for dq_function in dq_functions: 142 if not dq_function.get("args").get("meta", None): 143 raise DQSpecMalformedException( 144 "The dq function must have a meta field containing all " 145 f"the fields defined: {extra_meta_arguments}." 146 ) 147 else: 148 149 meta = dq_function["args"]["meta"] 150 given_keys = meta.keys() 151 missing_keys = sorted(set(extra_meta_arguments) - set(given_keys)) 152 if missing_keys: 153 raise DQSpecMalformedException( 154 "The dq function meta field must contain all the " 155 f"fields defined: {extra_meta_arguments}.\n" 156 f"Found fields: {list(given_keys)}.\n" 157 f"Diff: {list(missing_keys)}" 158 ) 159 if execution_point and meta["execution_point"] != execution_point: 160 raise DQSpecMalformedException( 161 "The dq function execution point must be the same as " 162 "the execution point of the dq spec." 163 ) 164 165 166class PrismaUtils: 167 """Prisma related utils.""" 168 169 @staticmethod 170 def build_prisma_dq_spec(spec: dict, execution_point: str) -> dict: 171 """Fetch dq functions from given table. 172 173 Args: 174 spec: data quality specification. 175 execution_point: if the execution is in_motion or at_rest. 176 177 Returns: 178 The dictionary containing the dq spec with dq functions defined. 179 """ 180 if spec.get("dq_db_table"): 181 spec = DQUtils.import_dq_rules_from_table( 182 spec, 183 execution_point, 184 DQTableBaseParameters.PRISMA_BASE_PARAMETERS.value, 185 ExecEnv.ENGINE_CONFIG.dq_functions_column_list, 186 ) 187 elif spec.get("dq_functions"): 188 DQUtils.validate_dq_functions( 189 spec, 190 execution_point, 191 ExecEnv.ENGINE_CONFIG.dq_functions_column_list, 192 ) 193 else: 194 raise DQSpecMalformedException( 195 "When using PRISMA either dq_db_table or " 196 "dq_functions needs to be defined." 197 ) 198 199 dq_bucket = ( 200 ExecEnv.ENGINE_CONFIG.dq_bucket 201 if ExecEnv.get_environment() == "prod" 202 else ExecEnv.ENGINE_CONFIG.dq_dev_bucket 203 ) 204 205 spec["critical_functions"] = [] 206 spec["execution_point"] = execution_point 207 spec["result_sink_db_table"] = None 208 spec["result_sink_explode"] = True 209 spec["fail_on_error"] = spec.get("fail_on_error", False) 210 spec["max_percentage_failure"] = spec.get("max_percentage_failure", 1) 211 212 if not spec.get("result_sink_extra_columns", None): 213 spec["result_sink_extra_columns"] = [ 214 "validation_results.expectation_config.meta", 215 ] 216 else: 217 spec["result_sink_extra_columns"] = [ 218 "validation_results.expectation_config.meta", 219 ] + spec["result_sink_extra_columns"] 220 if not spec.get("data_product_name", None): 221 raise DQSpecMalformedException( 222 "When using PRISMA DQ data_product_name must be defined." 
223 ) 224 spec["result_sink_location"] = ( 225 f"{dq_bucket}/{spec['data_product_name']}/result_sink/" 226 ) 227 if not spec.get("tbl_to_derive_pk", None) and not spec.get( 228 "unexpected_rows_pk", None 229 ): 230 raise DQSpecMalformedException( 231 "When using PRISMA DQ either " 232 "tbl_to_derive_pk or unexpected_rows_pk need to be defined." 233 ) 234 return spec
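The query built in import_dq_rules_from_table implies a minimum shape for the rules table: it filters on execution_point and table, and the downstream parsing reads dq_tech_function and arguments from the result. Below is a minimal sketch registering an in-memory table with that shape; the table name and the example rule are illustrative assumptions, not part of the engine.

```python
# Sketch of the rules-table shape implied by import_dq_rules_from_table.
# Everything beyond the columns execution_point, table, dq_tech_function,
# and arguments (which the code references) is assumed for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rules = spark.createDataFrame(
    [
        (
            "expect_column_values_to_not_be_null",  # dq_tech_function
            "at_rest",                              # execution_point
            "my_db.orders",                         # table the rule targets
            '{"column": "order_id"}',               # JSON-encoded arguments
        )
    ],
    schema="dq_tech_function string, execution_point string, "
    "`table` string, arguments string",
)
rules.createOrReplaceTempView("dq_rules")
```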
class DQUtils:
Utils related to the data quality process.
@staticmethod
def import_dq_rules_from_table(spec: dict, execution_point: str, base_expectation_arguments: list, extra_meta_arguments: list) -> dict:
Import DQ rules from a table.

Arguments:
- spec: data quality specification.
- execution_point: whether the execution point is in_motion or at_rest.
- base_expectation_arguments: base arguments for the DQ functions.
- extra_meta_arguments: extra meta arguments for the DQ functions.

Returns:
The dictionary containing the DQ spec with the DQ functions defined.
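A minimal usage sketch, assuming a rules table like the one sketched above. The spec keys are the ones this method reads; the argument lists are assumptions (whichever fields you select must include dq_tech_function and arguments for the downstream parsing to work):

```python
from lakehouse_engine.utils.dq_utils import DQUtils

# Hypothetical spec: dq_db_table and dq_table_table_filter are required
# by this method; dq_table_extra_filters is optional.
spec = {
    "dq_db_table": "dq_rules",
    "dq_table_table_filter": "my_db.orders",
    # "dq_table_extra_filters": "dq_rule_id like 'finance_%'",  # optional
}

spec = DQUtils.import_dq_rules_from_table(
    spec,
    execution_point="at_rest",
    base_expectation_arguments=["dq_tech_function", "arguments"],  # assumed
    extra_meta_arguments=["execution_point"],  # assumed meta columns
)
# spec["dq_functions"] now holds entries shaped like:
# {"function": "<dq_tech_function>", "args": {..., "meta": {...}}}
```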
@staticmethod
def validate_dq_functions(spec: dict, execution_point: str = "", extra_meta_arguments: list = None) -> None:
Validate the DQ functions defined in the dq_spec.

This function validates that the defined dq_functions contain all the fields listed in the extra_meta_arguments parameter.

Arguments:
- spec: data quality specification.
- execution_point: whether the execution point is in_motion or at_rest.
- extra_meta_arguments: extra meta arguments for the DQ functions.

Raises:
- DQSpecMalformedException: if the DQ spec is malformed.
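A minimal sketch of a spec that passes this validation, under the assumed extra_meta_arguments shown; a missing meta field, or a meta execution_point that differs from the one passed in, raises DQSpecMalformedException:

```python
from lakehouse_engine.utils.dq_utils import DQUtils

# Illustrative spec: every field listed in extra_meta_arguments must
# appear under args.meta of each dq function.
spec = {
    "dq_functions": [
        {
            "function": "expect_column_values_to_not_be_null",
            "args": {
                "column": "order_id",
                "meta": {
                    "dq_rule_id": "rule_1",
                    "execution_point": "at_rest",
                },
            },
        }
    ]
}

DQUtils.validate_dq_functions(
    spec,
    execution_point="at_rest",
    extra_meta_arguments=["dq_rule_id", "execution_point"],  # assumed
)  # returns None; raises DQSpecMalformedException on malformed specs
```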
class PrismaUtils:
PRISMA-related utils.
@staticmethod
def build_prisma_dq_spec(spec: dict, execution_point: str) -> dict:
Build a PRISMA DQ spec, fetching its DQ functions from a table or validating the ones defined inline.

Arguments:
- spec: data quality specification.
- execution_point: whether the execution point is in_motion or at_rest.

Returns:
The dictionary containing the DQ spec with the DQ functions defined.
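A hedged usage sketch with inline dq_functions. The keys shown (data_product_name plus one of unexpected_rows_pk or tbl_to_derive_pk) are the ones this method checks; the values are illustrative, and the meta fields must match whatever ExecEnv.ENGINE_CONFIG.dq_functions_column_list is configured to in your deployment:

```python
from lakehouse_engine.utils.dq_utils import PrismaUtils

# Illustrative spec only; values are assumptions, not engine defaults.
spec = {
    "data_product_name": "orders_dp",    # required by PRISMA DQ
    "unexpected_rows_pk": ["order_id"],  # or tbl_to_derive_pk
    "dq_functions": [                    # or dq_db_table to import rules
        {
            "function": "expect_column_values_to_not_be_null",
            "args": {
                "column": "order_id",
                # meta must carry every field configured in
                # ExecEnv.ENGINE_CONFIG.dq_functions_column_list
                "meta": {"execution_point": "at_rest"},
            },
        }
    ],
}

spec = PrismaUtils.build_prisma_dq_spec(spec, execution_point="at_rest")
# spec now carries the PRISMA defaults: execution_point, fail_on_error,
# max_percentage_failure, result_sink_explode, result_sink_extra_columns,
# and result_sink_location under the configured DQ bucket.
```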