lakehouse_engine.utils.acon_utils
Module to perform validations and resolve the acon.
"""Module to perform validations and resolve the acon."""

from lakehouse_engine.core.definitions import DQType, InputFormat, OutputFormat
from lakehouse_engine.io.exceptions import WrongIOFormatException
from lakehouse_engine.utils.dq_utils import PrismaUtils
from lakehouse_engine.utils.logging_handler import LoggingHandler

_LOGGER = LoggingHandler(__name__).get_logger()


def validate_and_resolve_acon(acon: dict, execution_point: str = "") -> dict:
    """Function to validate and resolve the acon.

    Validates the reader and writer formats and, when an execution point is
    given, resolves the dq functions of the acon.

    Args:
        acon: Acon to be validated and resolved.
        execution_point: Execution point to resolve the dq functions. When
            empty, dq function resolution is skipped.

    Returns:
        Acon after validation and resolution.

    Raises:
        WrongIOFormatException: If an input or output format is not supported.
    """
    # Performing validations
    validate_readers(acon)
    validate_writers(acon)

    # Resolving the acon
    if execution_point:
        acon = resolve_dq_functions(acon, execution_point)

    # NOTE(review): the whole acon is logged at INFO level; confirm it can
    # never carry secrets (e.g. credentials in reader/writer options).
    _LOGGER.info(f"Read Algorithm Configuration: {acon}")

    return acon


def validate_readers(acon: dict) -> None:
    """Function to validate the readers in the acon.

    Specs are taken from ``input_specs`` when it is a non-empty list, and
    otherwise from the single ``input_spec`` entry. A spec is accepted when its
    ``data_format`` is a supported InputFormat or when it defines a
    ``db_table``.

    Args:
        acon: Acon to be validated.

    Raises:
        WrongIOFormatException: If the input format is not supported.
    """
    specs = acon.get("input_specs") or []
    if not specs and "input_spec" in acon:
        # A null spec is treated as empty so it goes through the format check
        # below instead of raising AttributeError.
        specs = [acon["input_spec"] or {}]

    # An acon without reader specs (or with an empty ``input_specs`` list) has
    # nothing to validate, so it must not be reported as a ``None`` format.
    for spec in specs:
        if (
            not InputFormat.exists(spec.get("data_format"))
            and "db_table" not in spec
        ):
            raise WrongIOFormatException(
                f"Input format not supported: {spec.get('data_format')}"
            )


def validate_writers(acon: dict) -> None:
    """Function to validate the writers in the acon.

    Specs are taken from ``output_specs`` when it is a non-empty list, and
    otherwise from the single ``output_spec`` entry. Every spec must have a
    ``data_format`` that is a supported OutputFormat.

    Args:
        acon: Acon to be validated.

    Raises:
        WrongIOFormatException: If the output format is not supported.
    """
    specs = acon.get("output_specs") or []
    if not specs and "output_spec" in acon:
        # A null spec is treated as empty so it goes through the format check
        # below instead of raising AttributeError.
        specs = [acon["output_spec"] or {}]

    # An acon without writer specs (or with an empty ``output_specs`` list) has
    # nothing to validate, so it must not be reported as a ``None`` format.
    for spec in specs:
        if not OutputFormat.exists(spec.get("data_format")):
            raise WrongIOFormatException(
                f"Output format not supported: {spec.get('data_format')}"
            )


def resolve_dq_functions(acon: dict, execution_point: str) -> dict:
    """Function to resolve the dq functions in the acon.

    PRISMA specs (``dq_type`` equal to ``DQType.PRISMA.value``) are rebuilt via
    ``PrismaUtils.build_prisma_dq_spec``; any other spec is kept unchanged. A
    truthy ``dq_spec`` takes precedence over ``dq_specs``. The acon is updated
    in place and the same object is returned.

    Args:
        acon: Acon to resolve the dq functions.
        execution_point: Execution point of the dq_functions.

    Returns:
        Acon after resolving the dq functions.
    """
    dq_spec = acon.get("dq_spec")
    if dq_spec:
        if dq_spec.get("dq_type") == DQType.PRISMA.value:
            acon["dq_spec"] = PrismaUtils.build_prisma_dq_spec(
                spec=dq_spec, execution_point=execution_point
            )
    elif acon.get("dq_specs"):
        acon["dq_specs"] = [
            (
                PrismaUtils.build_prisma_dq_spec(
                    spec=spec, execution_point=execution_point
                )
                if spec.get("dq_type") == DQType.PRISMA.value
                else spec
            )
            for spec in acon["dq_specs"]
        ]
    return acon
