Skip to content

Acon utils

Module to perform validations and resolve the acon.

resolve_dq_functions(acon, execution_point)

Function to resolve the dq functions in the acon.

Parameters:

Name Type Description Default
acon dict

Acon to resolve the dq functions.

required
execution_point str

Execution point of the dq_functions.

required

Returns:

Type Description
dict

Acon after resolving the dq functions.

Source code in mkdocs/lakehouse_engine/packages/utils/acon_utils.py
def resolve_dq_functions(acon: dict, execution_point: str) -> dict:
    """Function to resolve the dq functions in the acon.

    Args:
        acon: Acon to resolve the dq functions.
        execution_point: Execution point of the dq_functions.

    Returns:
        Acon after resolving the dq functions.
    """
    if acon.get("dq_spec"):
        if acon.get("dq_spec").get("dq_type") == DQType.PRISMA.value:
            acon["dq_spec"] = PrismaUtils.build_prisma_dq_spec(
                spec=acon.get("dq_spec"), execution_point=execution_point
            )
    elif acon.get("dq_specs"):
        resolved_dq_specs = []
        for spec in acon.get("dq_specs", []):
            if spec.get("dq_type") == DQType.PRISMA.value:
                resolved_dq_specs.append(
                    PrismaUtils.build_prisma_dq_spec(
                        spec=spec, execution_point=execution_point
                    )
                )
            else:
                resolved_dq_specs.append(spec)
        acon["dq_specs"] = resolved_dq_specs
    return acon

validate_and_resolve_acon(acon, execution_point='')

Function to validate and resolve the acon.

Parameters:

Name Type Description Default
acon dict

Acon to be validated and resolved.

required
execution_point str

Execution point to resolve the dq functions.

''

Returns:

Type Description
dict

Acon after validation and resolution.

Source code in mkdocs/lakehouse_engine/packages/utils/acon_utils.py
def validate_and_resolve_acon(acon: dict, execution_point: str = "") -> dict:
    """Function to validate and resolve the acon.

    Args:
        acon: Acon to be validated and resolved.
        execution_point: Execution point to resolve the dq functions.

    Returns:
        Acon after validation and resolution.
    """
    # Performing validations
    validate_readers(acon)
    validate_writers(acon)

    # Resolving the acon
    if execution_point:
        acon = resolve_dq_functions(acon, execution_point)

    _LOGGER.info(f"Read Algorithm Configuration: {str(acon)}")

    return acon

validate_readers(acon)

Function to validate the readers in the acon.

Parameters:

Name Type Description Default
acon dict

Acon to be validated.

required

Raises:

Type Description
RuntimeError

If the input format is not supported.

Source code in mkdocs/lakehouse_engine/packages/utils/acon_utils.py
def validate_readers(acon: dict) -> None:
    """Function to validate the readers in the acon.

    Args:
        acon: Acon to be validated.

    Raises:
        RuntimeError: If the input format is not supported.
    """
    if "input_specs" in acon.keys() or "input_spec" in acon.keys():
        for spec in acon.get("input_specs", []) or [acon.get("input_spec", {})]:
            if (
                not InputFormat.exists(spec.get("data_format"))
                and "db_table" not in spec.keys()
            ):
                raise WrongIOFormatException(
                    f"Input format not supported: {spec.get('data_format')}"
                )

validate_writers(acon)

Function to validate the writers in the acon.

Parameters:

Name Type Description Default
acon dict

Acon to be validated.

required

Raises:

Type Description
RuntimeError

If the output format is not supported.

Source code in mkdocs/lakehouse_engine/packages/utils/acon_utils.py
def validate_writers(acon: dict) -> None:
    """Function to validate the writers in the acon.

    Args:
        acon: Acon to be validated.

    Raises:
        RuntimeError: If the output format is not supported.
    """
    if "output_specs" in acon.keys() or "output_spec" in acon.keys():
        for spec in acon.get("output_specs", []) or [acon.get("output_spec", {})]:
            if not OutputFormat.exists(spec.get("data_format")):
                raise WrongIOFormatException(
                    f"Output format not supported: {spec.get('data_format')}"
                )