Skip to content

Algorithm

Module containing the Algorithm class.

Algorithm

Bases: Executable

Class to define the behavior of every algorithm based on ACONs.

Source code in mkdocs/lakehouse_engine/packages/algorithms/algorithm.py
class Algorithm(Executable):
    """Class to define the behavior of every algorithm based on ACONs."""

    def __init__(self, acon: dict):
        """Construct Algorithm instances.

        Args:
            acon: algorithm configuration.
        """
        self.acon = acon

    @classmethod
    def get_dq_spec(
        cls, spec: dict
    ) -> Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]:
        """Get data quality specification object from acon.

        The keys ``spec_id``, ``input_id`` and ``dq_type`` are mandatory: a
        ``KeyError`` is raised if any is missing. Every other key falls back
        to a default taken from ``DQSpec``, ``DQDefaults`` or
        ``OutputFormat``. After construction, the tagging-related arguments
        are reconciled (and possibly overridden) by
        ``_validate_dq_tag_strategy``.

        Args:
            spec: data quality specifications.

        Returns:
            A tuple ``(dq_spec, dq_functions, critical_functions)``:

            - the DQSpec, built with an empty ``dq_functions`` list;
            - the DQ function specs parsed from ``spec["dq_functions"]``;
            - the DQ function specs parsed from
              ``spec["critical_functions"]``.

        Raises:
            KeyError: if ``spec_id``, ``input_id`` or ``dq_type`` is missing.
        """
        dq_spec = DQSpec(
            spec_id=spec["spec_id"],
            input_id=spec["input_id"],
            dq_type=spec["dq_type"],
            # Functions are returned separately (see below), not stored here.
            dq_functions=[],
            dq_db_table=spec.get("dq_db_table"),
            dq_table_table_filter=spec.get("dq_table_table_filter"),
            dq_table_extra_filters=spec.get(
                "dq_table_extra_filters", DQSpec.dq_table_extra_filters
            ),
            execution_point=spec.get("execution_point"),
            unexpected_rows_pk=spec.get(
                "unexpected_rows_pk", DQSpec.unexpected_rows_pk
            ),
            gx_result_format=spec.get("gx_result_format", DQSpec.gx_result_format),
            tbl_to_derive_pk=spec.get("tbl_to_derive_pk", DQSpec.tbl_to_derive_pk),
            sort_processed_keys=spec.get(
                "sort_processed_keys", DQSpec.sort_processed_keys
            ),
            tag_source_data=spec.get("tag_source_data", DQSpec.tag_source_data),
            data_asset_name=spec.get("data_asset_name", DQSpec.data_asset_name),
            expectation_suite_name=spec.get(
                "expectation_suite_name", DQSpec.expectation_suite_name
            ),
            store_backend=spec.get("store_backend", DQDefaults.STORE_BACKEND.value),
            local_fs_root_dir=spec.get("local_fs_root_dir", DQSpec.local_fs_root_dir),
            data_docs_local_fs=spec.get(
                "data_docs_local_fs", DQSpec.data_docs_local_fs
            ),
            bucket=spec.get("bucket", DQSpec.bucket),
            data_docs_bucket=spec.get("data_docs_bucket", DQSpec.data_docs_bucket),
            checkpoint_store_prefix=spec.get(
                "checkpoint_store_prefix", DQDefaults.CHECKPOINT_STORE_PREFIX.value
            ),
            expectations_store_prefix=spec.get(
                "expectations_store_prefix",
                DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
            ),
            data_docs_prefix=spec.get(
                "data_docs_prefix", DQDefaults.DATA_DOCS_PREFIX.value
            ),
            validations_store_prefix=spec.get(
                "validations_store_prefix",
                DQDefaults.VALIDATIONS_STORE_PREFIX.value,
            ),
            result_sink_db_table=spec.get(
                "result_sink_db_table", DQSpec.result_sink_db_table
            ),
            result_sink_location=spec.get(
                "result_sink_location", DQSpec.result_sink_location
            ),
            result_sink_partitions=spec.get(
                "result_sink_partitions", DQSpec.result_sink_partitions
            ),
            result_sink_format=spec.get(
                "result_sink_format", OutputFormat.DELTAFILES.value
            ),
            result_sink_options=spec.get(
                "result_sink_options", DQSpec.result_sink_options
            ),
            result_sink_explode=spec.get(
                "result_sink_explode", DQSpec.result_sink_explode
            ),
            result_sink_extra_columns=spec.get("result_sink_extra_columns", []),
            # The data source defaults to the input id when not given.
            source=spec.get("source", spec["input_id"]),
            fail_on_error=spec.get("fail_on_error", DQSpec.fail_on_error),
            cache_df=spec.get("cache_df", DQSpec.cache_df),
            # NOTE(review): the raw critical_functions dicts are stored on the
            # DQSpec here, while parsed DQFunctionSpec objects are returned
            # separately below — confirm callers expect both forms.
            critical_functions=spec.get(
                "critical_functions", DQSpec.critical_functions
            ),
            max_percentage_failure=spec.get(
                "max_percentage_failure", DQSpec.max_percentage_failure
            ),
        )

        dq_functions = cls._get_dq_functions(spec, "dq_functions")

        critical_functions = cls._get_dq_functions(spec, "critical_functions")

        # May override tagging-related fields on dq_spec in place.
        cls._validate_dq_tag_strategy(dq_spec)

        return dq_spec, dq_functions, critical_functions

    @staticmethod
    def _get_dq_functions(spec: dict, function_key: str) -> List[DQFunctionSpec]:
        """Get DQ Functions from a DQ Spec, based on a function_key.

        Args:
            spec: data quality specifications.
            function_key: dq function key ("dq_functions" or
                "critical_functions").

        Returns:
            a list of DQ Function Specs, one per entry under ``function_key``.
            The list is empty when the key is missing or its value is
            None/empty. Each entry must have a ``"function"`` key; ``"args"``
            defaults to an empty dict.
        """
        # ``or []`` treats a missing key and an explicit None/empty value alike,
        # with a single dictionary lookup.
        return [
            DQFunctionSpec(function=f["function"], args=f.get("args", {}))
            for f in spec.get(function_key) or []
        ]

    @staticmethod
    def _validate_dq_tag_strategy(spec: DQSpec) -> None:
        """Validate DQ Spec arguments related with the data tagging strategy.

        Mutates ``spec`` in place:

        - if ``tag_source_data`` is truthy, ``gx_result_format`` and
          ``result_sink_explode`` are reset to the ``DQSpec`` class defaults,
          and ``fail_on_error`` is forced to False;
        - otherwise, if ``gx_result_format`` differs from the ``DQSpec``
          default, ``tag_source_data`` is set to False.

        Args:
            spec: data quality specifications.
        """
        if spec.tag_source_data:
            spec.gx_result_format = DQSpec.gx_result_format
            spec.fail_on_error = False
            spec.result_sink_explode = DQSpec.result_sink_explode
        elif spec.gx_result_format != DQSpec.gx_result_format:
            # tag_source_data is already falsy on this branch; this normalizes
            # it to an explicit False (e.g. if it was None).
            spec.tag_source_data = False

__init__(acon)

Construct Algorithm instances.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `acon` | `dict` | Algorithm configuration. | required |
Source code in mkdocs/lakehouse_engine/packages/algorithms/algorithm.py
def __init__(self, acon: dict):
    """Keep the algorithm configuration (ACON) on the new instance.

    Args:
        acon: algorithm configuration; stored as-is (no copy) in ``self.acon``.
    """
    self.acon: dict = acon

get_dq_spec(spec) classmethod

Get data quality specification object from acon.

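Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `spec` | `dict` | Data quality specifications. | required |

Returns:

| Type | Description |
|------|-------------|
| `Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]` | The DQSpec, the list of DQ function specs, and the list of critical DQ function specs. |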

Source code in mkdocs/lakehouse_engine/packages/algorithms/algorithm.py
@classmethod
def get_dq_spec(
    cls, spec: dict
) -> Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]:
    """Get data quality specification object from acon.

    The keys ``spec_id``, ``input_id`` and ``dq_type`` are mandatory: a
    ``KeyError`` is raised if any is missing. Every other key falls back to a
    default taken from ``DQSpec``, ``DQDefaults`` or ``OutputFormat``. After
    construction, the tagging-related arguments are reconciled (and possibly
    overridden) by ``_validate_dq_tag_strategy``.

    Args:
        spec: data quality specifications.

    Returns:
        A tuple ``(dq_spec, dq_functions, critical_functions)``:

        - the DQSpec, built with an empty ``dq_functions`` list;
        - the DQ function specs parsed from ``spec["dq_functions"]``;
        - the DQ function specs parsed from ``spec["critical_functions"]``.

    Raises:
        KeyError: if ``spec_id``, ``input_id`` or ``dq_type`` is missing.
    """
    dq_spec = DQSpec(
        spec_id=spec["spec_id"],
        input_id=spec["input_id"],
        dq_type=spec["dq_type"],
        # Functions are returned separately (see below), not stored here.
        dq_functions=[],
        dq_db_table=spec.get("dq_db_table"),
        dq_table_table_filter=spec.get("dq_table_table_filter"),
        dq_table_extra_filters=spec.get(
            "dq_table_extra_filters", DQSpec.dq_table_extra_filters
        ),
        execution_point=spec.get("execution_point"),
        unexpected_rows_pk=spec.get(
            "unexpected_rows_pk", DQSpec.unexpected_rows_pk
        ),
        gx_result_format=spec.get("gx_result_format", DQSpec.gx_result_format),
        tbl_to_derive_pk=spec.get("tbl_to_derive_pk", DQSpec.tbl_to_derive_pk),
        sort_processed_keys=spec.get(
            "sort_processed_keys", DQSpec.sort_processed_keys
        ),
        tag_source_data=spec.get("tag_source_data", DQSpec.tag_source_data),
        data_asset_name=spec.get("data_asset_name", DQSpec.data_asset_name),
        expectation_suite_name=spec.get(
            "expectation_suite_name", DQSpec.expectation_suite_name
        ),
        store_backend=spec.get("store_backend", DQDefaults.STORE_BACKEND.value),
        local_fs_root_dir=spec.get("local_fs_root_dir", DQSpec.local_fs_root_dir),
        data_docs_local_fs=spec.get(
            "data_docs_local_fs", DQSpec.data_docs_local_fs
        ),
        bucket=spec.get("bucket", DQSpec.bucket),
        data_docs_bucket=spec.get("data_docs_bucket", DQSpec.data_docs_bucket),
        checkpoint_store_prefix=spec.get(
            "checkpoint_store_prefix", DQDefaults.CHECKPOINT_STORE_PREFIX.value
        ),
        expectations_store_prefix=spec.get(
            "expectations_store_prefix",
            DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
        ),
        data_docs_prefix=spec.get(
            "data_docs_prefix", DQDefaults.DATA_DOCS_PREFIX.value
        ),
        validations_store_prefix=spec.get(
            "validations_store_prefix",
            DQDefaults.VALIDATIONS_STORE_PREFIX.value,
        ),
        result_sink_db_table=spec.get(
            "result_sink_db_table", DQSpec.result_sink_db_table
        ),
        result_sink_location=spec.get(
            "result_sink_location", DQSpec.result_sink_location
        ),
        result_sink_partitions=spec.get(
            "result_sink_partitions", DQSpec.result_sink_partitions
        ),
        result_sink_format=spec.get(
            "result_sink_format", OutputFormat.DELTAFILES.value
        ),
        result_sink_options=spec.get(
            "result_sink_options", DQSpec.result_sink_options
        ),
        result_sink_explode=spec.get(
            "result_sink_explode", DQSpec.result_sink_explode
        ),
        result_sink_extra_columns=spec.get("result_sink_extra_columns", []),
        # The data source defaults to the input id when not given.
        source=spec.get("source", spec["input_id"]),
        fail_on_error=spec.get("fail_on_error", DQSpec.fail_on_error),
        cache_df=spec.get("cache_df", DQSpec.cache_df),
        # NOTE(review): the raw critical_functions dicts are stored on the
        # DQSpec here, while parsed DQFunctionSpec objects are returned
        # separately below — confirm callers expect both forms.
        critical_functions=spec.get(
            "critical_functions", DQSpec.critical_functions
        ),
        max_percentage_failure=spec.get(
            "max_percentage_failure", DQSpec.max_percentage_failure
        ),
    )

    dq_functions = cls._get_dq_functions(spec, "dq_functions")

    critical_functions = cls._get_dq_functions(spec, "critical_functions")

    # May override tagging-related fields on dq_spec in place.
    cls._validate_dq_tag_strategy(dq_spec)

    return dq_spec, dq_functions, critical_functions