lakehouse_engine.utils.acon_utils

Module to perform validations and resolve the acon.

 1"""Module to perform validations and resolve the acon."""
 2
 3from lakehouse_engine.core.definitions import DQType, InputFormat, OutputFormat
 4from lakehouse_engine.io.exceptions import WrongIOFormatException
 5from lakehouse_engine.utils.dq_utils import PrismaUtils
 6from lakehouse_engine.utils.logging_handler import LoggingHandler
 7
 8_LOGGER = LoggingHandler(__name__).get_logger()
 9
10
def validate_and_resolve_acon(acon: dict, execution_point: str = "") -> dict:
    """Validate the acon's readers and writers, then resolve its dq functions.

    Args:
        acon: Acon to be validated and resolved.
        execution_point: Execution point used to resolve the dq functions.
            When it is empty, the resolution step is skipped.

    Returns:
        Acon after validation and resolution.
    """
    # Validation runs before any resolution so a bad format fails fast.
    validate_readers(acon)
    validate_writers(acon)

    # Only resolve the dq functions when an execution point was provided.
    resolved_acon = (
        resolve_dq_functions(acon, execution_point) if execution_point else acon
    )

    _LOGGER.info(f"Read Algorithm Configuration: {str(resolved_acon)}")

    return resolved_acon
32
33
def validate_readers(acon: dict) -> None:
    """Check that every reader spec in the acon has a supported input format.

    Specs are read from "input_specs" when that entry is present and
    non-empty, otherwise from the single "input_spec" entry. A spec that
    contains a "db_table" key is accepted even if its data format is not
    a known input format.

    Args:
        acon: Acon to be validated.

    Raises:
        WrongIOFormatException: If a spec has an unsupported data format
            and no "db_table" key.
    """
    # Nothing to validate when the acon declares no reader specs at all.
    if "input_specs" not in acon and "input_spec" not in acon:
        return

    reader_specs = acon.get("input_specs", []) or [acon.get("input_spec", {})]
    for reader_spec in reader_specs:
        data_format = reader_spec.get("data_format")
        if InputFormat.exists(data_format):
            continue
        if "db_table" in reader_spec:
            continue
        raise WrongIOFormatException(f"Input format not supported: {data_format}")
52
53
def validate_writers(acon: dict) -> None:
    """Check that every writer spec in the acon has a supported output format.

    Specs are read from "output_specs" when that entry is present and
    non-empty, otherwise from the single "output_spec" entry.

    Args:
        acon: Acon to be validated.

    Raises:
        WrongIOFormatException: If a spec has an unsupported data format.
    """
    # Nothing to validate when the acon declares no writer specs at all.
    if "output_specs" not in acon and "output_spec" not in acon:
        return

    writer_specs = acon.get("output_specs", []) or [acon.get("output_spec", {})]
    for writer_spec in writer_specs:
        data_format = writer_spec.get("data_format")
        if not OutputFormat.exists(data_format):
            raise WrongIOFormatException(
                f"Output format not supported: {data_format}"
            )
69
70
def resolve_dq_functions(acon: dict, execution_point: str) -> dict:
    """Resolve the dq functions of the PRISMA dq specs found in the acon.

    A non-empty "dq_spec" entry takes precedence over "dq_specs". Specs
    whose "dq_type" is not the PRISMA type are kept as they are. The acon
    is updated in place and also returned.

    Args:
        acon: Acon to resolve the dq functions.
        execution_point: Execution point of the dq_functions.

    Returns:
        Acon after resolving the dq functions.
    """
    single_spec = acon.get("dq_spec")
    if single_spec:
        if single_spec.get("dq_type") == DQType.PRISMA.value:
            acon["dq_spec"] = PrismaUtils.build_prisma_dq_spec(
                spec=single_spec, execution_point=execution_point
            )
        return acon

    spec_list = acon.get("dq_specs")
    if spec_list:
        # Rebuild the list, swapping each PRISMA spec for its resolved version.
        acon["dq_specs"] = [
            (
                PrismaUtils.build_prisma_dq_spec(
                    spec=item, execution_point=execution_point
                )
                if item.get("dq_type") == DQType.PRISMA.value
                else item
            )
            for item in spec_list
        ]
    return acon
