Custom Expectations
Defining Custom Expectations
Custom expectations are defined in Python and need to follow a specific structure to integrate correctly with Great Expectations.
Follow the GX documentation on Creating Custom Expectations for more information about the existing types of expectations.
As in other cases, the ACON configuration should be executed with `load_data`:

```python
from lakehouse_engine.engine import load_data

acon = {...}

load_data(acon=acon)
```
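For illustration, a `dq_specs` entry referencing the custom expectation defined below could look roughly like this (a sketch only: the spec ids, input/output specs, and column names are placeholders, not prescribed by the engine):

```python
# Illustrative ACON skeleton: spec ids and column names are placeholders.
acon = {
    "input_specs": [...],  # your input specs
    "dq_specs": [
        {
            "spec_id": "dq_validator",  # hypothetical spec id
            "input_id": "my_input",     # hypothetical input id
            "dq_type": "validator",
            "dq_functions": [
                {
                    # Custom expectations are referenced by their snake case name.
                    "function": "expect_column_pair_a_to_be_smaller_or_equal_than_b",
                    "args": {"column_A": "a", "column_B": "b", "margin": 1},
                }
            ],
        }
    ],
    "output_specs": [...],  # your output specs
}
```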
Here is an example of a custom expectation:

```python
"""Expectation to check if column 'a' is lower or equal than column 'b'."""

from typing import Any, Dict, Optional

from great_expectations.core import ExpectationConfiguration
from great_expectations.execution_engine import ExecutionEngine, SparkDFExecutionEngine
from great_expectations.expectations.expectation import ColumnPairMapExpectation
from great_expectations.expectations.metrics.map_metric_provider import (
    ColumnPairMapMetricProvider,
    column_pair_condition_partial,
)

from lakehouse_engine.utils.expectations_utils import validate_result


class ColumnPairCustom(ColumnPairMapMetricProvider):
    """Asserts that column 'A' is lower or equal than column 'B'.

    Additionally, the 'margin' parameter can be used to add a margin to the
    check between column 'A' and 'B': 'A' <= 'B' + 'margin'.
    """

    condition_metric_name = "column_pair_values.a_smaller_or_equal_than_b"
    condition_domain_keys = (
        "batch_id",
        "table",
        "column_A",
        "column_B",
        "ignore_row_if",
    )
    condition_value_keys = ("margin",)

    @column_pair_condition_partial(engine=SparkDFExecutionEngine)
    def _spark(
        self: ColumnPairMapMetricProvider,
        column_A: Any,
        column_B: Any,
        margin: Any,
        **kwargs: dict,
    ) -> Any:
        """Implementation of the expectation's logic.

        Args:
            column_A: Value of the row of column_A.
            column_B: Value of the row of column_B.
            margin: Margin value to be added to column_B.
            kwargs: Dict with additional parameters.

        Returns:
            If the condition is met.
        """
        if margin is None:
            approx = 0
        elif not isinstance(margin, (int, float, complex)):
            raise TypeError(
                f"margin must be one of int, float, complex."
                f" Found: {margin} as {type(margin)}"
            )
        else:
            approx = margin  # type: ignore

        return column_A <= column_B + approx  # type: ignore


class ExpectColumnPairAToBeSmallerOrEqualThanB(ColumnPairMapExpectation):
    """Expect values in column A to be lower or equal than column B.

    Args:
        column_A: The first column name.
        column_B: The second column name.
        margin: Additional approximation to column B value.

    Keyword Args:
        allow_cross_type_comparisons: If True, allow
            comparisons between types (e.g. integer and string).
            Otherwise, attempting such comparisons will raise an exception.
        ignore_row_if: "both_values_are_missing",
            "either_value_is_missing" or "neither" (default).
        result_format: Which output mode to use:
            `BOOLEAN_ONLY`, `BASIC` (default), `COMPLETE`, or `SUMMARY`.
        include_config: If True (default), then include the expectation config
            as part of the result object.
        catch_exceptions: If True, then catch exceptions and
            include them as part of the result object. Default: False.
        meta: A JSON-serializable dictionary (nesting allowed)
            that will be included in the output without modification.

    Returns:
        An ExpectationSuiteValidationResult.
    """

    examples = [
        {
            "dataset_name": "Test Dataset",
            "data": [
                {
                    "data": {
                        "a": [11, 22, 50],
                        "b": [10, 21, 100],
                        "c": [9, 21, 30],
                    },
                    "schemas": {
                        "spark": {
                            "a": "IntegerType",
                            "b": "IntegerType",
                            "c": "IntegerType",
                        }
                    },
                }
            ],
            "tests": [
                {
                    "title": "negative_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {
                        "column_A": "a",
                        "column_B": "c",
                        "result_format": {
                            "result_format": "COMPLETE",
                            "unexpected_index_column_names": ["c"],
                        },
                    },
                    "out": {
                        "success": False,
                        "unexpected_index_list": [
                            {"c": 9, "a": 11},
                            {"c": 21, "a": 22},
                            {"c": 30, "a": 50},
                        ],
                    },
                },
                {
                    "title": "positive_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {
                        "column_A": "a",
                        "column_B": "b",
                        "margin": 1,
                        "result_format": {
                            "result_format": "COMPLETE",
                            "unexpected_index_column_names": ["a"],
                        },
                    },
                    "out": {
                        "success": True,
                        "unexpected_index_list": [],
                    },
                },
            ],
        },
    ]

    map_metric = "column_pair_values.a_smaller_or_equal_than_b"
    success_keys = (
        "column_A",
        "column_B",
        "ignore_row_if",
        "margin",
        "mostly",
    )
    default_kwarg_values = {
        "mostly": 1.0,
        "ignore_row_if": "neither",
        "result_format": "BASIC",
        "include_config": True,
        "catch_exceptions": False,
    }

    def _validate(
        self,
        configuration: ExpectationConfiguration,
        metrics: Dict,
        runtime_configuration: Optional[dict] = None,
        execution_engine: Optional[ExecutionEngine] = None,
    ) -> Any:
        """Custom implementation of the GX _validate method.

        This method is used on the tests to validate both the result
        of the tests themselves and if the unexpected index list
        is correctly generated.
        The GX test logic does not do this validation, and thus
        we need to make it manually.

        Args:
            configuration: Configuration used in the test.
            metrics: Test result metrics.
            runtime_configuration: Configuration used when running the expectation.
            execution_engine: Execution Engine where the expectation was run.

        Returns:
            Dictionary with the result of the validation.
        """
        return validate_result(
            self,
            configuration,
            metrics,
            runtime_configuration,
            execution_engine,
            ColumnPairMapExpectation,
        )


"""Mandatory block of code. If it is removed the expectation will not be available."""
if __name__ == "__main__":
    # test the custom expectation with the function `print_diagnostic_checklist()`
    ExpectColumnPairAToBeSmallerOrEqualThanB().print_diagnostic_checklist()
```
Naming Conventions
Your expectation's name should start with `expect`.
The name of the file must be the name of the expectation written in snake case. Ex: `expect_column_length_match_input_length`.
The name of the class must be the name of the expectation written in camel case. Ex: `ExpectColumnLengthMatchInputLength`.
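For instance, the expectation from the example above follows these conventions:

```python
# File: expect_column_pair_a_to_be_smaller_or_equal_than_b.py  (snake case)
class ExpectColumnPairAToBeSmallerOrEqualThanB(ColumnPairMapExpectation):  # camel case
    ...
```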
File Structure
The file contains two main sections:
- the definition of the metric that we are tracking (where we define the logic of the expectation);
- the definition of the expectation.
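Put together, the skeleton of the example file above looks like this:

```python
# 1) Metric definition: the logic of the expectation.
class ColumnPairCustom(ColumnPairMapMetricProvider):
    ...


# 2) Expectation definition: wires the metric, examples and parameters together.
class ExpectColumnPairAToBeSmallerOrEqualThanB(ColumnPairMapExpectation):
    ...
```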
Metric Definition
In this section we define the logic of the expectation. This needs to follow a certain structure:
Code Structure
1) The class you define needs to extend one of the Metric Providers defined by Great Expectations that corresponds to your expectation's type. More information can be found in the GX documentation on metric providers.
2) You need to define the name of your metric. This name must be unique and must follow the structure `<type of expectation>.<name of metric>`. Ex: `column_pair_values.a_smaller_or_equal_than_b`.
   The types of expectations are: `column_values`, `multicolumn_values`, `column_pair_values`, `table_rows` and `table_columns`.
3) Any GX default parameters that are necessary to calculate your metric must be defined as `condition_domain_keys`.
4) Any additional parameters that are necessary to calculate your metric must be defined as `condition_value_keys`.
5) The logic of your expectation must be defined for the `SparkDFExecutionEngine` in order to be run on the Lakehouse.
```python
# 1)
class ColumnPairCustom(ColumnPairMapMetricProvider):
    """Asserts that column 'A' is lower or equal than column 'B'."""

    # 2)
    condition_metric_name = "column_pair_values.a_smaller_or_equal_than_b"

    # 3)
    condition_domain_keys = (
        "batch_id",
        "table",
        "column_A",
        "column_B",
        "ignore_row_if",
    )

    # 4)
    condition_value_keys = ("margin",)

    # 5)
    @column_pair_condition_partial(engine=SparkDFExecutionEngine)
    def _spark(
        self: ColumnPairMapMetricProvider,
        column_A: Any,
        column_B: Any,
        margin: Any,
        **kwargs: dict,
    ) -> Any:
        """Implementation of the expectation's logic.

        Args:
            column_A: Value of the row of column_A.
            column_B: Value of the row of column_B.
            margin: Margin value to be added to column_B.
            kwargs: Dict with additional parameters.

        Returns:
            If the condition is met.
        """
        if margin is None:
            approx = 0
        elif not isinstance(margin, (int, float, complex)):
            raise TypeError(
                f"margin must be one of int, float, complex."
                f" Found: {margin} as {type(margin)}"
            )
        else:
            approx = margin  # type: ignore

        return column_A <= column_B + approx  # type: ignore
```
Expectation Definition
In this section we define the expectation. This needs to follow a certain structure:
Code Structure
1) The class you define needs to extend one of the Expectations defined by Great Expectations that corresponds to your expectation's type.
2) You must define an "examples" object where you define at least one success and one failure of your expectation to demonstrate its logic. The result format must be set to "COMPLETE", and you must set the `unexpected_index_column_names` variable.
   For any example with unexpected results you must define `unexpected_index_list` in your "out" element. This is validated during the testing phase.
3) The metric must be the same one you defined in the metric definition.
4) You must define all additional parameters that the user has to/should provide to the expectation.
5) You should define any default values for your expectation's parameters.
6) You must define the `_validate` method as shown in the example. You must call the `validate_result` function inside your `_validate` method; this adds a validation of the unexpected index list in the examples.
   If your custom expectation requires any extra validations, or you need additional fields to be returned in the final dataframe, you can add them in this function.
   The `validate_result` function has two optional parameters (`partial_success` and `partial_result`) that can be used to pass the result of additional validations and to add more information to the result key of the returned dict, respectively.
```python
# 1)
class ExpectColumnPairAToBeSmallerOrEqualThanB(ColumnPairMapExpectation):
    """Expect values in column A to be lower or equal than column B.

    Args:
        column_A: The first column name.
        column_B: The second column name.
        margin: Additional approximation to column B value.

    Keyword Args:
        allow_cross_type_comparisons: If True, allow
            comparisons between types (e.g. integer and string).
            Otherwise, attempting such comparisons will raise an exception.
        ignore_row_if: "both_values_are_missing",
            "either_value_is_missing" or "neither" (default).
        result_format: Which output mode to use:
            `BOOLEAN_ONLY`, `BASIC` (default), `COMPLETE`, or `SUMMARY`.
        include_config: If True (default), then include the expectation config
            as part of the result object.
        catch_exceptions: If True, then catch exceptions and
            include them as part of the result object. Default: False.
        meta: A JSON-serializable dictionary (nesting allowed)
            that will be included in the output without modification.

    Returns:
        An ExpectationSuiteValidationResult.
    """

    # 2)
    examples = [
        {
            "dataset_name": "Test Dataset",
            "data": {
                "a": [11, 22, 50],
                "b": [10, 21, 100],
                "c": [9, 21, 30],
            },
            "schemas": {
                "spark": {"a": "IntegerType", "b": "IntegerType", "c": "IntegerType"}
            },
            "tests": [
                {
                    "title": "negative_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {
                        "column_A": "a",
                        "column_B": "c",
                        "result_format": {
                            "result_format": "COMPLETE",
                            "unexpected_index_column_names": ["c"],
                            "include_unexpected_rows": True,
                        },
                    },
                    "out": {
                        "success": False,
                        "unexpected_index_list": [
                            {"c": 9, "a": 11},
                            {"c": 21, "a": 22},
                            {"c": 30, "a": 50},
                        ],
                    },
                },
                {
                    "title": "positive_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {
                        "column_A": "a",
                        "column_B": "b",
                        "margin": 1,
                        "result_format": {
                            "result_format": "COMPLETE",
                            "unexpected_index_column_names": ["a"],
                        },
                    },
                    "out": {"success": True},
                },
            ],
        },
    ]

    # 3)
    map_metric = "column_pair_values.a_smaller_or_equal_than_b"

    # 4)
    success_keys = (
        "column_A",
        "column_B",
        "ignore_row_if",
        "margin",
        "mostly",
    )

    # 5)
    default_kwarg_values = {
        "mostly": 1.0,
        "ignore_row_if": "neither",
        "result_format": "BASIC",
        "include_config": True,
        "catch_exceptions": False,
    }

    # 6)
    def _validate(
        self,
        configuration: ExpectationConfiguration,
        metrics: Dict,
        runtime_configuration: Optional[dict] = None,
        execution_engine: Optional[ExecutionEngine] = None,
    ) -> dict:
        """Custom implementation of the GX _validate method.

        This method is used on the tests to validate both the result
        of the tests themselves and if the unexpected index list
        is correctly generated.
        The GX test logic does not do this validation, and thus
        we need to make it manually.

        Args:
            configuration: Configuration used in the test.
            metrics: Test result metrics.
            runtime_configuration: Configuration used when running the expectation.
            execution_engine: Execution Engine where the expectation was run.

        Returns:
            Dictionary with the result of the validation.
        """
        return validate_result(self, configuration, metrics)
```
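If your expectation needs extra validations, a hypothetical `_validate` could feed them into `validate_result` through these parameters (the extra condition and the `my_extra_info` key are illustrative, not part of the engine's API):

```python
# Inside your expectation class; imports as in the example above.
def _validate(
    self,
    configuration: ExpectationConfiguration,
    metrics: Dict,
    runtime_configuration: Optional[dict] = None,
    execution_engine: Optional[ExecutionEngine] = None,
) -> dict:
    """Run the base validation plus a hypothetical extra check."""
    # Illustrative extra validation: replace with your own condition.
    extra_check_passed = bool(metrics)

    return validate_result(
        self,
        configuration,
        metrics,
        partial_success=extra_check_passed,  # result of the additional validation
        partial_result={"my_extra_info": "illustrative extra data"},  # added to the result key
    )
```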
Printing the Expectation Diagnostics
Your expectation must include the ability to call the Great Expectations diagnostic function so that it can be validated. For that, the following block of code must be present:

```python
"""Mandatory block of code. If it is removed the expectation will not be available."""
if __name__ == "__main__":
    # test the custom expectation with the function `print_diagnostic_checklist()`
    ExpectColumnPairAToBeSmallerOrEqualThanB().print_diagnostic_checklist()
```
Creation Process
1) Create a branch from the lakehouse engine repository.
2) Create a custom expectation with your specific logic:
   - All new expectations must be placed inside the folder /lakehouse_engine/dq_processors/custom_expectations.
   - The name of the expectation must be added to the variable `CUSTOM_EXPECTATION_LIST` in the file /lakehouse_engine/core/definitions.py (see the sketch after this list).
   - All new expectations must be tested in /tests/feature/custom_expectations/test_custom_expectations.py. In order to create a new test for your custom expectation, it is necessary to:
     - Copy one of the expectation folders in tests/resources/feature/custom_expectations, renaming it to your custom expectation.
     - Make any necessary changes to the data/schema file present.
     - In /tests/feature/custom_expectations/test_custom_expectations.py, add a scenario to test your expectation. All expectations must be tested on batch and streaming. The test is implemented to generate an ACON based on each scenario's data.
     - Test your developments to check that everything is working as intended.
3) When the development is completed, create a pull request with your changes.
4) Your expectation will be available with the next release of the lakehouse engine that happens after your pull request is approved. This means that you need to upgrade your version of the lakehouse engine in order to use it.
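As referenced in step 2 above, registering the expectation in definitions.py could look roughly like this (a sketch only; the exact shape of `CUSTOM_EXPECTATION_LIST` may differ, so check the actual file):

```python
# /lakehouse_engine/core/definitions.py (illustrative sketch).
CUSTOM_EXPECTATION_LIST = [
    # ... existing custom expectations ...
    "expect_column_pair_a_to_be_smaller_or_equal_than_b",
]
```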
Usage
Custom expectations can be used like any other expectation provided by Great Expectations.
Parameters
Depending on the type of expectation you are defining, some parameters are expected by default. Ex: a ColumnMapExpectation has a default "column" parameter.
Mostly
Mostly is a standard parameter for a subset of expectations that defines a threshold for the failure of an expectation. Ex: a mostly value of 0.7 means the expectation only fails if more than 30% of the records have a negative result, i.e. it succeeds as long as at least 70% of the records pass the check.
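As an illustration, a `dq_functions` entry combining the default "column" parameter with mostly might look like this (the expectation and column names are placeholders):

```python
{
    # Built-in GX expectation, used purely as an illustration.
    "function": "expect_column_values_to_not_be_null",
    "args": {
        "column": "my_column",  # default parameter of ColumnMapExpectations
        "mostly": 0.7,          # fails only if more than 30% of the rows are null
    },
}
```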
Result Format
Great Expectations supports several result formats for expectation results. The Lakehouse Engine requires the result format to be set to "COMPLETE" in order to tag the lines where the expectations failed.
unexpected_index_column_names
Inside this key you must define which columns are used as an index inside your data. If this key is set and the result format is set to "COMPLETE", Great Expectations returns a list with the indexes of the lines that failed the validation.
This information is used by the Lakehouse Engine to tag the lines in error after the fact. The additional tests inside the `_validate` method verify that the custom expectation is tagging these lines correctly.
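For reference, this is the shape used in the tests of the example above (the index column name is a placeholder):

```python
"result_format": {
    "result_format": "COMPLETE",
    "unexpected_index_column_names": ["my_id_column"],  # columns that identify a row
}
```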