lakehouse_engine.algorithms.algorithm
Module containing the Algorithm class.
"""Module containing the Algorithm class."""

from typing import List, Tuple

from lakehouse_engine.core.definitions import (
    DQDefaults,
    DQFunctionSpec,
    DQSpec,
    OutputFormat,
)
from lakehouse_engine.core.executable import Executable


class Algorithm(Executable):
    """Base behavior shared by every ACON-driven algorithm."""

    def __init__(self, acon: dict):
        """Create an Algorithm bound to its configuration.

        Args:
            acon: algorithm configuration.
        """
        self.acon = acon

    @classmethod
    def get_dq_spec(
        cls, spec: dict
    ) -> Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]:
        """Build the data quality specification described by an acon entry.

        Args:
            spec: data quality specifications.

        Returns:
            A tuple with the DQSpec, the DQ function specs listed under
            "dq_functions" and the ones listed under "critical_functions".
        """
        # Mandatory keys: a missing one raises KeyError.
        spec_id = spec["spec_id"]
        input_id = spec["input_id"]
        dq_type = spec["dq_type"]

        # Value used for each optional key when the acon does not provide it.
        fallbacks = {
            "dq_db_table": None,
            "execution_point": None,
            "unexpected_rows_pk": DQSpec.unexpected_rows_pk,
            "gx_result_format": DQSpec.gx_result_format,
            "tbl_to_derive_pk": DQSpec.tbl_to_derive_pk,
            "sort_processed_keys": DQSpec.sort_processed_keys,
            "tag_source_data": DQSpec.tag_source_data,
            "data_asset_name": DQSpec.data_asset_name,
            "expectation_suite_name": DQSpec.expectation_suite_name,
            "store_backend": DQDefaults.STORE_BACKEND.value,
            "local_fs_root_dir": DQSpec.local_fs_root_dir,
            "data_docs_local_fs": DQSpec.data_docs_local_fs,
            "bucket": DQSpec.bucket,
            "data_docs_bucket": DQSpec.data_docs_bucket,
            "checkpoint_store_prefix": DQDefaults.CHECKPOINT_STORE_PREFIX.value,
            "expectations_store_prefix": DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
            "data_docs_prefix": DQDefaults.DATA_DOCS_PREFIX.value,
            "validations_store_prefix": DQDefaults.VALIDATIONS_STORE_PREFIX.value,
            "result_sink_db_table": DQSpec.result_sink_db_table,
            "result_sink_location": DQSpec.result_sink_location,
            "result_sink_partitions": DQSpec.result_sink_partitions,
            "result_sink_format": OutputFormat.DELTAFILES.value,
            "result_sink_options": DQSpec.result_sink_options,
            "result_sink_explode": DQSpec.result_sink_explode,
            "result_sink_extra_columns": [],
            # The source falls back to the input id.
            "source": input_id,
            "fail_on_error": DQSpec.fail_on_error,
            "cache_df": DQSpec.cache_df,
            "critical_functions": DQSpec.critical_functions,
            "max_percentage_failure": DQSpec.max_percentage_failure,
        }
        settings = {
            name: spec.get(name, fallback) for name, fallback in fallbacks.items()
        }

        dq_spec = DQSpec(
            spec_id=spec_id,
            input_id=input_id,
            dq_type=dq_type,
            dq_functions=[],
            **settings,
        )

        regular_fns = cls._get_dq_functions(spec, "dq_functions")
        critical_fns = cls._get_dq_functions(spec, "critical_functions")
        cls._validate_dq_tag_strategy(dq_spec)

        return dq_spec, regular_fns, critical_fns

    @staticmethod
    def _get_dq_functions(spec: dict, function_key: str) -> List[DQFunctionSpec]:
        """Collect the DQ Function Specs stored under a key of a DQ Spec.

        Args:
            spec: data quality specifications.
            function_key: dq function key ("dq_functions" or
                "critical_functions").

        Returns:
            a list of DQ Function Specs (empty when the key is missing or
            holds a falsy value).
        """
        entries = spec.get(function_key, [])
        if not entries:
            return []

        return [
            DQFunctionSpec(function=entry["function"], args=entry.get("args", {}))
            for entry in entries
        ]

    @staticmethod
    def _validate_dq_tag_strategy(spec: DQSpec) -> None:
        """Reconcile the DQ Spec settings tied to the data tagging strategy.

        When source data tagging is on, the result format and the result sink
        explode flag are reset to the DQSpec defaults and fail_on_error is
        turned off. Otherwise, a non-default result format sets the tagging
        flag to False.

        Args:
            spec: data quality specifications.
        """
        if not spec.tag_source_data:
            if spec.gx_result_format != DQSpec.gx_result_format:
                spec.tag_source_data = False
            return

        spec.gx_result_format = DQSpec.gx_result_format
        spec.fail_on_error = False
        spec.result_sink_explode = DQSpec.result_sink_explode