def validate_and_resolve_acon(acon: dict, execution_point: str = '') -> dict:
def validate_and_resolve_acon(acon: dict, execution_point: str = "") -> dict:
    """Validate the acon's readers and writers, then resolve its dq functions.

    Args:
        acon: Acon to be validated and resolved.
        execution_point: Execution point used to resolve the dq functions.
            When it is empty, the resolution step is skipped.

    Returns:
        Acon after validation and resolution.
    """
    # Validation runs before any resolution so a bad format fails fast.
    validate_readers(acon)
    validate_writers(acon)

    # Only resolve the dq functions when an execution point was provided.
    resolved_acon = (
        resolve_dq_functions(acon, execution_point) if execution_point else acon
    )

    _LOGGER.info(f"Read Algorithm Configuration: {str(resolved_acon)}")

    return resolved_acon

Function to validate and resolve the acon.

Arguments:
  • acon: Acon to be validated and resolved.
  • execution_point: Execution point to resolve the dq functions.
Returns:

Acon after validation and resolution.

def validate_readers(acon: dict) -> None:
def validate_readers(acon: dict) -> None:
    """Check that every reader spec in the acon has a supported input format.

    Specs are read from "input_specs" when that entry is present and
    non-empty, otherwise from the single "input_spec" entry. A spec that
    contains a "db_table" key is accepted even if its data format is not
    a known input format.

    Args:
        acon: Acon to be validated.

    Raises:
        WrongIOFormatException: If a spec has an unsupported data format
            and no "db_table" key.
    """
    # Nothing to validate when the acon declares no reader specs at all.
    if "input_specs" not in acon and "input_spec" not in acon:
        return

    reader_specs = acon.get("input_specs", []) or [acon.get("input_spec", {})]
    for reader_spec in reader_specs:
        data_format = reader_spec.get("data_format")
        if InputFormat.exists(data_format):
            continue
        if "db_table" in reader_spec:
            continue
        raise WrongIOFormatException(f"Input format not supported: {data_format}")

Function to validate the readers in the acon.

Arguments:
  • acon: Acon to be validated.
Raises:
  • WrongIOFormatException: If the input format is not supported.
def validate_writers(acon: dict) -> None:
def validate_writers(acon: dict) -> None:
    """Check that every writer spec in the acon has a supported output format.

    Specs are read from "output_specs" when that entry is present and
    non-empty, otherwise from the single "output_spec" entry.

    Args:
        acon: Acon to be validated.

    Raises:
        WrongIOFormatException: If a spec has an unsupported data format.
    """
    # Nothing to validate when the acon declares no writer specs at all.
    if "output_specs" not in acon and "output_spec" not in acon:
        return

    writer_specs = acon.get("output_specs", []) or [acon.get("output_spec", {})]
    for writer_spec in writer_specs:
        data_format = writer_spec.get("data_format")
        if not OutputFormat.exists(data_format):
            raise WrongIOFormatException(
                f"Output format not supported: {data_format}"
            )

Function to validate the writers in the acon.

Arguments:
  • acon: Acon to be validated.
Raises:
  • WrongIOFormatException: If the output format is not supported.
def resolve_dq_functions(acon: dict, execution_point: str) -> dict:
def resolve_dq_functions(acon: dict, execution_point: str) -> dict:
    """Resolve the dq functions of the PRISMA dq specs found in the acon.

    A non-empty "dq_spec" entry takes precedence over "dq_specs". Specs
    whose "dq_type" is not the PRISMA type are kept as they are. The acon
    is updated in place and also returned.

    Args:
        acon: Acon to resolve the dq functions.
        execution_point: Execution point of the dq_functions.

    Returns:
        Acon after resolving the dq functions.
    """
    single_spec = acon.get("dq_spec")
    if single_spec:
        if single_spec.get("dq_type") == DQType.PRISMA.value:
            acon["dq_spec"] = PrismaUtils.build_prisma_dq_spec(
                spec=single_spec, execution_point=execution_point
            )
        return acon

    spec_list = acon.get("dq_specs")
    if spec_list:
        # Rebuild the list, swapping each PRISMA spec for its resolved version.
        acon["dq_specs"] = [
            (
                PrismaUtils.build_prisma_dq_spec(
                    spec=item, execution_point=execution_point
                )
                if item.get("dq_type") == DQType.PRISMA.value
                else item
            )
            for item in spec_list
        ]
    return acon

Function to resolve the dq functions in the acon.

Arguments:
  • acon: Acon to resolve the dq functions.
  • execution_point: Execution point of the dq_functions.
Returns:

Acon after resolving the dq functions.