lakehouse_engine.utils.dq_utils

Module containing utils for DQ processing.

  1"""Module containing utils for DQ processing."""
  2
  3from json import loads
  4
  5from pyspark.sql.functions import col, from_json, schema_of_json, struct
  6
  7from lakehouse_engine.core.definitions import DQTableBaseParameters
  8from lakehouse_engine.core.exec_env import ExecEnv
  9from lakehouse_engine.dq_processors.exceptions import DQSpecMalformedException
 10from lakehouse_engine.utils.logging_handler import LoggingHandler
 11
 12_LOGGER = LoggingHandler(__name__).get_logger()
 13
 14
 15class DQUtils:
 16    """Utils related to the data quality process."""
 17
 18    @staticmethod
 19    def import_dq_rules_from_table(
 20        spec: dict,
 21        execution_point: str,
 22        base_expectation_arguments: list,
 23        extra_meta_arguments: list,
 24    ) -> dict:
 25        """Import dq rules from a table.
 26
 27        Args:
 28            spec: data quality specification.
 29            execution_point: if the execution is in_motion or at_rest.
 30            base_expectation_arguments: base arguments for dq functions.
 31            extra_meta_arguments: extra meta arguments for dq functions.
 32
 33        Returns:
 34            The dictionary containing the dq spec with dq functions defined.
 35        """
 36        dq_db_table = spec["dq_db_table"]
 37        dq_functions = []
 38
 39        if spec.get("dq_table_table_filter"):
 40            dq_table_table_filter = spec["dq_table_table_filter"]
 41        else:
 42            raise DQSpecMalformedException(
 43                "When importing rules from a table "
 44                "dq_table_table_filter must be defined."
 45            )
 46
 47        extra_filters_query = (
 48            f""" and {spec["dq_table_extra_filters"]}"""
 49            if spec.get("dq_table_extra_filters")
 50            else ""
 51        )
 52
 53        fields = base_expectation_arguments + extra_meta_arguments
 54
 55        dq_functions_query = f"""
 56            SELECT {", ".join(fields)}
 57            FROM {dq_db_table}
 58            WHERE
 59            execution_point='{execution_point}' and table = '{dq_table_table_filter}'
 60            {extra_filters_query}"""  # nosec: B608
 61
 62        raw_dq_functions = ExecEnv.SESSION.sql(dq_functions_query)
 63
 64        arguments = raw_dq_functions.select("arguments").collect()
 65        parsed_arguments = [loads(argument.arguments) for argument in arguments]
 66        combined_dict: dict = {}
 67
 68        for argument in parsed_arguments:
 69            combined_dict = {**combined_dict, **argument}
 70
 71        dq_function_arguments_schema = schema_of_json(str(combined_dict))
 72
 73        processed_dq_functions = (
 74            raw_dq_functions.withColumn(
 75                "json_data", from_json(col("arguments"), dq_function_arguments_schema)
 76            )
 77            .withColumn(
 78                "parsed_arguments",
 79                struct(
 80                    col("json_data.*"),
 81                    struct(extra_meta_arguments).alias("meta"),
 82                ),
 83            )
 84            .drop(col("json_data"))
 85        )
 86
 87        unique_dq_functions = processed_dq_functions.drop_duplicates(
 88            ["dq_tech_function", "arguments"]
 89        )
 90
 91        duplicated_rows = processed_dq_functions.subtract(unique_dq_functions)
 92
 93        if duplicated_rows.count() > 0:
 94            _LOGGER.warn("Found Duplicates Rows:")
 95            duplicated_rows.show(truncate=False)
 96
 97        processed_dq_functions_list = unique_dq_functions.collect()
 98        for processed_dq_function in processed_dq_functions_list:
 99            dq_functions.append(
100                {
101                    "function": f"{processed_dq_function.dq_tech_function}",
102                    "args": {
103                        k: v
104                        for k, v in processed_dq_function.parsed_arguments.asDict(
105                            recursive=True
106                        ).items()
107                        if v is not None
108                    },
109                }
110            )
111
112        spec["dq_functions"] = dq_functions
113
114        return spec
115
116    @staticmethod
117    def validate_dq_functions(
118        spec: dict, execution_point: str = "", extra_meta_arguments: list = None
119    ) -> None:
120        """Function to validate the dq functions defined in the dq_spec.
121
122        This function validates that the defined dq_functions contain all
123        the fields defined in the extra_meta_arguments parameter.
124
125        Args:
126            spec: data quality specification.
127            execution_point: if the execution is in_motion or at_rest.
128            extra_meta_arguments: extra meta arguments for dq functions.
129
130        Raises:
131            DQSpecMalformedException: If the dq spec is malformed.
132        """
133        dq_functions = spec["dq_functions"]
134        if not extra_meta_arguments:
135            _LOGGER.info(
136                "No extra meta parameters defined. "
137                "Skipping validation of imported dq rule."
138            )
139            return
140
141        for dq_function in dq_functions:
142            if not dq_function.get("args").get("meta", None):
143                raise DQSpecMalformedException(
144                    "The dq function must have a meta field containing all "
145                    f"the fields defined: {extra_meta_arguments}."
146                )
147            else:
148
149                meta = dq_function["args"]["meta"]
150                given_keys = meta.keys()
151                missing_keys = sorted(set(extra_meta_arguments) - set(given_keys))
152                if missing_keys:
153                    raise DQSpecMalformedException(
154                        "The dq function meta field must contain all the "
155                        f"fields defined: {extra_meta_arguments}.\n"
156                        f"Found fields: {list(given_keys)}.\n"
157                        f"Diff: {list(missing_keys)}"
158                    )
159                if execution_point and meta["execution_point"] != execution_point:
160                    raise DQSpecMalformedException(
161                        "The dq function execution point must be the same as "
162                        "the execution point of the dq spec."
163                    )
164
165
166class PrismaUtils:
167    """Prisma related utils."""
168
169    @staticmethod
170    def build_prisma_dq_spec(spec: dict, execution_point: str) -> dict:
171        """Fetch dq functions from given table.
172
173        Args:
174            spec: data quality specification.
175            execution_point: if the execution is in_motion or at_rest.
176
177        Returns:
178            The dictionary containing the dq spec with dq functions defined.
179        """
180        if spec.get("dq_db_table"):
181            spec = DQUtils.import_dq_rules_from_table(
182                spec,
183                execution_point,
184                DQTableBaseParameters.PRISMA_BASE_PARAMETERS.value,
185                ExecEnv.ENGINE_CONFIG.dq_functions_column_list,
186            )
187        elif spec.get("dq_functions"):
188            DQUtils.validate_dq_functions(
189                spec,
190                execution_point,
191                ExecEnv.ENGINE_CONFIG.dq_functions_column_list,
192            )
193        else:
194            raise DQSpecMalformedException(
195                "When using PRISMA either dq_db_table or "
196                "dq_functions needs to be defined."
197            )
198
199        dq_bucket = (
200            ExecEnv.ENGINE_CONFIG.dq_bucket
201            if ExecEnv.get_environment() == "prod"
202            else ExecEnv.ENGINE_CONFIG.dq_dev_bucket
203        )
204
205        spec["critical_functions"] = []
206        spec["execution_point"] = execution_point
207        spec["result_sink_db_table"] = None
208        spec["result_sink_explode"] = True
209        spec["fail_on_error"] = spec.get("fail_on_error", False)
210        spec["max_percentage_failure"] = spec.get("max_percentage_failure", 1)
211
212        if not spec.get("result_sink_extra_columns", None):
213            spec["result_sink_extra_columns"] = [
214                "validation_results.expectation_config.meta",
215            ]
216        else:
217            spec["result_sink_extra_columns"] = [
218                "validation_results.expectation_config.meta",
219            ] + spec["result_sink_extra_columns"]
220        if not spec.get("data_product_name", None):
221            raise DQSpecMalformedException(
222                "When using PRISMA DQ data_product_name must be defined."
223            )
224        spec["result_sink_location"] = (
225            f"{dq_bucket}/{spec['data_product_name']}/result_sink/"
226        )
227        if not spec.get("tbl_to_derive_pk", None) and not spec.get(
228            "unexpected_rows_pk", None
229        ):
230            raise DQSpecMalformedException(
231                "When using PRISMA DQ either "
232                "tbl_to_derive_pk or unexpected_rows_pk need to be defined."
233            )
234        return spec
class DQUtils:

Utils related to the data quality process.

@staticmethod
def import_dq_rules_from_table(spec: dict, execution_point: str, base_expectation_arguments: list, extra_meta_arguments: list) -> dict:

Import dq rules from a table.

Arguments:
  • spec: data quality specification.
  • execution_point: whether the execution point is in_motion or at_rest.
  • base_expectation_arguments: base arguments for dq functions.
  • extra_meta_arguments: extra meta arguments for dq functions.
Returns:

The dictionary containing the dq spec with dq functions defined.
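
A minimal usage sketch, assuming an initialized ExecEnv with an active Spark session and a hypothetical rules table; the table, filter, and column names below are illustrative only:

    from lakehouse_engine.utils.dq_utils import DQUtils

    # Hypothetical spec: dq_db.dq_rules and "orders" are placeholder names.
    spec = {
        "dq_db_table": "dq_db.dq_rules",
        "dq_table_table_filter": "orders",
        "dq_table_extra_filters": "active = true",  # optional extra WHERE clause
    }

    enriched_spec = DQUtils.import_dq_rules_from_table(
        spec,
        execution_point="at_rest",
        # Columns selected from the rules table; assumed names for illustration.
        base_expectation_arguments=["dq_tech_function", "execution_point", "arguments"],
        extra_meta_arguments=["dq_rule_id", "dq_function_owner"],
    )

    # The spec now carries a "dq_functions" list of
    # {"function": ..., "args": {..., "meta": {...}}} entries.
    for dq_function in enriched_spec["dq_functions"]:
        print(dq_function["function"], dq_function["args"])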

@staticmethod
def validate_dq_functions(spec: dict, execution_point: str = '', extra_meta_arguments: Optional[list] = None) -> None:

Validate the dq functions defined in the dq_spec.

This function validates that the defined dq_functions contain all the fields defined in the extra_meta_arguments parameter.

Arguments:
  • spec: data quality specification.
  • execution_point: whether the execution point is in_motion or at_rest.
  • extra_meta_arguments: extra meta arguments for dq functions.
Raises:
  • DQSpecMalformedException: If the dq spec is malformed.
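
For illustration, a hand-written spec that passes this validation, assuming the extra meta columns are dq_rule_id and dq_function_owner (hypothetical names):

    from lakehouse_engine.utils.dq_utils import DQUtils

    # Hypothetical spec with manually defined dq functions; the meta field
    # must cover every name passed in extra_meta_arguments, and it must also
    # carry execution_point, which is checked against the value given below.
    spec = {
        "dq_functions": [
            {
                "function": "expect_column_values_to_not_be_null",
                "args": {
                    "column": "order_id",
                    "meta": {
                        "dq_rule_id": "rule_1",
                        "dq_function_owner": "team_a",
                        "execution_point": "at_rest",
                    },
                },
            }
        ]
    }

    # Raises DQSpecMalformedException if meta is missing or incomplete, or if
    # a rule's execution_point disagrees with the one of the dq spec.
    DQUtils.validate_dq_functions(
        spec,
        execution_point="at_rest",
        extra_meta_arguments=["dq_rule_id", "dq_function_owner"],
    )
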
class PrismaUtils:

Prisma-related utils.

@staticmethod
def build_prisma_dq_spec(spec: dict, execution_point: str) -> dict:

Build a PRISMA dq spec, fetching dq functions from a table if defined.

Arguments:
  • spec: data quality specification.
  • execution_point: whether the execution point is in_motion or at_rest.
Returns:

The dictionary containing the dq spec with dq functions defined.
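
A minimal sketch of building a PRISMA spec, assuming an initialized ExecEnv whose engine config defines the DQ buckets and meta column list; the product, table, and column names are illustrative:

    from lakehouse_engine.utils.dq_utils import PrismaUtils

    # Hypothetical input spec; rules are imported from dq_db.dq_rules because
    # dq_db_table is set (a given dq_functions list would be validated instead).
    spec = {
        "dq_db_table": "dq_db.dq_rules",
        "dq_table_table_filter": "orders",
        "data_product_name": "orders_dp",
        "unexpected_rows_pk": ["order_id"],
    }

    prisma_spec = PrismaUtils.build_prisma_dq_spec(spec, execution_point="at_rest")

    # Defaults filled in by the builder:
    print(prisma_spec["result_sink_location"])  # <dq_bucket>/orders_dp/result_sink/
    print(prisma_spec["fail_on_error"])         # False unless set in the input spec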