class Algorithm(Executable):
    """Base behavior shared by every ACON-driven algorithm."""

    def __init__(self, acon: dict):
        """Create an Algorithm bound to its configuration.

        Args:
            acon: algorithm configuration.
        """
        self.acon = acon

    @classmethod
    def get_dq_spec(
        cls, spec: dict
    ) -> Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]:
        """Build the data quality specification described by an acon entry.

        Args:
            spec: data quality specifications.

        Returns:
            A tuple with the DQSpec, the DQ function specs listed under
            "dq_functions" and the ones listed under "critical_functions".
        """
        # Mandatory keys: a missing one raises KeyError.
        spec_id = spec["spec_id"]
        input_id = spec["input_id"]
        dq_type = spec["dq_type"]

        # Value used for each optional key when the acon does not provide it.
        fallbacks = {
            "dq_db_table": None,
            "execution_point": None,
            "unexpected_rows_pk": DQSpec.unexpected_rows_pk,
            "gx_result_format": DQSpec.gx_result_format,
            "tbl_to_derive_pk": DQSpec.tbl_to_derive_pk,
            "sort_processed_keys": DQSpec.sort_processed_keys,
            "tag_source_data": DQSpec.tag_source_data,
            "data_asset_name": DQSpec.data_asset_name,
            "expectation_suite_name": DQSpec.expectation_suite_name,
            "store_backend": DQDefaults.STORE_BACKEND.value,
            "local_fs_root_dir": DQSpec.local_fs_root_dir,
            "data_docs_local_fs": DQSpec.data_docs_local_fs,
            "bucket": DQSpec.bucket,
            "data_docs_bucket": DQSpec.data_docs_bucket,
            "checkpoint_store_prefix": DQDefaults.CHECKPOINT_STORE_PREFIX.value,
            "expectations_store_prefix": DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
            "data_docs_prefix": DQDefaults.DATA_DOCS_PREFIX.value,
            "validations_store_prefix": DQDefaults.VALIDATIONS_STORE_PREFIX.value,
            "result_sink_db_table": DQSpec.result_sink_db_table,
            "result_sink_location": DQSpec.result_sink_location,
            "result_sink_partitions": DQSpec.result_sink_partitions,
            "result_sink_format": OutputFormat.DELTAFILES.value,
            "result_sink_options": DQSpec.result_sink_options,
            "result_sink_explode": DQSpec.result_sink_explode,
            "result_sink_extra_columns": [],
            # The source falls back to the input id.
            "source": input_id,
            "fail_on_error": DQSpec.fail_on_error,
            "cache_df": DQSpec.cache_df,
            "critical_functions": DQSpec.critical_functions,
            "max_percentage_failure": DQSpec.max_percentage_failure,
        }
        settings = {
            name: spec.get(name, fallback) for name, fallback in fallbacks.items()
        }

        dq_spec = DQSpec(
            spec_id=spec_id,
            input_id=input_id,
            dq_type=dq_type,
            dq_functions=[],
            **settings,
        )

        regular_fns = cls._get_dq_functions(spec, "dq_functions")
        critical_fns = cls._get_dq_functions(spec, "critical_functions")
        cls._validate_dq_tag_strategy(dq_spec)

        return dq_spec, regular_fns, critical_fns

    @staticmethod
    def _get_dq_functions(spec: dict, function_key: str) -> List[DQFunctionSpec]:
        """Collect the DQ Function Specs stored under a key of a DQ Spec.

        Args:
            spec: data quality specifications.
            function_key: dq function key ("dq_functions" or
                "critical_functions").

        Returns:
            a list of DQ Function Specs (empty when the key is missing or
            holds a falsy value).
        """
        entries = spec.get(function_key, [])
        if not entries:
            return []

        return [
            DQFunctionSpec(function=entry["function"], args=entry.get("args", {}))
            for entry in entries
        ]

    @staticmethod
    def _validate_dq_tag_strategy(spec: DQSpec) -> None:
        """Reconcile the DQ Spec settings tied to the data tagging strategy.

        When source data tagging is on, the result format and the result sink
        explode flag are reset to the DQSpec defaults and fail_on_error is
        turned off. Otherwise, a non-default result format sets the tagging
        flag to False.

        Args:
            spec: data quality specifications.
        """
        if not spec.tag_source_data:
            if spec.gx_result_format != DQSpec.gx_result_format:
                spec.tag_source_data = False
            return

        spec.gx_result_format = DQSpec.gx_result_format
        spec.fail_on_error = False
        spec.result_sink_explode = DQSpec.result_sink_explode