def
validate_and_resolve_acon(acon: dict, execution_point: str = '') -> dict:
def validate_and_resolve_acon(acon: dict, execution_point: str = "") -> dict:
    """Validate the reader/writer formats of an acon and resolve its dq specs.

    Args:
        acon: Algorithm configuration to validate and resolve.
        execution_point: When non-empty, the dq functions are resolved for this
            execution point; when empty, dq resolution is skipped.

    Returns:
        The acon, with dq functions resolved when an execution point was given.
    """
    # Each validator raises on the first unsupported format it finds.
    for validate in (validate_readers, validate_writers):
        validate(acon)

    resolved = (
        resolve_dq_functions(acon, execution_point) if execution_point else acon
    )

    # NOTE(review): the whole acon is logged at INFO level; confirm it can
    # never carry secrets (e.g. credentials in reader/writer options).
    _LOGGER.info(f"Read Algorithm Configuration: {str(resolved)}")

    return resolved
Function to validate and resolve the acon.
Arguments:
- acon: Acon to be validated and resolved.
- execution_point: Execution point to resolve the dq functions.
Returns:
Acon after validation and resolution.
def
validate_readers(acon: dict) -> None:
def validate_readers(acon: dict) -> None:
    """Function to validate the readers in the acon.

    Specs are taken from ``input_specs`` when it is a non-empty list, and
    otherwise from the single ``input_spec`` entry. A spec is accepted when its
    ``data_format`` is a supported InputFormat or when it defines a
    ``db_table``.

    Args:
        acon: Acon to be validated.

    Raises:
        WrongIOFormatException: If the input format is not supported.
    """
    specs = acon.get("input_specs") or []
    if not specs and "input_spec" in acon:
        # A null spec is treated as empty so it goes through the format check
        # below instead of raising AttributeError.
        specs = [acon["input_spec"] or {}]

    # An acon without reader specs (or with an empty ``input_specs`` list) has
    # nothing to validate, so it must not be reported as a ``None`` format.
    for spec in specs:
        if (
            not InputFormat.exists(spec.get("data_format"))
            and "db_table" not in spec
        ):
            raise WrongIOFormatException(
                f"Input format not supported: {spec.get('data_format')}"
            )
Function to validate the readers in the acon.
Arguments:
- acon: Acon to be validated.
Raises:
- WrongIOFormatException: If the input format is not supported.
def
validate_writers(acon: dict) -> None:
def validate_writers(acon: dict) -> None:
    """Function to validate the writers in the acon.

    Specs are taken from ``output_specs`` when it is a non-empty list, and
    otherwise from the single ``output_spec`` entry. Every spec must have a
    ``data_format`` that is a supported OutputFormat.

    Args:
        acon: Acon to be validated.

    Raises:
        WrongIOFormatException: If the output format is not supported.
    """
    specs = acon.get("output_specs") or []
    if not specs and "output_spec" in acon:
        # A null spec is treated as empty so it goes through the format check
        # below instead of raising AttributeError.
        specs = [acon["output_spec"] or {}]

    # An acon without writer specs (or with an empty ``output_specs`` list) has
    # nothing to validate, so it must not be reported as a ``None`` format.
    for spec in specs:
        if not OutputFormat.exists(spec.get("data_format")):
            raise WrongIOFormatException(
                f"Output format not supported: {spec.get('data_format')}"
            )
Function to validate the writers in the acon.
Arguments:
- acon: Acon to be validated.
Raises:
- WrongIOFormatException: If the output format is not supported.
def
resolve_dq_functions(acon: dict, execution_point: str) -> dict:
def resolve_dq_functions(acon: dict, execution_point: str) -> dict:
    """Expand PRISMA dq specs of the acon for the given execution point.

    A truthy ``dq_spec`` is handled on its own and ``dq_specs`` is then
    ignored; otherwise every entry of a non-empty ``dq_specs`` is resolved.
    Only specs whose ``dq_type`` equals ``DQType.PRISMA.value`` are rebuilt via
    ``PrismaUtils.build_prisma_dq_spec``; the others are kept as they are. The
    acon is updated in place and returned.

    Args:
        acon: Acon whose dq functions are resolved.
        execution_point: Execution point of the dq_functions.

    Returns:
        The same acon object, with its dq functions resolved.
    """
    single_spec = acon.get("dq_spec")
    if single_spec:
        if single_spec.get("dq_type") == DQType.PRISMA.value:
            acon["dq_spec"] = PrismaUtils.build_prisma_dq_spec(
                spec=single_spec, execution_point=execution_point
            )
        return acon

    many_specs = acon.get("dq_specs")
    if many_specs:
        acon["dq_specs"] = [
            (
                PrismaUtils.build_prisma_dq_spec(
                    spec=item, execution_point=execution_point
                )
                if item.get("dq_type") == DQType.PRISMA.value
                else item
            )
            for item in many_specs
        ]
    return acon
Function to resolve the dq functions in the acon.
Arguments:
- acon: Acon to resolve the dq functions.
- execution_point: Execution point of the dq_functions.
Returns:
Acon after resolving the dq functions.