lakehouse_engine.algorithms.algorithm

Module containing the Algorithm class.

  1"""Module containing the Algorithm class."""
  2
  3from typing import List, Tuple
  4
  5from lakehouse_engine.core.definitions import (
  6    DQDefaults,
  7    DQFunctionSpec,
  8    DQSpec,
  9    OutputFormat,
 10)
 11from lakehouse_engine.core.executable import Executable
 12
 13
 14class Algorithm(Executable):
 15    """Class to define the behavior of every algorithm based on ACONs."""
 16
 17    def __init__(self, acon: dict):
 18        """Construct Algorithm instances.
 19
 20        Args:
 21            acon: algorithm configuration.
 22        """
 23        self.acon = acon
 24
 25    @classmethod
 26    def get_dq_spec(
 27        cls, spec: dict
 28    ) -> Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]:
 29        """Get data quality specification object from acon.
 30
 31        Args:
 32            spec: data quality specifications.
 33
 34        Returns:
 35            The DQSpec and the List of DQ Functions Specs.
 36        """
 37        dq_spec = DQSpec(
 38            spec_id=spec["spec_id"],
 39            input_id=spec["input_id"],
 40            dq_type=spec["dq_type"],
 41            dq_functions=[],
 42            dq_db_table=spec.get("dq_db_table"),
 43            execution_point=spec.get("execution_point"),
 44            unexpected_rows_pk=spec.get(
 45                "unexpected_rows_pk", DQSpec.unexpected_rows_pk
 46            ),
 47            gx_result_format=spec.get("gx_result_format", DQSpec.gx_result_format),
 48            tbl_to_derive_pk=spec.get("tbl_to_derive_pk", DQSpec.tbl_to_derive_pk),
 49            sort_processed_keys=spec.get(
 50                "sort_processed_keys", DQSpec.sort_processed_keys
 51            ),
 52            tag_source_data=spec.get("tag_source_data", DQSpec.tag_source_data),
 53            data_asset_name=spec.get("data_asset_name", DQSpec.data_asset_name),
 54            expectation_suite_name=spec.get(
 55                "expectation_suite_name", DQSpec.expectation_suite_name
 56            ),
 57            store_backend=spec.get("store_backend", DQDefaults.STORE_BACKEND.value),
 58            local_fs_root_dir=spec.get("local_fs_root_dir", DQSpec.local_fs_root_dir),
 59            data_docs_local_fs=spec.get(
 60                "data_docs_local_fs", DQSpec.data_docs_local_fs
 61            ),
 62            bucket=spec.get("bucket", DQSpec.bucket),
 63            data_docs_bucket=spec.get("data_docs_bucket", DQSpec.data_docs_bucket),
 64            checkpoint_store_prefix=spec.get(
 65                "checkpoint_store_prefix", DQDefaults.CHECKPOINT_STORE_PREFIX.value
 66            ),
 67            expectations_store_prefix=spec.get(
 68                "expectations_store_prefix",
 69                DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
 70            ),
 71            data_docs_prefix=spec.get(
 72                "data_docs_prefix", DQDefaults.DATA_DOCS_PREFIX.value
 73            ),
 74            validations_store_prefix=spec.get(
 75                "validations_store_prefix",
 76                DQDefaults.VALIDATIONS_STORE_PREFIX.value,
 77            ),
 78            result_sink_db_table=spec.get(
 79                "result_sink_db_table", DQSpec.result_sink_db_table
 80            ),
 81            result_sink_location=spec.get(
 82                "result_sink_location", DQSpec.result_sink_location
 83            ),
 84            result_sink_partitions=spec.get(
 85                "result_sink_partitions", DQSpec.result_sink_partitions
 86            ),
 87            result_sink_format=spec.get(
 88                "result_sink_format", OutputFormat.DELTAFILES.value
 89            ),
 90            result_sink_options=spec.get(
 91                "result_sink_options", DQSpec.result_sink_options
 92            ),
 93            result_sink_explode=spec.get(
 94                "result_sink_explode", DQSpec.result_sink_explode
 95            ),
 96            result_sink_extra_columns=spec.get("result_sink_extra_columns", []),
 97            source=spec.get("source", spec["input_id"]),
 98            fail_on_error=spec.get("fail_on_error", DQSpec.fail_on_error),
 99            cache_df=spec.get("cache_df", DQSpec.cache_df),
100            critical_functions=spec.get(
101                "critical_functions", DQSpec.critical_functions
102            ),
103            max_percentage_failure=spec.get(
104                "max_percentage_failure", DQSpec.max_percentage_failure
105            ),
106        )
107
108        dq_functions = cls._get_dq_functions(spec, "dq_functions")
109
110        critical_functions = cls._get_dq_functions(spec, "critical_functions")
111
112        cls._validate_dq_tag_strategy(dq_spec)
113
114        return dq_spec, dq_functions, critical_functions
115
116    @staticmethod
117    def _get_dq_functions(spec: dict, function_key: str) -> List[DQFunctionSpec]:
118        """Get DQ Functions from a DQ Spec, based on a function_key.
119
120        Args:
121            spec: data quality specifications.
122            function_key: dq function key ("dq_functions" or
123                "critical_functions").
124
125        Returns:
126            a list of DQ Function Specs.
127        """
128        functions = []
129
130        if spec.get(function_key, []):
131            for f in spec.get(function_key, []):
132                dq_fn_spec = DQFunctionSpec(
133                    function=f["function"],
134                    args=f.get("args", {}),
135                )
136                functions.append(dq_fn_spec)
137
138        return functions
139
140    @staticmethod
141    def _validate_dq_tag_strategy(spec: DQSpec) -> None:
142        """Validate DQ Spec arguments related with the data tagging strategy.
143
144        Args:
145            spec: data quality specifications.
146        """
147        if spec.tag_source_data:
148            spec.gx_result_format = DQSpec.gx_result_format
149            spec.fail_on_error = False
150            spec.result_sink_explode = DQSpec.result_sink_explode
151        elif spec.gx_result_format != DQSpec.gx_result_format:
152            spec.tag_source_data = False
class Algorithm(lakehouse_engine.core.executable.Executable):
 15class Algorithm(Executable):
 16    """Class to define the behavior of every algorithm based on ACONs."""
 17
 18    def __init__(self, acon: dict):
 19        """Construct Algorithm instances.
 20
 21        Args:
 22            acon: algorithm configuration.
 23        """
 24        self.acon = acon
 25
 26    @classmethod
 27    def get_dq_spec(
 28        cls, spec: dict
 29    ) -> Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]:
 30        """Get data quality specification object from acon.
 31
 32        Args:
 33            spec: data quality specifications.
 34
 35        Returns:
 36            The DQSpec and the List of DQ Functions Specs.
 37        """
 38        dq_spec = DQSpec(
 39            spec_id=spec["spec_id"],
 40            input_id=spec["input_id"],
 41            dq_type=spec["dq_type"],
 42            dq_functions=[],
 43            dq_db_table=spec.get("dq_db_table"),
 44            execution_point=spec.get("execution_point"),
 45            unexpected_rows_pk=spec.get(
 46                "unexpected_rows_pk", DQSpec.unexpected_rows_pk
 47            ),
 48            gx_result_format=spec.get("gx_result_format", DQSpec.gx_result_format),
 49            tbl_to_derive_pk=spec.get("tbl_to_derive_pk", DQSpec.tbl_to_derive_pk),
 50            sort_processed_keys=spec.get(
 51                "sort_processed_keys", DQSpec.sort_processed_keys
 52            ),
 53            tag_source_data=spec.get("tag_source_data", DQSpec.tag_source_data),
 54            data_asset_name=spec.get("data_asset_name", DQSpec.data_asset_name),
 55            expectation_suite_name=spec.get(
 56                "expectation_suite_name", DQSpec.expectation_suite_name
 57            ),
 58            store_backend=spec.get("store_backend", DQDefaults.STORE_BACKEND.value),
 59            local_fs_root_dir=spec.get("local_fs_root_dir", DQSpec.local_fs_root_dir),
 60            data_docs_local_fs=spec.get(
 61                "data_docs_local_fs", DQSpec.data_docs_local_fs
 62            ),
 63            bucket=spec.get("bucket", DQSpec.bucket),
 64            data_docs_bucket=spec.get("data_docs_bucket", DQSpec.data_docs_bucket),
 65            checkpoint_store_prefix=spec.get(
 66                "checkpoint_store_prefix", DQDefaults.CHECKPOINT_STORE_PREFIX.value
 67            ),
 68            expectations_store_prefix=spec.get(
 69                "expectations_store_prefix",
 70                DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
 71            ),
 72            data_docs_prefix=spec.get(
 73                "data_docs_prefix", DQDefaults.DATA_DOCS_PREFIX.value
 74            ),
 75            validations_store_prefix=spec.get(
 76                "validations_store_prefix",
 77                DQDefaults.VALIDATIONS_STORE_PREFIX.value,
 78            ),
 79            result_sink_db_table=spec.get(
 80                "result_sink_db_table", DQSpec.result_sink_db_table
 81            ),
 82            result_sink_location=spec.get(
 83                "result_sink_location", DQSpec.result_sink_location
 84            ),
 85            result_sink_partitions=spec.get(
 86                "result_sink_partitions", DQSpec.result_sink_partitions
 87            ),
 88            result_sink_format=spec.get(
 89                "result_sink_format", OutputFormat.DELTAFILES.value
 90            ),
 91            result_sink_options=spec.get(
 92                "result_sink_options", DQSpec.result_sink_options
 93            ),
 94            result_sink_explode=spec.get(
 95                "result_sink_explode", DQSpec.result_sink_explode
 96            ),
 97            result_sink_extra_columns=spec.get("result_sink_extra_columns", []),
 98            source=spec.get("source", spec["input_id"]),
 99            fail_on_error=spec.get("fail_on_error", DQSpec.fail_on_error),
100            cache_df=spec.get("cache_df", DQSpec.cache_df),
101            critical_functions=spec.get(
102                "critical_functions", DQSpec.critical_functions
103            ),
104            max_percentage_failure=spec.get(
105                "max_percentage_failure", DQSpec.max_percentage_failure
106            ),
107        )
108
109        dq_functions = cls._get_dq_functions(spec, "dq_functions")
110
111        critical_functions = cls._get_dq_functions(spec, "critical_functions")
112
113        cls._validate_dq_tag_strategy(dq_spec)
114
115        return dq_spec, dq_functions, critical_functions
116
117    @staticmethod
118    def _get_dq_functions(spec: dict, function_key: str) -> List[DQFunctionSpec]:
119        """Get DQ Functions from a DQ Spec, based on a function_key.
120
121        Args:
122            spec: data quality specifications.
123            function_key: dq function key ("dq_functions" or
124                "critical_functions").
125
126        Returns:
127            a list of DQ Function Specs.
128        """
129        functions = []
130
131        if spec.get(function_key, []):
132            for f in spec.get(function_key, []):
133                dq_fn_spec = DQFunctionSpec(
134                    function=f["function"],
135                    args=f.get("args", {}),
136                )
137                functions.append(dq_fn_spec)
138
139        return functions
140
141    @staticmethod
142    def _validate_dq_tag_strategy(spec: DQSpec) -> None:
143        """Validate DQ Spec arguments related with the data tagging strategy.
144
145        Args:
146            spec: data quality specifications.
147        """
148        if spec.tag_source_data:
149            spec.gx_result_format = DQSpec.gx_result_format
150            spec.fail_on_error = False
151            spec.result_sink_explode = DQSpec.result_sink_explode
152        elif spec.gx_result_format != DQSpec.gx_result_format:
153            spec.tag_source_data = False

Class that defines the common behavior of every algorithm configured through an ACON (algorithm configuration).

Algorithm(acon: dict)
18    def __init__(self, acon: dict):
19        """Construct Algorithm instances.
20
21        Args:
22            acon: algorithm configuration.
23        """
24        self.acon = acon

Construct Algorithm instances.

Arguments:
  • acon: algorithm configuration.
acon
 26    @classmethod
 27    def get_dq_spec(
 28        cls, spec: dict
 29    ) -> Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]:
 30        """Get data quality specification object from acon.
 31
 32        Args:
 33            spec: data quality specifications.
 34
 35        Returns:
 36            The DQSpec and the List of DQ Functions Specs.
 37        """
 38        dq_spec = DQSpec(
 39            spec_id=spec["spec_id"],
 40            input_id=spec["input_id"],
 41            dq_type=spec["dq_type"],
 42            dq_functions=[],
 43            dq_db_table=spec.get("dq_db_table"),
 44            execution_point=spec.get("execution_point"),
 45            unexpected_rows_pk=spec.get(
 46                "unexpected_rows_pk", DQSpec.unexpected_rows_pk
 47            ),
 48            gx_result_format=spec.get("gx_result_format", DQSpec.gx_result_format),
 49            tbl_to_derive_pk=spec.get("tbl_to_derive_pk", DQSpec.tbl_to_derive_pk),
 50            sort_processed_keys=spec.get(
 51                "sort_processed_keys", DQSpec.sort_processed_keys
 52            ),
 53            tag_source_data=spec.get("tag_source_data", DQSpec.tag_source_data),
 54            data_asset_name=spec.get("data_asset_name", DQSpec.data_asset_name),
 55            expectation_suite_name=spec.get(
 56                "expectation_suite_name", DQSpec.expectation_suite_name
 57            ),
 58            store_backend=spec.get("store_backend", DQDefaults.STORE_BACKEND.value),
 59            local_fs_root_dir=spec.get("local_fs_root_dir", DQSpec.local_fs_root_dir),
 60            data_docs_local_fs=spec.get(
 61                "data_docs_local_fs", DQSpec.data_docs_local_fs
 62            ),
 63            bucket=spec.get("bucket", DQSpec.bucket),
 64            data_docs_bucket=spec.get("data_docs_bucket", DQSpec.data_docs_bucket),
 65            checkpoint_store_prefix=spec.get(
 66                "checkpoint_store_prefix", DQDefaults.CHECKPOINT_STORE_PREFIX.value
 67            ),
 68            expectations_store_prefix=spec.get(
 69                "expectations_store_prefix",
 70                DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
 71            ),
 72            data_docs_prefix=spec.get(
 73                "data_docs_prefix", DQDefaults.DATA_DOCS_PREFIX.value
 74            ),
 75            validations_store_prefix=spec.get(
 76                "validations_store_prefix",
 77                DQDefaults.VALIDATIONS_STORE_PREFIX.value,
 78            ),
 79            result_sink_db_table=spec.get(
 80                "result_sink_db_table", DQSpec.result_sink_db_table
 81            ),
 82            result_sink_location=spec.get(
 83                "result_sink_location", DQSpec.result_sink_location
 84            ),
 85            result_sink_partitions=spec.get(
 86                "result_sink_partitions", DQSpec.result_sink_partitions
 87            ),
 88            result_sink_format=spec.get(
 89                "result_sink_format", OutputFormat.DELTAFILES.value
 90            ),
 91            result_sink_options=spec.get(
 92                "result_sink_options", DQSpec.result_sink_options
 93            ),
 94            result_sink_explode=spec.get(
 95                "result_sink_explode", DQSpec.result_sink_explode
 96            ),
 97            result_sink_extra_columns=spec.get("result_sink_extra_columns", []),
 98            source=spec.get("source", spec["input_id"]),
 99            fail_on_error=spec.get("fail_on_error", DQSpec.fail_on_error),
100            cache_df=spec.get("cache_df", DQSpec.cache_df),
101            critical_functions=spec.get(
102                "critical_functions", DQSpec.critical_functions
103            ),
104            max_percentage_failure=spec.get(
105                "max_percentage_failure", DQSpec.max_percentage_failure
106            ),
107        )
108
109        dq_functions = cls._get_dq_functions(spec, "dq_functions")
110
111        critical_functions = cls._get_dq_functions(spec, "critical_functions")
112
113        cls._validate_dq_tag_strategy(dq_spec)
114
115        return dq_spec, dq_functions, critical_functions

Build the data quality specification objects from a data quality spec dictionary of the acon.

Arguments:
  • spec: data quality specifications.
Returns:

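A tuple with three elements: the DQSpec, the list of DQ function specs (from "dq_functions"), and the list of critical DQ function specs (from "critical_functions").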