Class to define the behavior of every algorithm based on ACONs.
Algorithm(acon: dict)
def __init__(self, acon: dict):
    """Create an Algorithm bound to its configuration.

    Args:
        acon: algorithm configuration, stored on the instance as `acon`.
    """
    self.acon = acon
Construct Algorithm instances.
Arguments:
- acon: algorithm configuration.
@classmethod def get_dq_spec(cls, spec: dict) -> Tuple[lakehouse_engine.core.definitions.DQSpec, List[lakehouse_engine.core.definitions.DQFunctionSpec], List[lakehouse_engine.core.definitions.DQFunctionSpec]]:
@classmethod
def get_dq_spec(
    cls, spec: dict
) -> Tuple[DQSpec, List[DQFunctionSpec], List[DQFunctionSpec]]:
    """Build the data quality specification described by an acon entry.

    Args:
        spec: data quality specifications.

    Returns:
        A tuple with the DQSpec, the DQ function specs listed under
        "dq_functions" and the ones listed under "critical_functions".
    """
    # Mandatory keys: a missing one raises KeyError.
    spec_id = spec["spec_id"]
    input_id = spec["input_id"]
    dq_type = spec["dq_type"]

    # Value used for each optional key when the acon does not provide it.
    fallbacks = {
        "dq_db_table": None,
        "execution_point": None,
        "unexpected_rows_pk": DQSpec.unexpected_rows_pk,
        "gx_result_format": DQSpec.gx_result_format,
        "tbl_to_derive_pk": DQSpec.tbl_to_derive_pk,
        "sort_processed_keys": DQSpec.sort_processed_keys,
        "tag_source_data": DQSpec.tag_source_data,
        "data_asset_name": DQSpec.data_asset_name,
        "expectation_suite_name": DQSpec.expectation_suite_name,
        "store_backend": DQDefaults.STORE_BACKEND.value,
        "local_fs_root_dir": DQSpec.local_fs_root_dir,
        "data_docs_local_fs": DQSpec.data_docs_local_fs,
        "bucket": DQSpec.bucket,
        "data_docs_bucket": DQSpec.data_docs_bucket,
        "checkpoint_store_prefix": DQDefaults.CHECKPOINT_STORE_PREFIX.value,
        "expectations_store_prefix": DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
        "data_docs_prefix": DQDefaults.DATA_DOCS_PREFIX.value,
        "validations_store_prefix": DQDefaults.VALIDATIONS_STORE_PREFIX.value,
        "result_sink_db_table": DQSpec.result_sink_db_table,
        "result_sink_location": DQSpec.result_sink_location,
        "result_sink_partitions": DQSpec.result_sink_partitions,
        "result_sink_format": OutputFormat.DELTAFILES.value,
        "result_sink_options": DQSpec.result_sink_options,
        "result_sink_explode": DQSpec.result_sink_explode,
        "result_sink_extra_columns": [],
        # The source falls back to the input id.
        "source": input_id,
        "fail_on_error": DQSpec.fail_on_error,
        "cache_df": DQSpec.cache_df,
        "critical_functions": DQSpec.critical_functions,
        "max_percentage_failure": DQSpec.max_percentage_failure,
    }
    settings = {
        name: spec.get(name, fallback) for name, fallback in fallbacks.items()
    }

    dq_spec = DQSpec(
        spec_id=spec_id,
        input_id=input_id,
        dq_type=dq_type,
        dq_functions=[],
        **settings,
    )

    regular_fns = cls._get_dq_functions(spec, "dq_functions")
    critical_fns = cls._get_dq_functions(spec, "critical_functions")
    cls._validate_dq_tag_strategy(dq_spec)

    return dq_spec, regular_fns, critical_fns
Get data quality specification object from acon.
Arguments:
- spec: data quality specifications.
Returns:
A tuple with the DQSpec, the list of DQ function specs and the list of critical DQ function specs.