Definitions

Definitions of standard values and structures for core components.

CollectEngineUsage

Bases: Enum

Options for collecting engine usage stats.

  • enabled, enables the collection and storage of Lakehouse Engine usage statistics for any environment.
  • prod_only, enables the collection and storage of Lakehouse Engine usage statistics for production environment only.
  • disabled, disables the collection and storage of Lakehouse Engine usage statistics for all environments.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class CollectEngineUsage(Enum):
    """Options for collecting engine usage stats.

    - enabled, enables the collection and storage of Lakehouse Engine
    usage statistics for any environment.
    - prod_only, enables the collection and storage of Lakehouse Engine
    usage statistics for production environment only.
    - disabled, disables the collection and storage of Lakehouse Engine
    usage statistics, for all environments.
    """

    ENABLED = "enabled"
    PROD_ONLY = "prod_only"
    DISABLED = "disabled"
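As a sketch of how these options might be interpreted at runtime (the `should_collect` helper is hypothetical, not part of the engine; the enum is redeclared locally so the snippet runs standalone):

```python
from enum import Enum

class CollectEngineUsage(Enum):
    """Redeclared locally so the example runs standalone."""
    ENABLED = "enabled"
    PROD_ONLY = "prod_only"
    DISABLED = "disabled"

def should_collect(setting: CollectEngineUsage, is_prod: bool) -> bool:
    # Hypothetical helper: maps each option to the behavior described above.
    if setting is CollectEngineUsage.ENABLED:
        return True
    if setting is CollectEngineUsage.PROD_ONLY:
        return is_prod
    return False
```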

DQDefaults

Bases: Enum

Defaults used on the data quality process.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class DQDefaults(Enum):
    """Defaults used on the data quality process."""

    FILE_SYSTEM_STORE = "file_system"
    FILE_SYSTEM_S3_STORE = "s3"
    DQ_BATCH_IDENTIFIERS = ["spec_id", "input_id", "timestamp"]
    DATASOURCE_CLASS_NAME = "Datasource"
    DATASOURCE_EXECUTION_ENGINE = "SparkDFExecutionEngine"
    DATA_CONNECTORS_CLASS_NAME = "RuntimeDataConnector"
    DATA_CONNECTORS_MODULE_NAME = "great_expectations.datasource.data_connector"
    STORE_BACKEND = "s3"
    EXPECTATIONS_STORE_PREFIX = "dq/expectations/"
    VALIDATIONS_STORE_PREFIX = "dq/validations/"
    CHECKPOINT_STORE_PREFIX = "dq/checkpoints/"
    CUSTOM_EXPECTATION_LIST = [
        "expect_column_values_to_be_date_not_older_than",
        "expect_column_pair_a_to_be_smaller_or_equal_than_b",
        "expect_multicolumn_column_a_must_equal_b_or_c",
        "expect_queried_column_agg_value_to_be",
        "expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b",
        "expect_column_pair_a_to_be_not_equal_to_b",
        "expect_column_values_to_not_be_null_or_empty_string",
    ]
    DQ_COLUMNS_TO_KEEP_TYPES = [
        "success",
        "run_time",
        "validation_results",
        "expectation_success",
        "exception_info",
        "meta",
        "run_time_year",
        "run_time_month",
        "run_time_day",
        "source_primary_key",
        "evaluated_expectations",
        "success_percent",
        "successful_expectations",
        "unsuccessful_expectations",
        "unexpected_index_list",
    ]
    DQ_VALIDATIONS_SCHEMA = StructType(
        [
            StructField(
                "dq_validations",
                StructType(
                    [
                        StructField("run_name", StringType()),
                        StructField("run_success", BooleanType()),
                        StructField("raised_exceptions", BooleanType()),
                        StructField("run_row_success", BooleanType()),
                        StructField(
                            "dq_failure_details",
                            ArrayType(
                                StructType(
                                    [
                                        StructField("expectation_type", StringType()),
                                        StructField("kwargs", StringType()),
                                    ]
                                ),
                            ),
                        ),
                    ]
                ),
            )
        ]
    )

DQExecutionPoint

Bases: Enum

Available data quality execution points.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class DQExecutionPoint(Enum):
    """Available data quality execution points."""

    IN_MOTION = "in_motion"
    AT_REST = "at_rest"

DQFunctionSpec dataclass

Bases: object

Defines a data quality function specification.

  • function - name of the data quality function (expectation) to execute. It follows the great_expectations api https://greatexpectations.io/expectations/.
  • args - args of the function (expectation). Follow the same api as above.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class DQFunctionSpec(object):
    """Defines a data quality function specification.

    - function - name of the data quality function (expectation) to execute.
    It follows the great_expectations api https://greatexpectations.io/expectations/.
    - args - args of the function (expectation). Follow the same api as above.
    """

    function: str
    args: Optional[dict] = None
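A minimal usage sketch, with the dataclass redeclared locally so it runs standalone (the expectation name follows the great_expectations API linked above; the column name is a hypothetical example):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class DQFunctionSpec:
    """Redeclared locally so the example runs standalone."""
    function: str
    args: Optional[dict] = None

# One spec per expectation; name and args follow the great_expectations
# API (https://greatexpectations.io/expectations/).
not_null = DQFunctionSpec(
    function="expect_column_values_to_not_be_null",
    args={"column": "customer_id"},  # hypothetical column name
)
```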

DQResultFormat

Bases: Enum

Available data quality result formats.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class DQResultFormat(Enum):
    """Available data quality result formats."""

    COMPLETE = "COMPLETE"

DQSpec dataclass

Bases: object

Data quality overall specification.

  • spec_id - id of the specification.
  • input_id - id of the input specification.
  • dq_type - type of DQ process to execute (e.g. validator).
  • dq_functions - list of function specifications to execute.
  • dq_db_table - name of table to derive the dq functions from.
  • dq_table_table_filter - name of the table whose rules are to be applied in the validations (only used when deriving dq functions).
  • dq_table_extra_filters - extra filters to be used when deriving dq functions. This is a sql expression to be applied to the dq_db_table.
  • execution_point - execution point of the dq functions. [at_rest, in_motion]. This is set during the load_data or dq_validator functions.
  • unexpected_rows_pk - the list of columns composing the primary key of the source data, used to identify the rows failing the DQ validations. Note: only one of tbl_to_derive_pk or unexpected_rows_pk needs to be provided. It is mandatory to provide one of these arguments when using tag_source_data as True. When tag_source_data is False, this is not mandatory, but still recommended.
  • tbl_to_derive_pk - db.table to automatically derive the unexpected_rows_pk from. Note: only one of tbl_to_derive_pk or unexpected_rows_pk needs to be provided. It is mandatory to provide one of these arguments when using tag_source_data as True. When tag_source_data is False, this is not mandatory, but still recommended.
  • gx_result_format - great expectations result format. Default: "COMPLETE".
  • tag_source_data - when set to true, this will ensure that the DQ process ends by tagging the source data with an additional column with information about the DQ results. This column makes it possible to identify if the DQ run succeeded in general and, if not, it unlocks the insights to know what specific rows have made the DQ validations fail and why. Default: False. Note: it only works if result_sink_explode is True, gx_result_format is COMPLETE, fail_on_error is False (which is done automatically when you specify tag_source_data as True) and tbl_to_derive_pk or unexpected_rows_pk is configured.
  • store_backend - which store_backend to use (e.g. s3 or file_system).
  • local_fs_root_dir - path of the root directory. Note: only applicable for store_backend file_system.
  • bucket - the bucket name to consider for the store_backend (store DQ artefacts). Note: only applicable for store_backend s3.
  • expectations_store_prefix - prefix where to store expectations' data. Note: only applicable for store_backend s3.
  • validations_store_prefix - prefix where to store validations' data. Note: only applicable for store_backend s3.
  • checkpoint_store_prefix - prefix where to store checkpoints' data. Note: only applicable for store_backend s3.
  • data_asset_name - name of the data asset to consider when configuring the great expectations' data source.
  • expectation_suite_name - name to consider for great expectations' suite.
  • result_sink_db_table - db.table_name indicating the database and table in which to save the results of the DQ process.
  • result_sink_location - file system location in which to save the results of the DQ process.
  • result_sink_chunk_size - number of records per chunk when writing the results of the DQ process. Default: 1000000 records.
  • processed_keys_location - file system location where the keys processed by the DQ Process are saved. This is specifically used when the DQ Type is PRISMA. Note that this location is always constructed during the process, so any value defined in the configuration will be overwritten.
  • data_product_name - name of the data product.
  • result_sink_partitions - the list of partitions to consider.
  • result_sink_format - format of the result table (e.g. delta, parquet, kafka...).
  • result_sink_options - extra spark options for configuring the result sink. E.g: can be used to configure a Kafka sink if result_sink_format is kafka.
  • result_sink_explode - flag to determine if the output table/location should have the columns exploded (as True) or not (as False). Default: True.
  • result_sink_extra_columns - list of extra columns to be exploded (following the pattern "<name>.*") or columns to be selected. Only used when result_sink_explode is set to True.
  • source - name of the data source, to make it easier to identify in analysis. If not specified, it defaults to <input_id>. This is only used when result_sink_explode is set to True.
  • fail_on_error - whether to fail the algorithm if the validations of your data in the DQ process failed.
  • cache_df - whether to cache the dataframe before running the DQ process or not.
  • critical_functions - functions that should not fail. When this argument is defined, fail_on_error is nullified.
  • max_percentage_failure - percentage of failure that should be allowed. This argument has priority over both fail_on_error and critical_functions.
  • enable_row_condition - flag to determine if the row_conditions should be enabled or not. row_conditions allow you to filter the rows that are processed by the DQ functions. This is useful when you want to run the DQ functions only on a subset of the data. Default: False. Note: when using PRISMA, if you enable this flag, bear in mind that the number of processed keys will be numerically different from the evaluated keys. This happens because the row_conditions limit the number of rows that are processed by the DQ functions, but we consider processed keys as all the keys that are passed to the dq_spec.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class DQSpec(object):
    """Data quality overall specification.

    - spec_id - id of the specification.
    - input_id - id of the input specification.
    - dq_type - type of DQ process to execute (e.g. validator).
    - dq_functions - list of function specifications to execute.
    - dq_db_table - name of table to derive the dq functions from.
    - dq_table_table_filter - name of the table which rules are to be applied in the
        validations (Only used when deriving dq functions).
    - dq_table_extra_filters - extra filters to be used when deriving dq functions.
        This is a sql expression to be applied to the dq_db_table.
    - execution_point - execution point of the dq functions. [at_rest, in_motion].
        This is set during the load_data or dq_validator functions.
    - unexpected_rows_pk - the list of columns composing the primary key of the
        source data to identify the rows failing the DQ validations. Note: only one
        of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It
        is mandatory to provide one of these arguments when using tag_source_data
        as True. When tag_source_data is False, this is not mandatory, but still
        recommended.
    - tbl_to_derive_pk - db.table to automatically derive the unexpected_rows_pk from.
        Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to
        be provided. It is mandatory to provide one of these arguments when using
        tag_source_data as True. When tag_source_data is False, this is not
        mandatory, but still recommended.
    - gx_result_format - great expectations result format. Default: "COMPLETE".
    - tag_source_data - when set to true, this will ensure that the DQ process ends by
        tagging the source data with an additional column with information about the
        DQ results. This column makes it possible to identify if the DQ run
        succeeded in general and, if not, it unlocks the insights to know what
        specific rows have made the DQ validations fail and why. Default: False.
        Note: it only works if result_sink_explode is True, gx_result_format is
        COMPLETE, fail_on_error is False (which is done automatically when
        you specify tag_source_data as True) and tbl_to_derive_pk or
        unexpected_rows_pk is configured.
    - store_backend - which store_backend to use (e.g. s3 or file_system).
    - local_fs_root_dir - path of the root directory. Note: only applicable for
        store_backend file_system.
    - bucket - the bucket name to consider for the store_backend (store DQ artefacts).
        Note: only applicable for store_backend s3.
    - expectations_store_prefix - prefix where to store expectations' data. Note: only
        applicable for store_backend s3.
    - validations_store_prefix - prefix where to store validations' data. Note: only
        applicable for store_backend s3.
    - checkpoint_store_prefix - prefix where to store checkpoints' data. Note: only
        applicable for store_backend s3.
    - data_asset_name - name of the data asset to consider when configuring the great
        expectations' data source.
    - expectation_suite_name - name to consider for great expectations' suite.
    - result_sink_db_table - db.table_name indicating the database and table in which
        to save the results of the DQ process.
    - result_sink_location - file system location in which to save the results of the
        DQ process.
    - result_sink_chunk_size - number of records per chunk when writing the results of
        the DQ process. Default: 1000000 records.
    - processed_keys_location - file system location where the keys processed by the
        DQ Process are saved. This is specifically used when the DQ Type is PRISMA.
        Note that this location is always constructed during the process, so any
        value defined in the configuration will be overwritten.
    - data_product_name - name of the data product.
    - result_sink_partitions - the list of partitions to consider.
    - result_sink_format - format of the result table (e.g. delta, parquet, kafka...).
    - result_sink_options - extra spark options for configuring the result sink.
        E.g: can be used to configure a Kafka sink if result_sink_format is kafka.
    - result_sink_explode - flag to determine if the output table/location should have
        the columns exploded (as True) or not (as False). Default: True.
    - result_sink_extra_columns - list of extra columns to be exploded (following
        the pattern "<name>.*") or columns to be selected. It is only used when
        result_sink_explode is set to True.
    - source - name of data source, to be easier to identify in analysis. If not
        specified, it is set as default <input_id>. This will be only used
        when result_sink_explode is set to True.
    - fail_on_error - whether to fail the algorithm if the validations of your data in
        the DQ process failed.
    - cache_df - whether to cache the dataframe before running the DQ process or not.
    - critical_functions - functions that should not fail. When this argument is
        defined, fail_on_error is nullified.
    - max_percentage_failure - percentage of failure that should be allowed.
        This argument has priority over both fail_on_error and critical_functions.
    - enable_row_condition - flag to determine if the row_conditions should be
        enabled or not. row_conditions allow you to filter the rows that are
        processed by the DQ functions. This is useful when you want to run the
        DQ functions only on a subset of the data. Default: False. Note: when
        using PRISMA, if you enable this flag, bear in mind that the number of
        processed keys will be numerically different from the evaluated keys.
        This happens because the row_conditions limit the number of rows that
        are processed by the DQ functions, but we consider processed keys as
        all the keys that are passed to the dq_spec.
    """

    spec_id: str
    input_id: str
    dq_type: str
    dq_functions: Optional[List[DQFunctionSpec]] = None
    dq_db_table: Optional[str] = None
    dq_table_table_filter: Optional[str] = None
    dq_table_extra_filters: Optional[str] = None
    execution_point: Optional[str] = None
    unexpected_rows_pk: Optional[List[str]] = None
    tbl_to_derive_pk: Optional[str] = None
    gx_result_format: Optional[str] = DQResultFormat.COMPLETE.value
    tag_source_data: Optional[bool] = False
    store_backend: str = DQDefaults.STORE_BACKEND.value
    local_fs_root_dir: Optional[str] = None
    bucket: Optional[str] = None
    expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value
    validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value
    checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value
    data_asset_name: Optional[str] = None
    expectation_suite_name: Optional[str] = None
    result_sink_db_table: Optional[str] = None
    result_sink_location: Optional[str] = None
    result_sink_chunk_size: Optional[int] = 1000000
    processed_keys_location: Optional[str] = None
    data_product_name: Optional[str] = None
    result_sink_partitions: Optional[List[str]] = None
    result_sink_format: str = OutputFormat.DELTAFILES.value
    result_sink_options: Optional[dict] = None
    result_sink_explode: bool = True
    result_sink_extra_columns: Optional[List[str]] = None
    source: Optional[str] = None
    fail_on_error: bool = True
    cache_df: bool = False
    critical_functions: Optional[List[DQFunctionSpec]] = None
    max_percentage_failure: Optional[float] = None
    enable_row_condition: bool = False
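To make the interaction between tag_source_data, fail_on_error and the primary-key arguments concrete, here is a sketch with the class trimmed to a handful of fields (the full definition is above; spec ids and column names are hypothetical):

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class DQFunctionSpec:
    function: str
    args: Optional[dict] = None

@dataclass
class DQSpec:
    # Trimmed to the fields used below; see the full definition above.
    spec_id: str
    input_id: str
    dq_type: str
    dq_functions: Optional[List[DQFunctionSpec]] = None
    unexpected_rows_pk: Optional[List[str]] = None
    tag_source_data: Optional[bool] = False
    fail_on_error: bool = True

spec = DQSpec(
    spec_id="orders_dq",
    input_id="orders_input",
    dq_type="validator",
    dq_functions=[
        DQFunctionSpec("expect_column_values_to_not_be_null", {"column": "order_id"}),
    ],
    # Tagging the source data requires a primary key and fail_on_error=False
    # (per the docs, the engine switches fail_on_error off automatically here).
    unexpected_rows_pk=["order_id"],
    tag_source_data=True,
    fail_on_error=False,
)
```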

DQTableBaseParameters

Bases: Enum

Base parameters for importing DQ rules from a table.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class DQTableBaseParameters(Enum):
    """Base parameters for importing DQ rules from a table."""

    PRISMA_BASE_PARAMETERS = ["arguments", "dq_tech_function"]

DQType

Bases: Enum

Available data quality tasks.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class DQType(Enum):
    """Available data quality tasks."""

    VALIDATOR = "validator"
    PRISMA = "prisma"

DQValidatorSpec dataclass

Bases: object

Data Quality Validator Specification.

  • input_spec: input specification of the data to be checked/validated.
  • dq_spec: data quality specification.
  • restore_prev_version: specify whether delta tables/files used as input should be restored to the previous version if the data quality process fails. Note: this is only considered if fail_on_error is kept as True.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class DQValidatorSpec(object):
    """Data Quality Validator Specification.

    - input_spec: input specification of the data to be checked/validated.
    - dq_spec: data quality specification.
    - restore_prev_version: specify if, having
        delta table/files as input, they should be restored to the
        previous version if the data quality process fails. Note: this
        is only considered if fail_on_error is kept as True.
    """

    input_spec: InputSpec
    dq_spec: DQSpec
    restore_prev_version: Optional[bool] = False
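A minimal construction sketch; InputSpec is defined elsewhere in definitions.py, so a placeholder version with only the fields this example needs is declared here (its field names are assumptions), and DQSpec is trimmed likewise:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class InputSpec:
    # Placeholder: the real InputSpec (defined elsewhere in definitions.py)
    # has more fields; these two are hypothetical stand-ins.
    spec_id: str
    read_type: str

@dataclass
class DQSpec:
    # Trimmed version of the class documented above.
    spec_id: str
    input_id: str
    dq_type: str

@dataclass
class DQValidatorSpec:
    input_spec: InputSpec
    dq_spec: DQSpec
    restore_prev_version: Optional[bool] = False

validator = DQValidatorSpec(
    input_spec=InputSpec(spec_id="sales_source", read_type="batch"),
    dq_spec=DQSpec("sales_dq", "sales_source", "validator"),
    # Roll the delta input back to its previous version if the DQ process
    # fails (only honored while fail_on_error stays True).
    restore_prev_version=True,
)
```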

EngineConfig dataclass

Bases: object

Definitions that can come from the Engine Config file.

  • dq_bucket: S3 prod bucket used to store data quality related artifacts.
  • dq_dev_bucket: S3 dev bucket used to store data quality related artifacts.
  • notif_disallowed_email_servers: email servers not allowed to be used for sending notifications.
  • engine_usage_path: path where the engine prod usage stats are stored.
  • engine_dev_usage_path: path where the engine dev usage stats are stored.
  • collect_engine_usage: whether to enable the collection of lakehouse engine usage stats or not.
  • dq_functions_column_list: list of columns to be added to the meta argument of GX when using PRISMA.
  • raise_on_config_not_available: whether to raise an exception if a spark config is not available.
  • prod_catalog: name of the prod catalog being used. This is useful to derive whether the environment is prod or dev, so the dev or prod buckets/paths can be used for storing engine usage stats and dq artifacts.
  • environment: environment that the engine is being executed on. Takes precedence over prod_catalog when defining if the environment is prod or dev.
  • sharepoint_authority: authority for the Sharepoint api.
  • sharepoint_company_domain: company domain for the Sharepoint api.
  • sharepoint_api_domain: api domain for the Sharepoint api.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class EngineConfig(object):
    """Definitions that can come from the Engine Config file.

    - dq_bucket: S3 prod bucket used to store data quality related artifacts.
    - dq_dev_bucket: S3 dev bucket used to store data quality related artifacts.
    - notif_disallowed_email_servers: email servers not allowed to be used
        for sending notifications.
    - engine_usage_path: path where the engine prod usage stats are stored.
    - engine_dev_usage_path: path where the engine dev usage stats are stored.
    - collect_engine_usage: whether to enable the collection of lakehouse
        engine usage stats or not.
    - dq_functions_column_list: list of columns to be added to the meta argument
        of GX when using PRISMA.
    - raise_on_config_not_available: whether to raise an exception if a spark config
        is not available.
    - prod_catalog: name of the prod catalog being used. This is useful to derive
        whether the environment is prod or dev, so the dev or prod buckets/paths can be
        used for storing engine usage stats and dq artifacts.
    - environment: environment that the engine is being executed on. Takes precedence
        over prod_catalog when defining if the environment is prod or dev.
    - sharepoint_authority: authority for the Sharepoint api.
    - sharepoint_company_domain: company domain for the Sharepoint api.
    - sharepoint_api_domain: api domain for the Sharepoint api.
    """

    dq_bucket: Optional[str] = None
    dq_dev_bucket: Optional[str] = None
    notif_disallowed_email_servers: Optional[list] = None
    engine_usage_path: Optional[str] = None
    engine_dev_usage_path: Optional[str] = None
    collect_engine_usage: str = CollectEngineUsage.ENABLED.value
    dq_functions_column_list: Optional[list] = None
    dq_result_sink_columns_to_delete: Optional[list] = None
    sharepoint_authority: Optional[str] = None
    sharepoint_company_domain: Optional[str] = None
    sharepoint_api_domain: Optional[str] = None
    raise_on_config_not_available: bool = False
    prod_catalog: Optional[str] = None
    environment: Optional[str] = None
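The prod/dev split described above can be sketched as follows, with the class trimmed to a few fields (bucket names are hypothetical):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class EngineConfig:
    # Trimmed to a few fields; the full set is documented above.
    dq_bucket: Optional[str] = None
    dq_dev_bucket: Optional[str] = None
    environment: Optional[str] = None

config = EngineConfig(
    dq_bucket="s3://prod-dq-artifacts",   # hypothetical bucket names
    dq_dev_bucket="s3://dev-dq-artifacts",
    environment="dev",
)

# Per the docs, `environment` decides prod vs dev (taking precedence over
# prod_catalog), which in turn selects the bucket for DQ artifacts:
bucket = config.dq_bucket if config.environment == "prod" else config.dq_dev_bucket
```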

EngineStats

Bases: object

Definitions for collection of Lakehouse Engine Stats.

Note

Whenever the value comes from a key inside a Spark Config that returns an array, it can be specified with a '#' so that it is adequately processed.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class EngineStats(object):
    """Definitions for collection of Lakehouse Engine Stats.

    !!! note
        whenever the value comes from a key inside a Spark Config
        that returns an array, it can be specified with a '#' so that it
        is adequately processed.
    """

    CLUSTER_USAGE_TAGS = "spark.databricks.clusterUsageTags"
    DEF_SPARK_CONFS = {
        "dp_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#accountName",
        "environment": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#environment",
        "workspace_id": f"{CLUSTER_USAGE_TAGS}.orgId",
        "job_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#JobId",
        "job_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#RunName",
        "run_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#ClusterName",
    }
    DEF_DATABRICKS_CONTEXT_KEYS = {
        "environment": "environment",
        "dp_name": "jobName",
        "run_id": "runId",
        "job_id": "jobId",
        "job_name": "jobName",
        "workspace_id": "workspaceId",
        "policy_id": "usagePolicyId",
    }
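The '#' convention can be illustrated with a hypothetical resolver. The assumption here is that the array-valued config (e.g. clusterAllTags) is a JSON list of {"key": ..., "value": ...} entries; the engine's actual parsing code is not shown on this page:

```python
import json

def resolve_stat(spark_confs: dict, lookup: str):
    """Hypothetical resolver for the '#' convention described above."""
    if "#" in lookup:
        conf_key, inner_key = lookup.split("#", 1)
        # Assumed shape: a JSON array of {"key": ..., "value": ...} tags.
        tags = json.loads(spark_confs[conf_key])
        return next((t["value"] for t in tags if t["key"] == inner_key), None)
    return spark_confs.get(lookup)

confs = {
    "spark.databricks.clusterUsageTags.orgId": "12345",
    "spark.databricks.clusterUsageTags.clusterAllTags": json.dumps(
        [{"key": "accountName", "value": "my_dp"}]
    ),
}

# A plain key is looked up directly; a '#' key drills into the array.
workspace_id = resolve_stat(confs, "spark.databricks.clusterUsageTags.orgId")
dp_name = resolve_stat(
    confs, "spark.databricks.clusterUsageTags.clusterAllTags#accountName"
)
```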

FileManagerAPIKeys

Bases: Enum

File Manager s3 api keys.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class FileManagerAPIKeys(Enum):
    """File Manager s3 api keys."""

    CONTENTS = "Contents"
    KEY = "Key"
    CONTINUATION = "NextContinuationToken"
    BUCKET = "Bucket"
    OBJECTS = "Objects"
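A sketch of how these keys map onto S3 list-object pagination; the response pages below are hypothetical list_objects_v2-style dicts, not real API calls:

```python
from enum import Enum

class FileManagerAPIKeys(Enum):
    """Redeclared locally so the example runs standalone."""
    CONTENTS = "Contents"
    KEY = "Key"
    CONTINUATION = "NextContinuationToken"
    BUCKET = "Bucket"
    OBJECTS = "Objects"

# Hypothetical list_objects_v2-style response pages:
pages = [
    {"Contents": [{"Key": "data/part-0.parquet"}], "NextContinuationToken": "t1"},
    {"Contents": [{"Key": "data/part-1.parquet"}]},
]

keys = []
for page in pages:
    for obj in page.get(FileManagerAPIKeys.CONTENTS.value, []):
        keys.append(obj[FileManagerAPIKeys.KEY.value])
    if FileManagerAPIKeys.CONTINUATION.value not in page:
        break  # no continuation token: last page
```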

GABCadence

Bases: Enum

Representation of the supported cadences on GAB.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class GABCadence(Enum):
    """Representation of the supported cadences on GAB."""

    DAY = 1
    WEEK = 2
    MONTH = 3
    QUARTER = 4
    YEAR = 5

    @classmethod
    def get_ordered_cadences(cls) -> dict:
        """Get the cadences ordered by the value.

        Returns:
            dict containing ordered cadences as `{name:value}`.
        """
        return {
            cadence.name: cadence.value
            for cadence in sorted(GABCadence, key=lambda gab_cadence: gab_cadence.value)
        }

    @classmethod
    def get_cadences(cls) -> set[str]:
        """Get the cadences values as set.

        Returns:
            set containing all possible cadence values as `{value}`.
        """
        return {cadence.name for cadence in GABCadence}

    @classmethod
    def order_cadences(cls, cadences_to_order: list[str]) -> list[str]:
        """Order a list of cadences by value.

        Returns:
            ordered set containing the received cadences.
        """
        return sorted(
            cadences_to_order,
            key=lambda item: cls.get_ordered_cadences().get(item),  # type: ignore
        )

get_cadences() classmethod

Get the cadences values as set.

Returns:

Type Description
set[str]

set containing all possible cadence values as {value}.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def get_cadences(cls) -> set[str]:
    """Get the cadences values as set.

    Returns:
        set containing all possible cadence values as `{value}`.
    """
    return {cadence.name for cadence in GABCadence}

get_ordered_cadences() classmethod

Get the cadences ordered by the value.

Returns:

Type Description
dict

dict containing ordered cadences as {name:value}.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def get_ordered_cadences(cls) -> dict:
    """Get the cadences ordered by the value.

    Returns:
        dict containing ordered cadences as `{name:value}`.
    """
    return {
        cadence.name: cadence.value
        for cadence in sorted(GABCadence, key=lambda gab_cadence: gab_cadence.value)
    }

order_cadences(cadences_to_order) classmethod

Order a list of cadences by value.

Returns:

Type Description
list[str]

ordered list containing the received cadences.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def order_cadences(cls, cadences_to_order: list[str]) -> list[str]:
    """Order a list of cadences by value.

    Returns:
        ordered set containing the received cadences.
    """
    return sorted(
        cadences_to_order,
        key=lambda item: cls.get_ordered_cadences().get(item),  # type: ignore
    )
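The three classmethods above can be exercised together; the enum is redeclared locally so the snippet runs standalone:

```python
from enum import Enum

class GABCadence(Enum):
    """Redeclared locally so the example runs standalone."""
    DAY = 1
    WEEK = 2
    MONTH = 3
    QUARTER = 4
    YEAR = 5

    @classmethod
    def get_ordered_cadences(cls) -> dict:
        # {name: value}, sorted by the enum value (DAY first, YEAR last).
        return {c.name: c.value for c in sorted(cls, key=lambda c: c.value)}

    @classmethod
    def order_cadences(cls, cadences_to_order: list) -> list:
        return sorted(cadences_to_order, key=lambda i: cls.get_ordered_cadences()[i])

# Sorts by cadence granularity, not alphabetically.
ordered = GABCadence.order_cadences(["YEAR", "DAY", "MONTH"])
```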

GABCombinedConfiguration

Bases: Enum

GAB combined configuration.

Based on the use case configuration, return the values to override in the SQL file. This enum aims to exhaustively map each combination of cadence, reconciliation, week_start and snap_flag to the corresponding join_select, project_start and project_end values, which replace placeholders in the stages SQL file.

Return corresponding configuration (join_select, project_start, project_end) for each combination (cadence x recon x week_start x snap_flag).

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class GABCombinedConfiguration(Enum):
    """GAB combined configuration.

    Based on the use case configuration, return the values to override in the SQL
    file. This enum aims to exhaustively map each combination of `cadence`,
        `reconciliation`, `week_start` and `snap_flag` to the corresponding
        `join_select`, `project_start` and `project_end` values to replace in the
        stages SQL file.

    Return corresponding configuration (join_select, project_start, project_end) for
        each combination (cadence x recon x week_start x snap_flag).
    """

    _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE = (
        "date(date_trunc('${cad}',${date_column}))"
    )
    _DEFAULT_PROJECT_START = "df_cal.cadence_start_date"
    _DEFAULT_PROJECT_END = "df_cal.cadence_end_date"

    COMBINED_CONFIGURATION = {
        # Combination of:
        # - cadence: `DAY`
        # - reconciliation_window: `DAY`, `WEEK`, `MONTH`, `QUARTER`, `YEAR`
        # - week_start: `S`, `M`
        # - snapshot_flag: `Y`, `N`
        1: {
            "cadence": GABCadence.DAY.name,
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": {"Y", "N"},
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
        },
        # Combination of:
        # - cadence: `WEEK`
        # - reconciliation_window: `DAY`
        # - week_start: `S`, `M`
        # - snapshot_flag: `Y`
        2: {
            "cadence": GABCadence.WEEK.name,
            "recon": GABCadence.DAY.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
            select distinct case
                when '${config_week_start}' = 'Monday' then weekstart_mon
                when '${config_week_start}' = 'Sunday' then weekstart_sun
            end as cadence_start_date,
            calendar_date as cadence_end_date
        """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        # Combination of:
        # - cadence: `WEEK`
        # - reconciliation_window: `DAY`, `MONTH`, `QUARTER`, `YEAR`
        # - week_start: `M`
        # - snapshot_flag: `Y`, `N`
        3: {
            "cadence": GABCadence.WEEK.name,
            "recon": {
                GABCadence.DAY.name,
                GABCadence.MONTH.name,
                GABCadence.QUARTER.name,
                GABCadence.YEAR.name,
            },
            "week_start": "M",
            "snap_flag": {"Y", "N"},
            "join_select": """
            select distinct case
                when '${config_week_start}'  = 'Monday' then weekstart_mon
                when '${config_week_start}' = 'Sunday' then weekstart_sun
            end as cadence_start_date,
            case
                when '${config_week_start}' = 'Monday' then weekend_mon
                when '${config_week_start}' = 'Sunday' then weekend_sun
            end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        4: {
            "cadence": GABCadence.MONTH.name,
            "recon": GABCadence.DAY.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
            select distinct month_start as cadence_start_date,
            calendar_date as cadence_end_date
        """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        5: {
            "cadence": GABCadence.MONTH.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.MONDAY.value,
            "snap_flag": "Y",
            "join_select": """
            select distinct month_start as cadence_start_date,
            case
                when date(
                    date_trunc('MONTH',add_months(calendar_date, 1))
                )-1 < weekend_mon
                    then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1
                else weekend_mon
            end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        6: {
            "cadence": GABCadence.MONTH.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.SUNDAY.value,
            "snap_flag": "Y",
            "join_select": """
            select distinct month_start as cadence_start_date,
            case
                when date(
                    date_trunc('MONTH',add_months(calendar_date, 1))
                )-1 < weekend_sun
                    then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1
                else weekend_sun
            end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        7: {
            "cadence": GABCadence.MONTH.name,
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": {"Y", "N"},
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": "date(date_trunc('MONTH',add_months(${date_column}, 1)))-1",
        },
        8: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.DAY.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
            select distinct quarter_start as cadence_start_date,
            calendar_date as cadence_end_date
        """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        9: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.MONDAY.value,
            "snap_flag": "Y",
            "join_select": """
            select distinct quarter_start as cadence_start_date,
            case
                when weekend_mon > date(
                    date_trunc('QUARTER',add_months(calendar_date, 3))
                )-1
                    then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1
                else weekend_mon
            end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        10: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.SUNDAY.value,
            "snap_flag": "Y",
            "join_select": """
            select distinct quarter_start as cadence_start_date,
            case
                when weekend_sun > date(
                    date_trunc('QUARTER',add_months(calendar_date, 3))
                )-1
                    then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1
                else weekend_sun
            end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        11: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.MONTH.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
            select distinct quarter_start as cadence_start_date,
            month_end as cadence_end_date
        """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        12: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.YEAR.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "N",
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": """
            date(
                date_trunc(
                    '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 3)
                )
            )-1
        """,
        },
        13: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "N",
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": """
            date(
                date_trunc(
                    '${cad}',add_months( date(date_trunc('${cad}',${date_column})), 3)
                )
            )-1
        """,
        },
        14: {
            "cadence": GABCadence.YEAR.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.MONDAY.value,
            "snap_flag": "Y",
            "join_select": """
            select distinct year_start as cadence_start_date,
            case
                when weekend_mon > date(
                    date_trunc('YEAR',add_months(calendar_date, 12))
                )-1
                    then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1
                else weekend_mon
            end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        15: {
            "cadence": GABCadence.YEAR.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.SUNDAY.value,
            "snap_flag": "Y",
            "join_select": """
            select distinct year_start as cadence_start_date,
            case
                when weekend_sun > date(
                    date_trunc('YEAR',add_months(calendar_date, 12))
                )-1
                    then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1
                else weekend_sun
            end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        16: {
            "cadence": GABCadence.YEAR.name,
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "N",
            "inverse_flag": "Y",
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": """
            date(
                date_trunc(
                    '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 12)
                )
            )-1
        """,
        },
        17: {
            "cadence": GABCadence.YEAR.name,
            "recon": {
                GABCadence.DAY.name,
                GABCadence.MONTH.name,
                GABCadence.QUARTER.name,
            },
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
            select distinct year_start as cadence_start_date,
            case
                when '${rec_cadence}' = 'DAY' then calendar_date
                when '${rec_cadence}' = 'MONTH' then month_end
                when '${rec_cadence}' = 'QUARTER' then quarter_end
            end as cadence_end_date
        """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        18: {
            "cadence": GABCadence.get_cadences(),
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": {"Y", "N"},
            "join_select": """
            select distinct
            case
                when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday'
                    then weekstart_mon
                when  '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday'
                    then weekstart_sun
                else
                    date(date_trunc('${cad}',calendar_date))
            end as cadence_start_date,
            case
                when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday'
                    then weekend_mon
                when  '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday'
                    then weekend_sun
                when '${cad}' = 'DAY'
                    then date(date_trunc('${cad}',calendar_date))
                when '${cad}' = 'MONTH'
                    then date(
                        date_trunc(
                            'MONTH',
                            add_months(date(date_trunc('${cad}',calendar_date)), 1)
                        )
                    )-1
                when '${cad}' = 'QUARTER'
                    then date(
                        date_trunc(
                            'QUARTER',
                            add_months(date(date_trunc('${cad}',calendar_date)) , 3)
                        )
                    )-1
                when '${cad}' = 'YEAR'
                    then date(
                        date_trunc(
                            'YEAR',
                            add_months(date(date_trunc('${cad}',calendar_date)), 12)
                        )
                    )-1
            end as cadence_end_date
        """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
    }
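The dictionary above keys each combination by number. A minimal sketch of how one such entry could be matched and its `${cad}`/`${date_column}` placeholders filled in; the `resolve_project_end` helper and its matching rule are hypothetical, not the engine's actual lookup logic:

```python
# Subset of the combined configuration, mirroring combination 1 above:
# cadence DAY, any reconciliation window, week start and snapshot flag.
COMBINED = {
    1: {
        "cadence": "DAY",
        "recon": {"DAY", "WEEK", "MONTH", "QUARTER", "YEAR"},
        "week_start": {"S", "M"},
        "snap_flag": {"Y", "N"},
        "project_end": "date(date_trunc('${cad}',${date_column}))",
    },
}


def _matches(value: str, allowed) -> bool:
    # Entries store either a single accepted value (str) or a set of values.
    return value == allowed if isinstance(allowed, str) else value in allowed


def resolve_project_end(cadence, recon, week_start, snap_flag, date_column):
    """Return the substituted project_end for the first matching combination."""
    for cfg in COMBINED.values():
        if all(
            _matches(value, cfg[key])
            for key, value in [
                ("cadence", cadence),
                ("recon", recon),
                ("week_start", week_start),
                ("snap_flag", snap_flag),
            ]
        ):
            return (
                cfg["project_end"]
                .replace("${cad}", cadence)
                .replace("${date_column}", date_column)
            )
    return None
```

For example, `resolve_project_end("DAY", "WEEK", "M", "Y", "order_date")` yields `date(date_trunc('DAY',order_date))`.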

GABDefaults

Bases: Enum

Defaults used on the GAB process.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class GABDefaults(Enum):
    """Defaults used on the GAB process."""

    DATE_FORMAT = "%Y-%m-%d"
    DIMENSIONS_DEFAULT_COLUMNS = ["from_date", "to_date"]
    DEFAULT_DIMENSION_CALENDAR_TABLE = "dim_calendar"
    DEFAULT_LOOKUP_QUERY_BUILDER_TABLE = "lkp_query_builder"

GABKeys

Constants used to update pre-configured gab dict keys.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class GABKeys:
    """Constants used to update pre-configured gab dict key."""

    JOIN_SELECT = "join_select"
    PROJECT_START = "project_start"
    PROJECT_END = "project_end"

GABReplaceableKeys

Constants used to replace pre-configured gab dict values.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class GABReplaceableKeys:
    """Constants used to replace pre-configured gab dict values."""

    CADENCE = "${cad}"
    DATE_COLUMN = "${date_column}"
    CONFIG_WEEK_START = "${config_week_start}"
    RECONCILIATION_CADENCE = "${rec_cadence}"
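These placeholders use the same `${...}` syntax as Python's `string.Template`, so a substitution can be sketched as follows (assuming plain-string templates; the engine may perform the replacement differently):

```python
from string import Template

# A project_start/project_end fragment as it appears in the configuration
# above, containing the replaceable keys "${cad}" and "${date_column}".
template = Template("date(date_trunc('${cad}',${date_column}))")

# Substitute a cadence and a (hypothetical) date column name.
sql = template.substitute(cad="MONTH", date_column="order_date")
# sql == "date(date_trunc('MONTH',order_date))"
```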

GABSpec dataclass

Bases: object

Gab Specification.

  • query_label_filter: query use-case label to execute.
  • queue_filter: queue to execute the job.
  • cadence_filter: selected cadences to build the asset.
  • target_database: target database to write.
  • current_date: current date.
  • start_date: period start date.
  • end_date: period end date.
  • rerun_flag: rerun flag.
  • target_table: target table to write.
  • source_database: source database.
  • gab_base_path: base path to read the use cases.
  • lookup_table: gab configuration table.
  • calendar_table: gab calendar table.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class GABSpec(object):
    """Gab Specification.

    - query_label_filter: query use-case label to execute.
    - queue_filter: queue to execute the job.
    - cadence_filter: selected cadences to build the asset.
    - target_database: target database to write.
    - current_date: current date.
    - start_date: period start date.
    - end_date: period end date.
    - rerun_flag: rerun flag.
    - target_table: target table to write.
    - source_database: source database.
    - gab_base_path: base path to read the use cases.
    - lookup_table: gab configuration table.
    - calendar_table: gab calendar table.
    """

    query_label_filter: list[str]
    queue_filter: list[str]
    cadence_filter: list[str]
    target_database: str
    current_date: datetime
    start_date: datetime
    end_date: datetime
    rerun_flag: str
    target_table: str
    source_database: str
    gab_base_path: str
    lookup_table: str
    calendar_table: str

    @classmethod
    def create_from_acon(cls, acon: dict):  # type: ignore
        """Create GabSpec from acon.

        Args:
            acon: gab ACON.
        """
        lookup_table = f"{acon['source_database']}." + (
            acon.get(
                "lookup_table", GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE.value
            )
        )

        calendar_table = f"{acon['source_database']}." + (
            acon.get(
                "calendar_table", GABDefaults.DEFAULT_DIMENSION_CALENDAR_TABLE.value
            )
        )

        def format_date(date_to_format: datetime | str) -> datetime:
            if isinstance(date_to_format, str):
                return datetime.strptime(date_to_format, GABDefaults.DATE_FORMAT.value)
            else:
                return date_to_format

        return cls(
            query_label_filter=acon["query_label_filter"],
            queue_filter=acon["queue_filter"],
            cadence_filter=acon["cadence_filter"],
            target_database=acon["target_database"],
            current_date=datetime.now(),
            start_date=format_date(acon["start_date"]),
            end_date=format_date(acon["end_date"]),
            rerun_flag=acon["rerun_flag"],
            target_table=acon["target_table"],
            source_database=acon["source_database"],
            gab_base_path=acon["gab_base_path"],
            lookup_table=lookup_table,
            calendar_table=calendar_table,
        )

create_from_acon(acon) classmethod

Create GabSpec from acon.

Parameters:

Name Type Description Default
acon dict

gab ACON.

required
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def create_from_acon(cls, acon: dict):  # type: ignore
    """Create GabSpec from acon.

    Args:
        acon: gab ACON.
    """
    lookup_table = f"{acon['source_database']}." + (
        acon.get(
            "lookup_table", GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE.value
        )
    )

    calendar_table = f"{acon['source_database']}." + (
        acon.get(
            "calendar_table", GABDefaults.DEFAULT_DIMENSION_CALENDAR_TABLE.value
        )
    )

    def format_date(date_to_format: datetime | str) -> datetime:
        if isinstance(date_to_format, str):
            return datetime.strptime(date_to_format, GABDefaults.DATE_FORMAT.value)
        else:
            return date_to_format

    return cls(
        query_label_filter=acon["query_label_filter"],
        queue_filter=acon["queue_filter"],
        cadence_filter=acon["cadence_filter"],
        target_database=acon["target_database"],
        current_date=datetime.now(),
        start_date=format_date(acon["start_date"]),
        end_date=format_date(acon["end_date"]),
        rerun_flag=acon["rerun_flag"],
        target_table=acon["target_table"],
        source_database=acon["source_database"],
        gab_base_path=acon["gab_base_path"],
        lookup_table=lookup_table,
        calendar_table=calendar_table,
    )
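A minimal sketch of what `create_from_acon` derives from an ACON dict, re-implementing just the table-name fallback and date parsing shown above; the ACON values are made up for illustration:

```python
from datetime import datetime

# Hypothetical ACON fragment with only the keys needed for this sketch.
acon = {"source_database": "analytics", "start_date": "2024-01-01"}

# Falls back to GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE
# ("lkp_query_builder") when "lookup_table" is absent from the ACON.
lookup_table = f"{acon['source_database']}." + acon.get(
    "lookup_table", "lkp_query_builder"
)

# format_date parses strings with GABDefaults.DATE_FORMAT ("%Y-%m-%d")
# and passes datetime objects through unchanged.
start_date = datetime.strptime(acon["start_date"], "%Y-%m-%d")
```

Here `lookup_table` becomes `analytics.lkp_query_builder`, and `start_date` a `datetime` for 2024-01-01.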

GABStartOfWeek

Bases: Enum

Representation of start of week values on GAB.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class GABStartOfWeek(Enum):
    """Representation of start of week values on GAB."""

    SUNDAY = "S"
    MONDAY = "M"

    @classmethod
    def get_start_of_week(cls) -> dict:
        """Get the start of week enum as a dict.

        Returns:
            dict containing all enum entries as `{name:value}`.
        """
        return {
            start_of_week.name: start_of_week.value for start_of_week in GABStartOfWeek
        }

    @classmethod
    def get_values(cls) -> set[str]:
        """Get the start of week enum values as set.

        Returns:
            set containing all possible values `{value}`.
        """
        return {start_of_week.value for start_of_week in GABStartOfWeek}

get_start_of_week() classmethod

Get the start of week enum as a dict.

Returns:

Type Description
dict

dict containing all enum entries as {name:value}.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def get_start_of_week(cls) -> dict:
    """Get the start of week enum as a dict.

    Returns:
        dict containing all enum entries as `{name:value}`.
    """
    return {
        start_of_week.name: start_of_week.value for start_of_week in GABStartOfWeek
    }

get_values() classmethod

Get the start of week enum values as set.

Returns:

Type Description
set[str]

set containing all possible values {value}.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def get_values(cls) -> set[str]:
    """Get the start of week enum values as set.

    Returns:
        set containing all possible values `{value}`.
    """
    return {start_of_week.value for start_of_week in GABStartOfWeek}
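Re-creating the enum from the definition above to show what the two helpers return:

```python
from enum import Enum


class GABStartOfWeek(Enum):
    """Re-created from the definition above, for illustration only."""

    SUNDAY = "S"
    MONDAY = "M"


# Equivalent to GABStartOfWeek.get_start_of_week(): {name: value}.
mapping = {sow.name: sow.value for sow in GABStartOfWeek}

# Equivalent to GABStartOfWeek.get_values(): the set of values.
values = {sow.value for sow in GABStartOfWeek}
```

`mapping` is `{"SUNDAY": "S", "MONDAY": "M"}` and `values` is `{"S", "M"}`.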

HeartbeatConfigSpec dataclass

Bases: object

Heartbeat Configurations and control table specifications.

This provides the way in which the Heartbeat can pass environment- and quantum-specific config information to the sensor ACON.

  • sensor_source: specifies the source system of the sensor, e.g. sap_b4, sap_bw, delta_table, kafka, lmu_delta_table, trigger_file. It is also part of the Heartbeat control table, therefore it is useful for filtering data from the Heartbeat control table based on the template source system.
  • data_format: format of the input source, e.g. jdbc, delta, kafka, cloudfiles.
  • heartbeat_sensor_db_table: heartbeat control table along with database, from config.
  • lakehouse_engine_sensor_db_table: control table along with database (from config).
  • options: dict with other relevant options for reading data from the specified input data_format. This can vary for each source system. E.g., for SAP systems, DRIVER, URL, USERNAME and PASSWORD are required, all read from the quantum config file.
  • jdbc_db_table: schema and table name of JDBC sources.
  • token: token to access the Databricks Job API (read from config).
  • domain: workspace domain url for quantum (read from config).
  • base_checkpoint_location: checkpoint location for streaming sources (from config).
  • kafka_configs: configs required for kafka, read from config as JSON. The config hierarchy is [sensor_kafka --> <dp_name/prefix> --> main kafka options].
  • kafka_secret_scope: secret scope for kafka (read from config).
  • base_trigger_file_location: location where all the trigger files are created (read from config).
  • schema_dict: dict representation of the schema of the trigger file (e.g. Spark struct type schema).
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class HeartbeatConfigSpec(object):
    """Heartbeat Configurations and control table specifications.

    This provides the way in which the Heartbeat can pass environment and
    specific quantum related config information to sensor acon.

    - sensor_source: specifies the source system of the sensor, e.g.
        sap_b4, sap_bw, delta_table, kafka, lmu_delta_table, trigger_file.
        It is also part of the Heartbeat control table, therefore it is useful for
        filtering data from the Heartbeat control table based on the template
        source system.
    - data_format: format of the input source, e.g. jdbc, delta, kafka, cloudfiles.
    - heartbeat_sensor_db_table: heartbeat control table along
        with database, from config.
    - lakehouse_engine_sensor_db_table: control table along with database (config).
    - options: dict with other relevant options for reading data from the specified
        input data_format. This can vary for each source system.
        E.g., for SAP systems, DRIVER, URL, USERNAME and PASSWORD are required,
        all read from the quantum config file.
    - jdbc_db_table: schema and table name of JDBC sources.
    - token: token to access Databricks Job API(read from config).
    - domain: workspace domain url for quantum(read from config).
    - base_checkpoint_location: checkpoint location for streaming sources(from config).
    - kafka_configs: configs required for kafka, read from config as JSON. The
        config hierarchy is [sensor_kafka --> <dp_name/prefix> --> main kafka options].
    - kafka_secret_scope: secret scope for kafka (read from config).
    - base_trigger_file_location: location where all the trigger files are being
        created (read from config).
    - schema_dict: dict representation of schema of the trigger file (e.g. Spark struct
        type schema).
    """

    sensor_source: str
    data_format: str
    heartbeat_sensor_db_table: str
    lakehouse_engine_sensor_db_table: str
    token: str
    domain: str
    options: Optional[dict]
    jdbc_db_table: Optional[str]
    base_checkpoint_location: Optional[str]
    kafka_configs: Optional[dict]
    kafka_secret_scope: Optional[str]
    base_trigger_file_location: Optional[str]
    schema_dict: Optional[dict]

    @classmethod
    def create_from_acon(cls, acon: dict):  # type: ignore
        """Create HeartbeatConfigSpec from acon.

        Args:
            acon: Heartbeat ACON.
        """
        return cls(
            sensor_source=acon["sensor_source"],
            data_format=acon["data_format"],
            heartbeat_sensor_db_table=acon["heartbeat_sensor_db_table"],
            lakehouse_engine_sensor_db_table=acon["lakehouse_engine_sensor_db_table"],
            token=acon["token"],
            domain=acon["domain"],
            options=acon.get("options"),
            jdbc_db_table=acon.get("jdbc_db_table"),
            base_checkpoint_location=acon.get("base_checkpoint_location"),
            kafka_configs=acon.get("kafka_configs"),
            kafka_secret_scope=acon.get("kafka_secret_scope"),
            base_trigger_file_location=acon.get("base_trigger_file_location"),
            schema_dict=acon.get("schema_dict"),
        )

create_from_acon(acon) classmethod

Create HeartbeatConfigSpec from acon.

Parameters:

Name Type Description Default
acon dict

Heartbeat ACON.

required
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def create_from_acon(cls, acon: dict):  # type: ignore
    """Create HeartbeatConfigSpec from acon.

    Args:
        acon: Heartbeat ACON.
    """
    return cls(
        sensor_source=acon["sensor_source"],
        data_format=acon["data_format"],
        heartbeat_sensor_db_table=acon["heartbeat_sensor_db_table"],
        lakehouse_engine_sensor_db_table=acon["lakehouse_engine_sensor_db_table"],
        token=acon["token"],
        domain=acon["domain"],
        options=acon.get("options"),
        jdbc_db_table=acon.get("jdbc_db_table"),
        base_checkpoint_location=acon.get("base_checkpoint_location"),
        kafka_configs=acon.get("kafka_configs"),
        kafka_secret_scope=acon.get("kafka_secret_scope"),
        base_trigger_file_location=acon.get("base_trigger_file_location"),
        schema_dict=acon.get("schema_dict"),
    )
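In `create_from_acon`, required keys are read with `acon[...]` and raise `KeyError` when missing, while optional ones use `acon.get(...)` and default to `None`. A small sketch with a made-up ACON:

```python
# Hypothetical minimal Heartbeat ACON: the six required keys plus one optional.
acon = {
    "sensor_source": "delta_table",
    "data_format": "delta",
    "heartbeat_sensor_db_table": "ctl.heartbeat_sensor",
    "lakehouse_engine_sensor_db_table": "ctl.sensors",
    "token": "<redacted>",
    "domain": "https://example.cloud.databricks.com",
    "options": {"readChangeFeed": "true"},
}

options = acon.get("options")          # provided -> the dict above
jdbc_db_table = acon.get("jdbc_db_table")  # absent -> None
```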

HeartbeatSensorSource

Bases: Enum

Source systems for the Heartbeat sensor.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class HeartbeatSensorSource(Enum):
    """Formats of algorithm input."""

    SAP_BW = "sap_bw"
    SAP_B4 = "sap_b4"
    DELTA_TABLE = "delta_table"
    KAFKA = "kafka"
    LMU_DELTA_TABLE = "lmu_delta_table"
    TRIGGER_FILE = "trigger_file"

    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Returns:
            A list with all enum values.
        """
        return (c.value for c in cls)

values() classmethod

Generates all enum values.

Returns:

Type Description

A generator with all enum values.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def values(cls):  # type: ignore
    """Generates a list containing all enum values.

    Returns:
        A list with all enum values.
    """
    return (c.value for c in cls)

HeartbeatStatus

Bases: Enum

Status for a sensor.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class HeartbeatStatus(Enum):
    """Status for a sensor."""

    NEW_EVENT_AVAILABLE = "NEW_EVENT_AVAILABLE"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"

InputFormat

Bases: Enum

Formats of algorithm input.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class InputFormat(Enum):
    """Formats of algorithm input."""

    JDBC = "jdbc"
    AVRO = "avro"
    JSON = "json"
    CSV = "csv"
    PARQUET = "parquet"
    DELTAFILES = "delta"
    CLOUDFILES = "cloudfiles"
    KAFKA = "kafka"
    SQL = "sql"
    SAP_BW = "sap_bw"
    SAP_B4 = "sap_b4"
    DATAFRAME = "dataframe"
    SFTP = "sftp"
    SHAREPOINT = "sharepoint"

    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Returns:
            A list with all enum values.
        """
        return (c.value for c in cls)

    @classmethod
    def exists(cls, input_format: str) -> bool:
        """Checks if the input format exists in the enum values.

        Args:
            input_format: format to check if exists.

        Returns:
            If the input format exists in our enum.
        """
        return input_format in cls.values()

exists(input_format) classmethod

Checks if the input format exists in the enum values.

Parameters:

Name Type Description Default
input_format str

format to check if exists.

required

Returns:

Type Description
bool

If the input format exists in our enum.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def exists(cls, input_format: str) -> bool:
    """Checks if the input format exists in the enum values.

    Args:
        input_format: format to check if exists.

    Returns:
        If the input format exists in our enum.
    """
    return input_format in cls.values()

values() classmethod

Generates all enum values.

Returns:

Type Description

A generator with all enum values.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def values(cls):  # type: ignore
    """Generates a list containing all enum values.

    Returns:
        A list with all enum values.
    """
    return (c.value for c in cls)
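Note that `exists` checks enum values, not member names: Delta files, for instance, are registered under the value `delta`, not the name `deltafiles`. An abbreviated re-creation of the pattern:

```python
from enum import Enum


class InputFormat(Enum):
    """Abbreviated re-creation of the enum above, for illustration only."""

    JDBC = "jdbc"
    DELTAFILES = "delta"
    KAFKA = "kafka"

    @classmethod
    def exists(cls, input_format: str) -> bool:
        # Membership test over the generator of values, as in the source above.
        return input_format in (c.value for c in cls)
```

So `InputFormat.exists("delta")` is `True`, while `InputFormat.exists("deltafiles")` is `False`.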

InputSpec dataclass

Bases: object

Specification of an algorithm input.

This closely mirrors the way the execution environment connects to the sources (e.g., Spark sources).

  • spec_id: id of the input specification.
  • read_type: ReadType type of read operation.
  • data_format: format of the input.
  • sftp_files_format: format of the files (csv, fwf, json, xml...) in a sftp directory.
  • df_name: dataframe name.
  • db_table: table name in the form of <db>.<table>.
  • location: uri that identifies from where to read data in the specified format.
  • sharepoint_opts: Options to apply when reading from Sharepoint.
  • enforce_schema_from_table: if we want to enforce the table schema or not, by providing a table name in the form of <db>.<table>.
  • query: sql query to execute and return the dataframe. Use it if you do not want to read from a file system nor from a table, but rather from a sql query.
  • schema: dict representation of a schema of the input (e.g., Spark struct type schema).
  • schema_path: path to a file with a representation of a schema of the input (e.g., Spark struct type schema).
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
  • with_filepath: if we want to include the path of the file that is being read. Only works with the file reader (batch and streaming modes are supported).
  • options: dict with other relevant options according to the execution environment (e.g., spark) possible sources.
  • calculate_upper_bound: whether to calculate the upper bound to extract from SAP BW or not.
  • calc_upper_bound_schema: specific schema for the calculated upper_bound.
  • generate_predicates: whether to generate predicates to extract from SAP BW or not.
  • predicates_add_null: if we want to include is null on partition by predicates.
  • temp_view: optional name of a view to point to the input dataframe to be used to create or replace a temp view on top of the dataframe.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class InputSpec(object):
    """Specification of an algorithm input.

    This closely mirrors the way the execution environment connects to the sources
    (e.g., Spark sources).

    - spec_id: id of the input specification.
    - read_type: ReadType type of read operation.
    - data_format: format of the input.
    - sftp_files_format: format of the files (csv, fwf, json, xml...) in a sftp
        directory.
    - df_name: dataframe name.
    - db_table: table name in the form of `<db>.<table>`.
    - location: uri that identifies from where to read data in the
      specified format.
    - sharepoint_opts: Options to apply when reading from Sharepoint.
    - enforce_schema_from_table: if we want to enforce the table schema or not,
    by providing a table name in the form of `<db>.<table>`.
    - query: sql query to execute and return the dataframe. Use it if you do not want
        to read from a file system nor from a table, but rather from a sql query.
    - schema: dict representation of a schema of the input (e.g., Spark struct
    type schema).
    - schema_path: path to a file with a representation of a schema of the
    input (e.g., Spark struct type schema).
    - disable_dbfs_retry: optional flag to disable file storage dbfs.
    - with_filepath: if we want to include the path of the file that is being
    read. Only
        works with the file reader (batch and streaming modes are supported).
    - options: dict with other relevant options according to the execution
        environment (e.g., spark) possible sources.
    - calculate_upper_bound: whether to calculate an upper bound when extracting
        from SAP BW.
    - calc_upper_bound_schema: specific schema for the calculated upper_bound.
    - generate_predicates: whether to generate predicates when extracting from SAP BW.
    - predicates_add_null: whether to include is null in the partition by predicates.
    - temp_view: optional name of a view to point to the input dataframe to be used
        to create or replace a temp view on top of the dataframe.
    """

    spec_id: str
    read_type: str
    data_format: Optional[str] = None
    sftp_files_format: Optional[str] = None
    df_name: Optional[DataFrame] = None
    db_table: Optional[str] = None
    location: Optional[str] = None
    sharepoint_opts: Optional[SharepointOptions] = None
    query: Optional[str] = None
    enforce_schema_from_table: Optional[str] = None
    schema: Optional[dict] = None
    schema_path: Optional[str] = None
    disable_dbfs_retry: bool = False
    with_filepath: bool = False
    options: Optional[dict] = None
    jdbc_args: Optional[dict] = None
    calculate_upper_bound: bool = False
    calc_upper_bound_schema: Optional[str] = None
    generate_predicates: bool = False
    predicates_add_null: bool = True
    temp_view: Optional[str] = None

    def __post_init__(self) -> None:
        """Normalize Sharepoint options if passed as a raw dictionary.

        Args:
            self: Instance of the class where `sharepoint_opts` attribute
                may be either a dictionary or a SharepointOptions object.
        """
        if isinstance(self.sharepoint_opts, dict):
            self.sharepoint_opts = SharepointOptions(**self.sharepoint_opts)

__post_init__()

Normalize Sharepoint options if passed as a raw dictionary.

Parameters:

  • self (required): instance of the class where sharepoint_opts attribute may be either a dictionary or a SharepointOptions object.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
def __post_init__(self) -> None:
    """Normalize Sharepoint options if passed as a raw dictionary.

    Args:
        self: Instance of the class where `sharepoint_opts` attribute
            may be either a dictionary or a SharepointOptions object.
    """
    if isinstance(self.sharepoint_opts, dict):
        self.sharepoint_opts = SharepointOptions(**self.sharepoint_opts)
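As a quick illustration, an input specification is typically supplied as a plain dictionary inside an ACON; a minimal sketch using the InputSpec fields above (all ids, paths, and options are hypothetical):

```python
# Hypothetical input_spec fragment of an ACON; field names come from InputSpec,
# all values (ids, bucket, options) are illustrative.
input_spec = {
    "spec_id": "sales_bronze",            # spec_id of the input specification
    "read_type": "batch",                 # ReadType.BATCH
    "data_format": "csv",                 # format of the input
    "location": "s3://my-bucket/sales/",  # uri to read from
    "options": {"header": "true", "delimiter": ";"},
}
```

If a sharepoint_opts entry were included as a raw dictionary, `__post_init__` would normalize it into a SharepointOptions instance.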

MergeOptions dataclass

Bases: object

Options for a merge operation.

  • merge_predicate: predicate to apply to the merge operation so that we can check if a new record corresponds to a record already included in the historical data.
  • insert_only: indicates if the merge should only insert data (e.g., deduplicate scenarios).
  • delete_predicate: predicate to apply to the delete operation.
  • update_predicate: predicate to apply to the update operation.
  • insert_predicate: predicate to apply to the insert operation.
  • update_column_set: rules to apply to the update operation which allows to set the value for each column to be updated. (e.g. {"data": "new.data", "count": "current.count + 1"} )
  • insert_column_set: rules to apply to the insert operation which allows to set the value for each column to be inserted. (e.g. {"date": "updates.date", "count": "1"} )
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class MergeOptions(object):
    """Options for a merge operation.

    - merge_predicate: predicate to apply to the merge operation so that we can
        check if a new record corresponds to a record already included in the
        historical data.
    - insert_only: indicates if the merge should only insert data (e.g., deduplicate
        scenarios).
    - delete_predicate: predicate to apply to the delete operation.
    - update_predicate: predicate to apply to the update operation.
    - insert_predicate: predicate to apply to the insert operation.
    - update_column_set: rules to apply to the update operation which allows to
        set the value for each column to be updated.
        (e.g. {"data": "new.data", "count": "current.count + 1"} )
    - insert_column_set: rules to apply to the insert operation which allows to
        set the value for each column to be inserted.
        (e.g. {"date": "updates.date", "count": "1"} )
    """

    merge_predicate: str
    insert_only: bool = False
    delete_predicate: Optional[str] = None
    update_predicate: Optional[str] = None
    insert_predicate: Optional[str] = None
    update_column_set: Optional[dict] = None
    insert_column_set: Optional[dict] = None
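The column-set examples from the docstring can be put together into a full merge options dictionary; a sketch with hypothetical predicates:

```python
# Hypothetical merge options mirroring the MergeOptions fields above.
merge_opts = {
    "merge_predicate": "current.id = new.id",  # match new records to history
    "insert_only": False,
    "update_column_set": {"data": "new.data", "count": "current.count + 1"},
    "insert_column_set": {"date": "updates.date", "count": "1"},
}
```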

NotificationRuntimeParameters

Bases: Enum

Parameters to be replaced in runtime.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class NotificationRuntimeParameters(Enum):
    """Parameters to be replaced in runtime."""

    DATABRICKS_JOB_NAME = "databricks_job_name"
    DATABRICKS_WORKSPACE_ID = "databricks_workspace_id"
    JOB_EXCEPTION = "exception"

NotifierType

Bases: Enum

Type of notifier available.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class NotifierType(Enum):
    """Type of notifier available."""

    EMAIL = "email"

OutputFormat

Bases: Enum

Formats of algorithm output.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class OutputFormat(Enum):
    """Formats of algorithm output."""

    JDBC = "jdbc"
    AVRO = "avro"
    JSON = "json"
    CSV = "csv"
    PARQUET = "parquet"
    DELTAFILES = "delta"
    KAFKA = "kafka"
    CONSOLE = "console"
    NOOP = "noop"
    DATAFRAME = "dataframe"
    REST_API = "rest_api"
    FILE = "file"  # Internal use only
    TABLE = "table"  # Internal use only
    SHAREPOINT = "sharepoint"

    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Returns:
            A list with all enum values.
        """
        return (c.value for c in cls)

    @classmethod
    def exists(cls, output_format: str) -> bool:
        """Checks if the output format exists in the enum values.

        Args:
            output_format: format to check if exists.

        Returns:
            If the output format exists in our enum.
        """
        return output_format in cls.values()

exists(output_format) classmethod

Checks if the output format exists in the enum values.

Parameters:

  • output_format (str, required): format to check if exists.

Returns:

  • bool: if the output format exists in our enum.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def exists(cls, output_format: str) -> bool:
    """Checks if the output format exists in the enum values.

    Args:
        output_format: format to check if exists.

    Returns:
        If the output format exists in our enum.
    """
    return output_format in cls.values()

values() classmethod

Generates a list containing all enum values.

Returns:

  • A list with all enum values.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def values(cls):  # type: ignore
    """Generates a list containing all enum values.

    Returns:
        A list with all enum values.
    """
    return (c.value for c in cls)
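The values()/exists() helpers can be exercised in isolation; below is a trimmed copy of the enum above (only two members kept, for illustration):

```python
from enum import Enum

class OutputFormat(Enum):
    """Trimmed copy of the OutputFormat enum, for illustration only."""

    DELTAFILES = "delta"
    JDBC = "jdbc"

    @classmethod
    def values(cls):  # type: ignore
        """Generates the enum values."""
        return (c.value for c in cls)

    @classmethod
    def exists(cls, output_format: str) -> bool:
        """Checks if the output format exists in the enum values."""
        return output_format in cls.values()

print(OutputFormat.exists("delta"))  # True
print(OutputFormat.exists("orc"))    # False
```

Note that values() returns a generator expression, so each call yields a fresh, single-use iterator rather than a materialized list.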

OutputSpec dataclass

Bases: object

Specification of an algorithm output.

This is very aligned with the way the execution environment connects to the output systems (e.g., spark outputs).

  • spec_id: id of the output specification.
  • input_id: id of the corresponding input specification.
  • write_type: type of write operation.
  • data_format: format of the output. Defaults to DELTA.
  • db_table: table name in the form of <db>.<table>.
  • location: uri that identifies where to write data in the specified format.
  • sharepoint_opts: options to apply when writing to Sharepoint.
  • partitions: list of partition input_col names.
  • merge_opts: options to apply to the merge operation.
  • streaming_micro_batch_transformers: transformers to invoke for each streaming micro batch, before writing (i.e., in Spark's foreachBatch structured streaming function). Note: the lakehouse engine manages this for you, so we don't advise you to specify streaming transformations through this parameter. Supply them as regular transformers in the transform_specs sections of an ACON.
  • streaming_once: if the streaming query is to be executed just once, or not, generating just one micro batch.
  • streaming_processing_time: if streaming query is to be kept alive, this indicates the processing time of each micro batch.
  • streaming_available_now: if set to True, set a trigger that processes all available data in multiple batches then terminates the query. When using streaming, this is the default trigger that the lakehouse-engine will use, unless you configure a different one.
  • streaming_continuous: set a trigger that runs a continuous query with a given checkpoint interval.
  • streaming_await_termination: whether to wait (True) for the termination of the streaming query (e.g. timeout or exception) or not (False). Default: True.
  • streaming_await_termination_timeout: a timeout to set to the streaming_await_termination. Default: None.
  • with_batch_id: whether to include the streaming batch id in the final data, or not. It only takes effect in streaming mode.
  • options: dict with other relevant options according to the execution environment (e.g., spark) possible outputs. E.g.,: JDBC options, checkpoint location for streaming, etc.
  • streaming_micro_batch_dq_processors: similar to streaming_micro_batch_transformers but for the DQ functions to be executed. Used internally by the lakehouse engine, so you don't have to supply DQ functions through this parameter. Use the dq_specs of the acon instead.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class OutputSpec(object):
    """Specification of an algorithm output.

    This is very aligned with the way the execution environment connects to the output
    systems (e.g., spark outputs).

    - spec_id: id of the output specification.
    - input_id: id of the corresponding input specification.
    - write_type: type of write operation.
    - data_format: format of the output. Defaults to DELTA.
    - db_table: table name in the form of `<db>.<table>`.
    - location: uri that identifies where to write data in the specified format.
    - sharepoint_opts: options to apply on writing on Sharepoint operations.
    - partitions: list of partition input_col names.
    - merge_opts: options to apply to the merge operation.
    - streaming_micro_batch_transformers: transformers to invoke for each streaming
        micro batch, before writing (i.e., in Spark's foreachBatch structured
        streaming function). Note: the lakehouse engine manages this for you, so
        we don't advise you to specify streaming transformations through this
        parameter. Supply them as regular transformers in the transform_specs
        sections of an ACON.
    - streaming_once: if the streaming query is to be executed just once, or not,
        generating just one micro batch.
    - streaming_processing_time: if streaming query is to be kept alive, this indicates
        the processing time of each micro batch.
    - streaming_available_now: if set to True, set a trigger that processes all
        available data in multiple batches then terminates the query.
        When using streaming, this is the default trigger that the lakehouse-engine will
        use, unless you configure a different one.
    - streaming_continuous: set a trigger that runs a continuous query with a given
        checkpoint interval.
    - streaming_await_termination: whether to wait (True) for the termination of the
        streaming query (e.g. timeout or exception) or not (False). Default: True.
    - streaming_await_termination_timeout: a timeout to set to the
        streaming_await_termination. Default: None.
    - with_batch_id: whether to include the streaming batch id in the final data,
        or not. It only takes effect in streaming mode.
    - options: dict with other relevant options according to the execution environment
        (e.g., spark) possible outputs.  E.g.,: JDBC options, checkpoint location for
        streaming, etc.
    - streaming_micro_batch_dq_processors: similar to streaming_micro_batch_transformers
        but for the DQ functions to be executed. Used internally by the lakehouse
        engine, so you don't have to supply DQ functions through this parameter. Use the
        dq_specs of the acon instead.
    """

    spec_id: str
    input_id: str
    write_type: str
    data_format: str = OutputFormat.DELTAFILES.value
    db_table: Optional[str] = None
    location: Optional[str] = None
    sharepoint_opts: Optional[SharepointOptions] = None
    merge_opts: Optional[MergeOptions] = None
    partitions: Optional[List[str]] = None
    streaming_micro_batch_transformers: Optional[List[TransformerSpec]] = None
    streaming_once: Optional[bool] = None
    streaming_processing_time: Optional[str] = None
    streaming_available_now: bool = True
    streaming_continuous: Optional[str] = None
    streaming_await_termination: bool = True
    streaming_await_termination_timeout: Optional[int] = None
    with_batch_id: bool = False
    options: Optional[dict] = None
    streaming_micro_batch_dq_processors: Optional[List[DQSpec]] = None
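A minimal output specification, again as an ACON-style dictionary (all names are hypothetical; data_format defaults to "delta" per OutputFormat.DELTAFILES):

```python
# Hypothetical output_spec fragment of an ACON, using OutputSpec field names.
output_spec = {
    "spec_id": "sales_silver",
    "input_id": "sales_bronze",   # must match the spec_id of an input spec
    "write_type": "append",
    "data_format": "delta",       # OutputFormat.DELTAFILES.value
    "db_table": "silver.sales",
    "options": {"checkpointLocation": "s3://my-bucket/checkpoints/sales"},
}
```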

ReadMode

Bases: Enum

Different modes that control how we handle compliance to the provided schema.

These read modes map to Spark's read modes at the moment.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class ReadMode(Enum):
    """Different modes that control how we handle compliance to the provided schema.

    These read modes map to Spark's read modes at the moment.
    """

    PERMISSIVE = "PERMISSIVE"
    FAILFAST = "FAILFAST"
    DROPMALFORMED = "DROPMALFORMED"

ReadType

Bases: Enum

Define the types of read operations.

  • BATCH - read the data in batch mode (e.g., Spark batch).
  • STREAMING - read the data in streaming mode (e.g., Spark streaming).
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class ReadType(Enum):
    """Define the types of read operations.

    - BATCH - read the data in batch mode (e.g., Spark batch).
    - STREAMING - read the data in streaming mode (e.g., Spark streaming).
    """

    BATCH = "batch"
    STREAMING = "streaming"

ReconciliatorSpec dataclass

Bases: object

Reconciliator Specification.

  • metrics: list of metrics in the form of: [{ metric: name of the column present in both truth and current datasets, aggregation: sum, avg, max, min, ..., type: percentage or absolute, yellow: value, red: value }].
  • recon_type: reconciliation type (percentage or absolute). Percentage calculates the difference between truth and current results as a percentage ((x - y)/x), and absolute calculates the raw difference (x - y).
  • truth_input_spec: input specification of the truth data.
  • current_input_spec: input specification of the current results data.
  • truth_preprocess_query: additional query on top of the truth input data to preprocess the truth data before it gets fueled into the reconciliation process. Important note: you need to assume that the data out of the truth_input_spec is referencable by a table called 'truth'.
  • truth_preprocess_query_args: optional dict having the functions/transformations to apply on top of the truth_preprocess_query and respective arguments. Note: cache is being applied on the Dataframe, by default. For turning the default behavior off, pass "truth_preprocess_query_args": [].
  • current_preprocess_query: additional query on top of the current results input data to preprocess the current results data before it gets fueled into the reconciliation process. Important note: you need to assume that the data out of the current_results_input_spec is referencable by a table called 'current'.
  • current_preprocess_query_args: optional dict having the functions/transformations to apply on top of the current_preprocess_query and respective arguments. Note: cache is being applied on the Dataframe, by default. For turning the default behavior off, pass "current_preprocess_query_args": [].
  • ignore_empty_df: optional boolean to skip the reconciliation process when both source and target dataframes are empty; in that case the reconciliation exits with a success status (passed).
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class ReconciliatorSpec(object):
    """Reconciliator Specification.

    - metrics: list of metrics in the form of:
        [{
            metric: name of the column present in both truth and current datasets,
            aggregation: sum, avg, max, min, ...,
            type: percentage or absolute,
            yellow: value,
            red: value
        }].
    - recon_type: reconciliation type (percentage or absolute). Percentage calculates
        the difference between truth and current results as a percentage ((x - y)/x), and
        absolute calculates the raw difference (x - y).
    - truth_input_spec: input specification of the truth data.
    - current_input_spec: input specification of the current results data
    - truth_preprocess_query: additional query on top of the truth input data to
        preprocess the truth data before it gets fueled into the reconciliation process.
        Important note: you need to assume that the data out of
        the truth_input_spec is referencable by a table called 'truth'.
    - truth_preprocess_query_args: optional dict having the functions/transformations to
        apply on top of the truth_preprocess_query and respective arguments. Note: cache
        is being applied on the Dataframe, by default. For turning the default behavior
        off, pass `"truth_preprocess_query_args": []`.
    - current_preprocess_query: additional query on top of the current results input
        data to preprocess the current results data before it gets fueled into the
        reconciliation process. Important note: you need to assume that the data out of
        the current_results_input_spec is referencable by a table called 'current'.
    - current_preprocess_query_args: optional dict having the
        functions/transformations to apply on top of the current_preprocess_query
        and respective arguments. Note: cache is being applied on the Dataframe,
        by default. For turning the default behavior off, pass
        `"current_preprocess_query_args": []`.
    - ignore_empty_df: optional boolean, to ignore the recon process if source & target
       dataframes are empty, recon will exit success code (passed)
    """

    metrics: List[dict]
    truth_input_spec: InputSpec
    current_input_spec: InputSpec
    truth_preprocess_query: Optional[str] = None
    truth_preprocess_query_args: Optional[List[dict]] = None
    current_preprocess_query: Optional[str] = None
    current_preprocess_query_args: Optional[List[dict]] = None
    ignore_empty_df: Optional[bool] = False
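A sketch of a single metrics entry following the structure documented above (the column name and thresholds are hypothetical):

```python
# Hypothetical reconciliation metric: warn at 5% deviation, fail at 10%.
metrics = [
    {
        "metric": "net_sales",   # column present in both truth and current
        "aggregation": "sum",
        "type": "percentage",
        "yellow": 0.05,
        "red": 0.10,
    }
]
```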

RestoreStatus

Bases: Enum

Statuses of an archive restore.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class RestoreStatus(Enum):
    """Archive types."""

    NOT_STARTED = "not_started"
    ONGOING = "ongoing"
    RESTORED = "restored"

RestoreType

Bases: Enum

Archive restore types.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class RestoreType(Enum):
    """Archive types."""

    BULK = "Bulk"
    STANDARD = "Standard"
    EXPEDITED = "Expedited"

    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Returns:
            A list with all enum values.
        """
        return (c.value for c in cls)

    @classmethod
    def exists(cls, restore_type: str) -> bool:
        """Checks if the restore type exists in the enum values.

        Args:
            restore_type: restore type to check if exists.

        Returns:
            If the restore type exists in our enum.
        """
        return restore_type in cls.values()

exists(restore_type) classmethod

Checks if the restore type exists in the enum values.

Parameters:

  • restore_type (str, required): restore type to check if exists.

Returns:

  • bool: if the restore type exists in our enum.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def exists(cls, restore_type: str) -> bool:
    """Checks if the restore type exists in the enum values.

    Args:
        restore_type: restore type to check if exists.

    Returns:
        If the restore type exists in our enum.
    """
    return restore_type in cls.values()

values() classmethod

Generates a list containing all enum values.

Returns:

  • A list with all enum values.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def values(cls):  # type: ignore
    """Generates a list containing all enum values.

    Returns:
        A list with all enum values.
    """
    return (c.value for c in cls)

SAPLogchain

Bases: Enum

Defaults used on consuming data from SAP Logchain.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class SAPLogchain(Enum):
    """Defaults used on consuming data from SAP Logchain."""

    DBTABLE = "SAPPHA.RSPCLOGCHAIN"
    GREEN_STATUS = "G"
    ENGINE_TABLE = "sensor_new_data"

SQLDefinitions

Bases: Enum

SQL definitions statements.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class SQLDefinitions(Enum):
    """SQL definitions statements."""

    compute_table_stats = "ANALYZE TABLE {} COMPUTE STATISTICS"
    drop_table_stmt = "DROP TABLE IF EXISTS"
    drop_view_stmt = "DROP VIEW IF EXISTS"
    truncate_stmt = "TRUNCATE TABLE"
    describe_stmt = "DESCRIBE TABLE"
    optimize_stmt = "OPTIMIZE"
    show_tbl_props_stmt = "SHOW TBLPROPERTIES"
    delete_where_stmt = "DELETE FROM {} WHERE {}"
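The parameterized entries (compute_table_stats and delete_where_stmt) are plain Python format strings; filling them in looks like this (the table name and predicate are hypothetical):

```python
# Templates copied from SQLDefinitions; the arguments are illustrative.
compute_table_stats = "ANALYZE TABLE {} COMPUTE STATISTICS"
delete_where_stmt = "DELETE FROM {} WHERE {}"

print(compute_table_stats.format("my_db.my_table"))
# ANALYZE TABLE my_db.my_table COMPUTE STATISTICS
print(delete_where_stmt.format("my_db.my_table", "year < 2020"))
# DELETE FROM my_db.my_table WHERE year < 2020
```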

SQLParser

Bases: Enum

Defaults to use for parsing.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class SQLParser(Enum):
    """Defaults to use for parsing."""

    DOUBLE_QUOTES = '"'
    SINGLE_QUOTES = "'"
    BACKSLASH = "\\"
    SINGLE_TRACE = "-"
    DOUBLE_TRACES = "--"
    SLASH = "/"
    OPENING_MULTIPLE_LINE_COMMENT = "/*"
    CLOSING_MULTIPLE_LINE_COMMENT = "*/"
    PARAGRAPH = "\n"
    STAR = "*"

    MULTIPLE_LINE_COMMENT = [
        OPENING_MULTIPLE_LINE_COMMENT,
        CLOSING_MULTIPLE_LINE_COMMENT,
    ]

SensorSpec dataclass

Bases: object

Sensor Specification.

  • sensor_id: sensor id.
  • assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.
  • control_db_table_name: db.table to store sensor metadata.
  • input_spec: input specification of the source to be checked for new data.
  • preprocess_query: SQL query to transform/filter the result from the upstream. Consider that we should refer to 'new_data' whenever we are referring to the input of the sensor. E.g.: "SELECT dummy_col FROM new_data WHERE ..."
  • checkpoint_location: optional location to store checkpoints to resume from. These checkpoints follow the same strategy as Spark checkpoints. For Spark readers that do not support checkpoints, use the preprocess_query parameter to form a SQL query to filter the result from the upstream accordingly.
  • fail_on_empty_result: if the sensor should throw an error if there is no new data in the upstream. Default: True.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class SensorSpec(object):
    """Sensor Specification.

    - sensor_id: sensor id.
    - assets: a list of assets that are considered as available to
        consume downstream after this sensor has status
        PROCESSED_NEW_DATA.
    - control_db_table_name: db.table to store sensor metadata.
    - input_spec: input specification of the source to be checked for new data.
    - preprocess_query: SQL query to transform/filter the result from the
        upstream. Consider that we should refer to 'new_data' whenever
        we are referring to the input of the sensor. E.g.:
            "SELECT dummy_col FROM new_data WHERE ..."
    - checkpoint_location: optional location to store checkpoints to resume
        from. These checkpoints follow the same strategy as Spark checkpoints.
        For Spark readers that do not support checkpoints, use the
        preprocess_query parameter to form a SQL query to filter the result
        from the upstream accordingly.
    - fail_on_empty_result: if the sensor should throw an error if there is no new data
    in the upstream. Default: True.
    """

    sensor_id: str
    assets: List[str]
    control_db_table_name: str
    input_spec: InputSpec
    preprocess_query: Optional[str]
    checkpoint_location: Optional[str]
    fail_on_empty_result: bool = True

    @classmethod
    def create_from_acon(cls, acon: dict):  # type: ignore
        """Create SensorSpec from acon.

        Args:
            acon: sensor ACON.
        """
        checkpoint_location = acon.get("base_checkpoint_location")
        if checkpoint_location:
            checkpoint_location = (
                f"{checkpoint_location.rstrip('/')}/lakehouse_engine/"
                f"sensors/{acon['sensor_id']}"
            )

        return cls(
            sensor_id=acon["sensor_id"],
            assets=acon["assets"],
            control_db_table_name=acon["control_db_table_name"],
            input_spec=InputSpec(**acon["input_spec"]),
            preprocess_query=acon.get("preprocess_query"),
            checkpoint_location=checkpoint_location,
            fail_on_empty_result=acon.get("fail_on_empty_result", True),
        )

create_from_acon(acon) classmethod

Create SensorSpec from acon.

Parameters:

  • acon (dict, required): sensor ACON.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@classmethod
def create_from_acon(cls, acon: dict):  # type: ignore
    """Create SensorSpec from acon.

    Args:
        acon: sensor ACON.
    """
    checkpoint_location = acon.get("base_checkpoint_location")
    if checkpoint_location:
        checkpoint_location = (
            f"{checkpoint_location.rstrip('/')}/lakehouse_engine/"
            f"sensors/{acon['sensor_id']}"
        )

    return cls(
        sensor_id=acon["sensor_id"],
        assets=acon["assets"],
        control_db_table_name=acon["control_db_table_name"],
        input_spec=InputSpec(**acon["input_spec"]),
        preprocess_query=acon.get("preprocess_query"),
        checkpoint_location=checkpoint_location,
        fail_on_empty_result=acon.get("fail_on_empty_result", True),
    )
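The checkpoint path derivation in create_from_acon can be reproduced standalone (the sensor_id and base_checkpoint_location below are hypothetical):

```python
# Reproduces the checkpoint_location derivation from create_from_acon above.
acon = {
    "sensor_id": "sensor_upstream_sales",
    "base_checkpoint_location": "s3://bucket/checkpoints/",
}
checkpoint_location = (
    f"{acon['base_checkpoint_location'].rstrip('/')}/lakehouse_engine/"
    f"sensors/{acon['sensor_id']}"
)
print(checkpoint_location)
# s3://bucket/checkpoints/lakehouse_engine/sensors/sensor_upstream_sales
```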

SensorStatus

Bases: Enum

Status for a sensor.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class SensorStatus(Enum):
    """Status for a sensor."""

    ACQUIRED_NEW_DATA = "ACQUIRED_NEW_DATA"
    PROCESSED_NEW_DATA = "PROCESSED_NEW_DATA"

SharepointFile dataclass

Represents a file from Sharepoint with metadata and optional content.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class SharepointFile:
    """Represents a file from Sharepoint with metadata and optional content."""

    file_name: str
    time_created: str
    time_modified: str
    content: Optional[bytes] = None
    _folder: Optional[str] = None
    skip_rename: bool = False
    _already_archived: bool = False

    @property
    def file_extension(self) -> str:
        """Returns the file extension of the stored file."""
        return Path(self.file_name).suffix

    @property
    def file_path(self) -> str:
        """Full Sharepoint path including folder and file name."""
        if not self._folder:
            raise AttributeError("file_path unavailable; _folder not set.")
        return f"{self._folder}/{self.file_name}"

    @property
    def is_csv(self) -> bool:
        """True if file is a CSV."""
        return self.file_extension.lower() == ".csv"

    @property
    def is_excel(self) -> bool:
        """True if file is an Excel file."""
        return self.file_extension.lower() == ".xlsx"

    @property
    def content_size(self) -> int:
        """Size of content in bytes."""
        return len(self.content) if self.content else 0

content_size property

Size of content in bytes.

file_extension property

Returns the file extension of the stored file.

file_path property

Full Sharepoint path including folder and file name.

is_csv property

True if file is a CSV.

is_excel property

True if file is an Excel file.
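The extension-based properties rely on pathlib suffix handling; the same logic can be checked in isolation (the file name is hypothetical):

```python
from pathlib import Path

# Mirrors the file_extension / is_csv / is_excel logic of SharepointFile.
file_name = "report_2024.xlsx"
suffix = Path(file_name).suffix

print(suffix)                     # .xlsx
print(suffix.lower() == ".csv")   # False (is_csv)
print(suffix.lower() == ".xlsx")  # True  (is_excel)
```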

SharepointOptions dataclass

Bases: object

Options for Sharepoint I/O (used by both reader and writer).

This dataclass is shared by the Sharepoint reader and writer. Some fields are required/used only in read mode, others only in write mode. Use validate_for_reader() / validate_for_writer() to enforce the correct subsets.

Common (reader & writer)
  • client_id (str): Azure AD application (client) ID.
  • tenant_id (str): Azure AD tenant (directory) ID.
  • site_name (str): Sharepoint site name.
  • drive_name (str): Document library/drive name.
  • secret (str): Client secret.
  • local_path (str): Local/volume path for staging (read/write temp).
  • api_version (str): Microsoft Graph API version (default: "v1.0").
  • conflict_behaviour (Optional[str]): e.g. 'replace', 'fail'.
  • allowed_extensions (Optional[Collection[str]]): Defaults to SHAREPOINT_SUPPORTED_EXTENSIONS {".csv", ".xlsx"}.

Reader-specific
  • folder_relative_path (Optional[str]): Folder (or full file path) to read from.
  • file_name (Optional[str]): Name of a single file inside the folder to read. If folder_relative_path already points to a file, file_name must be None.
  • file_type (Optional[str]): "csv" or "xlsx" when reading a folder.
  • file_pattern (Optional[str]): Glob (e.g. '*.csv') when reading a folder.
  • local_options (Optional[dict]): Spark CSV read options (e.g. header, sep).
  • chunk_size (Optional[int]): Download chunk size (bytes).
Writer-specific
  • file_name (Optional[str]): Target file name to upload.
  • local_options (Optional[dict]): Spark CSV write options.
  • chunk_size (Optional[int]): Upload chunk size (bytes).

Archiving (reader)
  • archive_enabled (bool): Whether to move files after a successful/failed read. Default: True.
  • archive_success_subfolder (Optional[str]): Success folder (default "done"). Set None to keep in place.
  • archive_error_subfolder (Optional[str]): Error folder (default "error"). Set None to keep in place.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class SharepointOptions(object):
    """Options for Sharepoint I/O (used by both reader and writer).

    This dataclass is shared by the Sharepoint reader and writer. Some fields
    are required/used only in *read* mode, others only in *write* mode.
    Use `validate_for_reader()` / `validate_for_writer()` to enforce the
    correct subsets.

    Common (reader & writer):
      - client_id (str): Azure AD application (client) ID.
      - tenant_id (str): Azure AD tenant (directory) ID.
      - site_name (str): Sharepoint site name.
      - drive_name (str): Document library/drive name.
      - secret (str): Client secret.
      - local_path (str): Local/volume path for staging (read/write temp).
      - api_version (str): Microsoft Graph API version (default: "v1.0").
      - conflict_behaviour (Optional[str]): e.g. 'replace', 'fail'.
      - allowed_extensions (Optional[Collection[str]]):
          Defaults to SHAREPOINT_SUPPORTED_EXTENSIONS {".csv", ".xlsx"}.

    Reader-specific:
      - folder_relative_path (Optional[str]): Folder (or full file path)
          to read from.
      - file_name (Optional[str]): Name of a single file inside the folder
          to read. If `folder_relative_path` already points to a file,
          `file_name` must be None.
      - file_type (Optional[str]): "csv" or "xlsx" when reading a folder.
      - file_pattern (Optional[str]): Glob (e.g. '*.csv') when reading a folder.
      - local_options (Optional[dict]): Spark CSV read options (e.g. header, sep).
      - chunk_size (Optional[int]): Download chunk size (bytes).

    Writer-specific:
      - file_name (Optional[str]): Target file name to upload.
      - local_options (Optional[dict]): Spark CSV write options.
      - chunk_size (Optional[int]): Upload chunk size (bytes).

    Archiving (reader):
      - archive_enabled (bool): Whether to move files after a successful/failed read.
          Default: True.
      - archive_success_subfolder (Optional[str]): Success folder (default "done").
          Set None to keep in place.
      - archive_error_subfolder (Optional[str]): Error folder (default "error").
          Set None to keep in place.
    """

    # Common
    client_id: str
    tenant_id: str
    site_name: str
    drive_name: str
    secret: str
    local_path: str
    file_name: Optional[str] = None  # used by reader (optional) and writer (target)
    api_version: str = "v1.0"
    conflict_behaviour: Optional[str] = None
    allowed_extensions: Optional[Collection[str]] = None

    # Reader
    file_type: Optional[str] = None
    folder_relative_path: Optional[str] = None
    file_pattern: Optional[str] = None
    chunk_size: Optional[int] = 100 * 1024 * 1024  # 100 MB (read & write)
    local_options: Optional[dict] = None  # (read & write)

    # Reader archiving
    archive_enabled: bool = True
    archive_success_subfolder: Optional[str] = "done"
    archive_error_subfolder: Optional[str] = "error"

    REQUIRED_READER_OPTS: ClassVar[Tuple[str, ...]] = (
        "site_name",
        "drive_name",
        "folder_relative_path",
    )
    REQUIRED_WRITER_OPTS: ClassVar[Tuple[str, ...]] = (
        "site_name",
        "drive_name",
        "local_path",
    )

    def __post_init__(self) -> None:
        """Normalize and validate Sharepoint options (types, extensions, etc)."""
        allowed_extensions = self._get_allowed_extensions()
        allowed_file_types = {extension.lstrip(".") for extension in allowed_extensions}

        self._validate_file_type(allowed_file_types)
        self._normalize_folder_relative_path()

        self._validate_folder_relative_path_extension_if_looks_like_file(
            allowed_extensions
        )
        self._validate_single_file_mode_constraints_if_folder_is_file_path(
            allowed_extensions
        )

        self._validate_file_name_and_file_pattern_are_not_both_set()

    def _get_allowed_extensions(self) -> set[str]:
        """Return the supported file extensions (lowercased)."""
        return {
            extension.lower()
            for extension in (
                self.allowed_extensions or SHAREPOINT_SUPPORTED_EXTENSIONS
            )
        }

    def _validate_file_type(self, allowed_file_types: set[str]) -> None:
        """Validate that `file_type` is supported when provided."""
        if not self.file_type:
            return

        if self.file_type.lower() not in allowed_file_types:
            raise ValueError(
                f"`file_type` must be one of {sorted(allowed_file_types)}. "
                f"Got: '{self.file_type}'"
            )

    def _normalize_folder_relative_path(self) -> None:
        """Strip leading and trailing slashes from `folder_relative_path`."""
        if self.folder_relative_path:
            self.folder_relative_path = self.folder_relative_path.strip("/")

    def _ends_with_supported_extension(
        self,
        path_value: str,
        allowed_extensions: set[str],
    ) -> bool:
        """Return True if the path ends with any supported extension."""
        lowered_path_value = path_value.lower()
        return any(
            lowered_path_value.endswith(extension) for extension in allowed_extensions
        )

    def _validate_single_file_mode_constraints_if_folder_is_file_path(
        self,
        allowed_extensions: set[str],
    ) -> None:
        """Forbid file name, pattern, and type when folder_relative_path end is file."""
        if not self.folder_relative_path:
            return

        if not self._ends_with_supported_extension(
            self.folder_relative_path, allowed_extensions
        ):
            return

        if self.file_name:
            raise ValueError(
                "When `folder_relative_path` points to a file, `file_name` must "
                "be None."
            )
        if self.file_pattern:
            raise ValueError(
                "When `folder_relative_path` points to a file, `file_pattern` must "
                "be None."
            )
        if self.file_type:
            raise ValueError(
                "When `folder_relative_path` points to a file, `file_type` must "
                "be None (it's derived from file_path extension)"
            )

    def _validate_file_name_extension(self, allowed_extensions: set[str]) -> None:
        """Validate that `file_name` ends with a supported extension when provided."""
        if not self.file_name:
            return

        if not self._ends_with_supported_extension(self.file_name, allowed_extensions):
            raise ValueError(
                f"`file_name` must end with one of {sorted(allowed_extensions)},"
                f" got: {self.file_name}"
            )

    def _validate_file_name_and_file_pattern_are_not_both_set(self) -> None:
        """Validate that `file_name` and `file_pattern` are not both set."""
        if self.file_name and self.file_pattern:
            raise ValueError(
                "Conflicting options: provide either `file_name` or `file_pattern`"
                ", not both."
            )

    def _validate_folder_relative_path_extension_if_looks_like_file(
        self,
        allowed_extensions: set[str],
    ) -> None:
        """Fail if folder_relative_path is a file path but has unsupported extension."""
        if not self.folder_relative_path:
            return

        last_segment = self.folder_relative_path.split("/")[-1]
        looks_like_file = "." in last_segment
        if not looks_like_file:
            return

        if self._ends_with_supported_extension(last_segment, allowed_extensions):
            return

        raise ValueError(
            f"`folder_relative_path` appears to be a file path but does not end "
            f"with one of {sorted(allowed_extensions)}: {self.folder_relative_path}"
        )

    def validate_for_reader(self) -> None:
        """Validate Sharepoint options required for reading."""
        missing = [opt for opt in self.REQUIRED_READER_OPTS if not getattr(self, opt)]
        if missing:
            raise InputNotFoundException(
                f"Missing required Sharepoint options for reader: {', '.join(missing)}"
            )
        allowed_extensions = self._get_allowed_extensions()
        if self.file_name and not self._ends_with_supported_extension(
            self.file_name, allowed_extensions
        ):
            raise ValueError(
                f"`file_name` must end with one of {sorted(allowed_extensions)}, "
                "got: {self.file_name}"
            )

    def validate_for_writer(self) -> None:
        """Validate Sharepoint options required for writing."""
        missing = [opt for opt in self.REQUIRED_WRITER_OPTS if not getattr(self, opt)]
        if missing:
            raise InputNotFoundException(
                f"Missing required Sharepoint options for writer: {', '.join(missing)}"
            )

__post_init__()

Normalize and validate Sharepoint options (types, extensions, etc).

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
def __post_init__(self) -> None:
    """Normalize and validate Sharepoint options (types, extensions, etc)."""
    allowed_extensions = self._get_allowed_extensions()
    allowed_file_types = {extension.lstrip(".") for extension in allowed_extensions}

    self._validate_file_type(allowed_file_types)
    self._normalize_folder_relative_path()

    self._validate_folder_relative_path_extension_if_looks_like_file(
        allowed_extensions
    )
    self._validate_single_file_mode_constraints_if_folder_is_file_path(
        allowed_extensions
    )

    self._validate_file_name_and_file_pattern_are_not_both_set()

validate_for_reader()

Validate Sharepoint options required for reading.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
def validate_for_reader(self) -> None:
    """Validate Sharepoint options required for reading."""
    missing = [opt for opt in self.REQUIRED_READER_OPTS if not getattr(self, opt)]
    if missing:
        raise InputNotFoundException(
            f"Missing required Sharepoint options for reader: {', '.join(missing)}"
        )
    allowed_extensions = self._get_allowed_extensions()
    if self.file_name and not self._ends_with_supported_extension(
        self.file_name, allowed_extensions
    ):
        raise ValueError(
            f"`file_name` must end with one of {sorted(allowed_extensions)}, "
            "got: {self.file_name}"
        )

validate_for_writer()

Validate Sharepoint options required for writing.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
def validate_for_writer(self) -> None:
    """Validate Sharepoint options required for writing."""
    missing = [opt for opt in self.REQUIRED_WRITER_OPTS if not getattr(self, opt)]
    if missing:
        raise InputNotFoundException(
            f"Missing required Sharepoint options for writer: {', '.join(missing)}"
        )

TerminatorSpec dataclass

Bases: object

Terminator Specification.

I.e., the specification that defines a terminator operation to be executed. Examples are compute statistics, vacuum, optimize, etc.

  • function: terminator function to execute.
  • args: arguments of the terminator function.
  • input_id: id of the corresponding output specification (Optional).
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class TerminatorSpec(object):
    """Terminator Specification.

    I.e., the specification that defines a terminator operation to be executed. Examples
    are compute statistics, vacuum, optimize, etc.

    - function: terminator function to execute.
    - args: arguments of the terminator function.
    - input_id: id of the corresponding output specification (Optional).
    """

    function: str
    args: Optional[dict] = None
    input_id: Optional[str] = None
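Since `args` and `input_id` both default to None, only `function` is mandatory. A usage sketch (the `vacuum` function name, its args, and the spec id are hypothetical examples, not values mandated by the engine):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TerminatorSpec:
    """Terminator Specification, as defined above."""

    function: str
    args: Optional[dict] = None
    input_id: Optional[str] = None


# A terminator tied to a hypothetical output spec id.
spec = TerminatorSpec(
    function="vacuum",
    args={"retention_hours": 168},
    input_id="sales_output",
)
assert spec.function == "vacuum"

# Only `function` is required; the rest default to None.
minimal = TerminatorSpec(function="optimize")
assert minimal.args is None and minimal.input_id is None
```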

TransformSpec dataclass

Bases: object

Transformation Specification.

I.e., the specification that defines the many transformations to be done to the data that was read.

  • spec_id: id of the transform specification.
  • input_id: id of the corresponding input specification.
  • transformers: list of transformers to execute.
  • force_streaming_foreach_batch_processing: sometimes, when using streaming, we want to force the transform to be executed in the foreachBatch function to ensure non-supported streaming operations can be properly executed.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class TransformSpec(object):
    """Transformation Specification.

    I.e., the specification that defines the many transformations to be done to the data
    that was read.

    - spec_id: id of the transform specification.
    - input_id: id of the corresponding input specification.
    - transformers: list of transformers to execute.
    - force_streaming_foreach_batch_processing: sometimes, when using streaming, we want
        to force the transform to be executed in the foreachBatch function to ensure
        non-supported streaming operations can be properly executed.
    """

    spec_id: str
    input_id: str
    transformers: List[TransformerSpec]
    force_streaming_foreach_batch_processing: bool = False

TransformerSpec dataclass

Bases: object

Transformer Specification, i.e., a single transformation amongst many.

  • function: name of the function (or callable function) to be executed.
  • args: (not applicable if using a callable function) dict with the arguments to pass to the function <k,v> pairs with the name of the parameter of the function and the respective value.
Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
@dataclass
class TransformerSpec(object):
    """Transformer Specification, i.e., a single transformation amongst many.

    - function: name of the function (or callable function) to be executed.
    - args: (not applicable if using a callable function) dict with the arguments
        to pass to the function `<k,v>` pairs with the name of the parameter of
        the function and the respective value.
    """

    function: str
    args: dict
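The two specifications compose: a `TransformSpec` carries an ordered list of `TransformerSpec` entries, each naming a function and its keyword arguments. A sketch of that composition (the `rename` transformer, its args, and the spec ids are hypothetical illustrations):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TransformerSpec:
    """A single transformation, as defined above."""

    function: str
    args: dict


@dataclass
class TransformSpec:
    """The transformation step, as defined above."""

    spec_id: str
    input_id: str
    transformers: List[TransformerSpec]
    force_streaming_foreach_batch_processing: bool = False


transform = TransformSpec(
    spec_id="sales_transformed",
    input_id="sales_source",
    transformers=[
        # args are <parameter name, value> pairs for the named function.
        TransformerSpec(function="rename", args={"cols": {"id": "sales_id"}}),
    ],
)
assert transform.transformers[0].function == "rename"
# The streaming flag defaults to False.
assert transform.force_streaming_foreach_batch_processing is False
```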

WriteType

Bases: Enum

Types of write operations.

Source code in mkdocs/lakehouse_engine/packages/core/definitions.py
class WriteType(Enum):
    """Types of write operations."""

    OVERWRITE = "overwrite"
    COMPLETE = "complete"
    APPEND = "append"
    UPDATE = "update"
    MERGE = "merge"
    ERROR_IF_EXISTS = "error"
    IGNORE_IF_EXISTS = "ignore"