lakehouse_engine.core.definitions

Definitions of standard values and structures for core components.

   1"""Definitions of standard values and structures for core components."""
   2
   3from dataclasses import dataclass
   4from datetime import datetime
   5from enum import Enum
   6from typing import List, Optional, Union
   7
   8from pyspark.sql import DataFrame
   9from pyspark.sql.types import (
  10    ArrayType,
  11    BooleanType,
  12    StringType,
  13    StructField,
  14    StructType,
  15    TimestampType,
  16)
  17
  18
  19class CollectEngineUsage(Enum):
  20    """Options for collecting engine usage stats.
  21
  22    - enabled, enables the collection and storage of Lakehouse Engine
  23    usage statistics for any environment.
  24    - prod_only, enables the collection and storage of Lakehouse Engine
  25    usage statistics for production environment only.
  26    - disabled, disables the collection and storage of Lakehouse Engine
   27    usage statistics for all environments.
  28    """
  29
  30    ENABLED = "enabled"
  31    PROD_ONLY = "prod_only"
  32    DISABLED = "disabled"
  33
  34
  35@dataclass
  36class EngineConfig(object):
  37    """Definitions that can come from the Engine Config file.
  38
  39    - dq_bucket: S3 prod bucket used to store data quality related artifacts.
  40    - dq_dev_bucket: S3 dev bucket used to store data quality related artifacts.
  41    - notif_disallowed_email_servers: email servers not allowed to be used
  42        for sending notifications.
  43    - engine_usage_path: path where the engine prod usage stats are stored.
  44    - engine_dev_usage_path: path where the engine dev usage stats are stored.
  45    - collect_engine_usage: whether to enable the collection of lakehouse
  46        engine usage stats or not.
  47    - dq_functions_column_list: list of columns to be added to the meta argument
  48        of GX when using PRISMA.
  49    """
  50
  51    dq_bucket: Optional[str] = None
  52    dq_dev_bucket: Optional[str] = None
  53    notif_disallowed_email_servers: Optional[list] = None
  54    engine_usage_path: Optional[str] = None
  55    engine_dev_usage_path: Optional[str] = None
  56    collect_engine_usage: str = CollectEngineUsage.ENABLED.value
  57    dq_functions_column_list: Optional[list] = None
  58
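# Usage sketch (documentation example, not module code): building an EngineConfig
# by hand; the bucket names and the PROD_ONLY choice are hypothetical values.
example_engine_config = EngineConfig(
    dq_bucket="s3://my-prod-dq-bucket",
    dq_dev_bucket="s3://my-dev-dq-bucket",
    collect_engine_usage=CollectEngineUsage.PROD_ONLY.value,
)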
  59
  60class EngineStats(Enum):
  61    """Definitions for collection of Lakehouse Engine Stats.
  62
  63    .. note::
   64        Whenever the value comes from a key inside a Spark Config
  65        that returns an array, it can be specified with a '#' so that it
  66        is adequately processed.
  67    """
  68
  69    CLUSTER_USAGE_TAGS = "spark.databricks.clusterUsageTags"
  70    DEF_SPARK_CONFS = {
  71        "dp_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#accountName",
  72        "environment": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#environment",
  73        "workspace_id": f"{CLUSTER_USAGE_TAGS}.orgId",
  74        "job_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#JobId",
  75        "job_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#RunName",
  76        "run_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#ClusterName",
  77    }
  78
  79
  80class InputFormat(Enum):
  81    """Formats of algorithm input."""
  82
  83    JDBC = "jdbc"
  84    AVRO = "avro"
  85    JSON = "json"
  86    CSV = "csv"
  87    PARQUET = "parquet"
  88    DELTAFILES = "delta"
  89    CLOUDFILES = "cloudfiles"
  90    KAFKA = "kafka"
  91    SQL = "sql"
  92    SAP_BW = "sap_bw"
  93    SAP_B4 = "sap_b4"
  94    DATAFRAME = "dataframe"
  95    SFTP = "sftp"
  96
  97    @classmethod
  98    def values(cls):  # type: ignore
  99        """Generates a list containing all enum values.
 100
  101        Returns:
 102            A list with all enum values.
 103        """
 104        return (c.value for c in cls)
 105
 106    @classmethod
 107    def exists(cls, input_format: str) -> bool:
 108        """Checks if the input format exists in the enum values.
 109
 110        Args:
 111            input_format: format to check if exists.
 112
  113        Returns:
 114            If the input format exists in our enum.
 115        """
 116        return input_format in cls.values()
 117
 118
 119# Formats of input that are considered files.
 120FILE_INPUT_FORMATS = [
 121    InputFormat.AVRO.value,
 122    InputFormat.JSON.value,
 123    InputFormat.PARQUET.value,
 124    InputFormat.CSV.value,
 125    InputFormat.DELTAFILES.value,
 126    InputFormat.CLOUDFILES.value,
 127]
 128
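# Usage sketch (documentation example, not module code): validating a format
# string before building a reader; "csv" here is just an illustrative value.
requested_format = "csv"
if not InputFormat.exists(requested_format):
    raise ValueError(f"Unsupported input format: {requested_format}")
is_file_based = requested_format in FILE_INPUT_FORMATS  # True for csv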
 129
 130class OutputFormat(Enum):
 131    """Formats of algorithm output."""
 132
 133    JDBC = "jdbc"
 134    AVRO = "avro"
 135    JSON = "json"
 136    CSV = "csv"
 137    PARQUET = "parquet"
 138    DELTAFILES = "delta"
 139    KAFKA = "kafka"
 140    CONSOLE = "console"
 141    NOOP = "noop"
 142    DATAFRAME = "dataframe"
 143    REST_API = "rest_api"
 144    FILE = "file"  # Internal use only
 145    TABLE = "table"  # Internal use only
 146
 147    @classmethod
 148    def values(cls):  # type: ignore
 149        """Generates a list containing all enum values.
 150
  151        Returns:
 152            A list with all enum values.
 153        """
 154        return (c.value for c in cls)
 155
 156    @classmethod
 157    def exists(cls, output_format: str) -> bool:
 158        """Checks if the output format exists in the enum values.
 159
 160        Args:
 161            output_format: format to check if exists.
 162
  163        Returns:
 164            If the output format exists in our enum.
 165        """
 166        return output_format in cls.values()
 167
 168
 169# Formats of output that are considered files.
 170FILE_OUTPUT_FORMATS = [
 171    OutputFormat.AVRO.value,
 172    OutputFormat.JSON.value,
 173    OutputFormat.PARQUET.value,
 174    OutputFormat.CSV.value,
 175    OutputFormat.DELTAFILES.value,
 176]
 177
 178
 179class NotifierType(Enum):
 180    """Type of notifier available."""
 181
 182    EMAIL = "email"
 183
 184
 185class NotificationRuntimeParameters(Enum):
 186    """Parameters to be replaced in runtime."""
 187
 188    DATABRICKS_JOB_NAME = "databricks_job_name"
 189    DATABRICKS_WORKSPACE_ID = "databricks_workspace_id"
 190
 191
 192NOTIFICATION_RUNTIME_PARAMETERS = [
 193    NotificationRuntimeParameters.DATABRICKS_JOB_NAME.value,
 194    NotificationRuntimeParameters.DATABRICKS_WORKSPACE_ID.value,
 195]
 196
 197
 198class ReadType(Enum):
 199    """Define the types of read operations.
 200
 201    - BATCH - read the data in batch mode (e.g., Spark batch).
 202    - STREAMING - read the data in streaming mode (e.g., Spark streaming).
 203    """
 204
 205    BATCH = "batch"
 206    STREAMING = "streaming"
 207
 208
 209class ReadMode(Enum):
 210    """Different modes that control how we handle compliance to the provided schema.
 211
 212    These read modes map to Spark's read modes at the moment.
 213    """
 214
 215    PERMISSIVE = "PERMISSIVE"
 216    FAILFAST = "FAILFAST"
 217    DROPMALFORMED = "DROPMALFORMED"
 218
 219
 220class DQDefaults(Enum):
 221    """Defaults used on the data quality process."""
 222
 223    FILE_SYSTEM_STORE = "file_system"
 224    FILE_SYSTEM_S3_STORE = "s3"
 225    DQ_BATCH_IDENTIFIERS = ["spec_id", "input_id", "timestamp"]
 226    DATASOURCE_CLASS_NAME = "Datasource"
 227    DATASOURCE_EXECUTION_ENGINE = "SparkDFExecutionEngine"
 228    DATA_CONNECTORS_CLASS_NAME = "RuntimeDataConnector"
 229    DATA_CONNECTORS_MODULE_NAME = "great_expectations.datasource.data_connector"
 230    DATA_CHECKPOINTS_CLASS_NAME = "SimpleCheckpoint"
 231    DATA_CHECKPOINTS_CONFIG_VERSION = 1.0
 232    STORE_BACKEND = "s3"
 233    EXPECTATIONS_STORE_PREFIX = "dq/expectations/"
 234    VALIDATIONS_STORE_PREFIX = "dq/validations/"
 235    DATA_DOCS_PREFIX = "dq/data_docs/site/"
 236    CHECKPOINT_STORE_PREFIX = "dq/checkpoints/"
 237    VALIDATION_COLUMN_IDENTIFIER = "validationresultidentifier"
 238    CUSTOM_EXPECTATION_LIST = [
 239        "expect_column_values_to_be_date_not_older_than",
 240        "expect_column_pair_a_to_be_smaller_or_equal_than_b",
 241        "expect_multicolumn_column_a_must_equal_b_or_c",
 242        "expect_queried_column_agg_value_to_be",
 243        "expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b",
 244        "expect_column_pair_a_to_be_not_equal_to_b",
 245    ]
 246    DQ_VALIDATIONS_SCHEMA = StructType(
 247        [
 248            StructField(
 249                "dq_validations",
 250                StructType(
 251                    [
 252                        StructField("run_name", StringType()),
 253                        StructField("run_success", BooleanType()),
 254                        StructField("raised_exceptions", BooleanType()),
 255                        StructField("run_row_success", BooleanType()),
 256                        StructField(
 257                            "dq_failure_details",
 258                            ArrayType(
 259                                StructType(
 260                                    [
 261                                        StructField("expectation_type", StringType()),
 262                                        StructField("kwargs", StringType()),
 263                                    ]
 264                                ),
 265                            ),
 266                        ),
 267                    ]
 268                ),
 269            )
 270        ]
 271    )
 272
 273
 274class WriteType(Enum):
 275    """Types of write operations."""
 276
 277    OVERWRITE = "overwrite"
 278    COMPLETE = "complete"
 279    APPEND = "append"
 280    UPDATE = "update"
 281    MERGE = "merge"
 282    ERROR_IF_EXISTS = "error"
 283    IGNORE_IF_EXISTS = "ignore"
 284
 285
 286@dataclass
 287class InputSpec(object):
 288    """Specification of an algorithm input.
 289
 290    This is very aligned with the way the execution environment connects to the sources
 291    (e.g., spark sources).
 292
   93    - spec_id: spec_id of the input specification.
   94    - read_type: ReadType type of read operation.
 295    - data_format: format of the input.
   96    - sftp_files_format: format of the files (csv, fwf, json, xml...) in an SFTP
 297        directory.
 298    - df_name: dataframe name.
 299    - db_table: table name in the form of `<db>.<table>`.
 300    - location: uri that identifies from where to read data in the specified format.
 301    - enforce_schema_from_table: if we want to enforce the table schema or not, by
 302        providing a table name in the form of `<db>.<table>`.
 303    - query: sql query to execute and return the dataframe. Use it if you do not want to
  104        read from a file system or a table, but rather from a SQL query.
 305    - schema: dict representation of a schema of the input (e.g., Spark struct type
 306        schema).
 307    - schema_path: path to a file with a representation of a schema of the input (e.g.,
 308        Spark struct type schema).
 309    - disable_dbfs_retry: optional flag to disable file storage dbfs.
 310    - with_filepath: if we want to include the path of the file that is being read. Only
 311        works with the file reader (batch and streaming modes are supported).
 312    - options: dict with other relevant options according to the execution
 313        environment (e.g., spark) possible sources.
  314    - calculate_upper_bound: whether to calculate the upper bound to extract from
  315        SAP BW or not.
 316    - calc_upper_bound_schema: specific schema for the calculated upper_bound.
  317    - generate_predicates: whether to generate predicates to extract from SAP BW or not.
 318    - predicates_add_null: if we want to include is null on partition by predicates.
 319    - temp_view: optional name of a view to point to the input dataframe to be used
 320        to create or replace a temp view on top of the dataframe.
 321    """
 322
 323    spec_id: str
 324    read_type: str
 325    data_format: Optional[str] = None
 326    sftp_files_format: Optional[str] = None
 327    df_name: Optional[DataFrame] = None
 328    db_table: Optional[str] = None
 329    location: Optional[str] = None
 330    query: Optional[str] = None
 331    enforce_schema_from_table: Optional[str] = None
 332    schema: Optional[dict] = None
 333    schema_path: Optional[str] = None
 334    disable_dbfs_retry: bool = False
 335    with_filepath: bool = False
 336    options: Optional[dict] = None
 337    jdbc_args: Optional[dict] = None
 338    calculate_upper_bound: bool = False
 339    calc_upper_bound_schema: Optional[str] = None
 340    generate_predicates: bool = False
 341    predicates_add_null: bool = True
 342    temp_view: Optional[str] = None
 343
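# Usage sketch (documentation example, not module code): a batch read of Delta
# files; the spec_id, location and options are hypothetical values.
example_input_spec = InputSpec(
    spec_id="orders_bronze",
    read_type=ReadType.BATCH.value,
    data_format=InputFormat.DELTAFILES.value,
    location="s3://my-bucket/bronze/orders",
    options={"mergeSchema": "true"},
)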
 344
 345@dataclass
 346class TransformerSpec(object):
 347    """Transformer Specification, i.e., a single transformation amongst many.
 348
 349    - function: name of the function (or callable function) to be executed.
 350    - args: (not applicable if using a callable function) dict with the arguments
 351        to pass to the function `<k,v>` pairs with the name of the parameter of
 352        the function and the respective value.
 353    """
 354
 355    function: str
 356    args: dict
 357
 358
 359@dataclass
 360class TransformSpec(object):
 361    """Transformation Specification.
 362
 363    I.e., the specification that defines the many transformations to be done to the data
 364    that was read.
 365
  366    - spec_id: id of the transform specification.
  367    - input_id: id of the corresponding input
  368        specification.
 369    - transformers: list of transformers to execute.
 370    - force_streaming_foreach_batch_processing: sometimes, when using streaming, we want
 371        to force the transform to be executed in the foreachBatch function to ensure
 372        non-supported streaming operations can be properly executed.
 373    """
 374
 375    spec_id: str
 376    input_id: str
 377    transformers: List[TransformerSpec]
 378    force_streaming_foreach_batch_processing: bool = False
 379
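# Usage sketch (documentation example, not module code): a TransformSpec pointing
# at the input sketched above; the transformer function name and args are
# hypothetical and must match a function available in the engine's transformers.
example_transform_spec = TransformSpec(
    spec_id="orders_transformed",
    input_id="orders_bronze",
    transformers=[
        TransformerSpec(function="drop_duplicate_rows", args={"cols": ["order_id"]}),
    ],
)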
 380
 381class DQType(Enum):
 382    """Available data quality tasks."""
 383
 384    VALIDATOR = "validator"
 385    PRISMA = "prisma"
 386
 387
 388class DQExecutionPoint(Enum):
 389    """Available data quality execution points."""
 390
 391    IN_MOTION = "in_motion"
 392    AT_REST = "at_rest"
 393
 394
 395class DQTableBaseParameters(Enum):
 396    """Base parameters for importing DQ rules from a table."""
 397
 398    PRISMA_BASE_PARAMETERS = ["arguments", "dq_tech_function"]
 399
 400
 401@dataclass
 402class DQFunctionSpec(object):
 403    """Defines a data quality function specification.
 404
 405    - function - name of the data quality function (expectation) to execute.
 406    It follows the great_expectations api https://greatexpectations.io/expectations/.
 407    - args - args of the function (expectation). Follow the same api as above.
 408    """
 409
 410    function: str
 411    args: Optional[dict] = None
 412
 413
 414@dataclass
 415class DQSpec(object):
 416    """Data quality overall specification.
 417
 418    - spec_id - id of the specification.
 419    - input_id - id of the input specification.
 420    - dq_type - type of DQ process to execute (e.g. validator).
 421    - dq_functions - list of function specifications to execute.
 422    - dq_db_table - name of table to derive the dq functions from.
  423    - dq_table_table_filter - name of the table whose rules are to be applied in the
 424        validations (Only used when deriving dq functions).
 425    - dq_table_extra_filters - extra filters to be used when deriving dq functions.
 426        This is a sql expression to be applied to the dq_db_table.
 427    - execution_point - execution point of the dq functions. [at_rest, in_motion].
 428        This is set during the load_data or dq_validator functions.
 429    - unexpected_rows_pk - the list of columns composing the primary key of the
 430        source data to identify the rows failing the DQ validations. Note: only one
 431        of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It
 432        is mandatory to provide one of these arguments when using tag_source_data
 433        as True. When tag_source_data is False, this is not mandatory, but still
 434        recommended.
 435    - tbl_to_derive_pk - db.table to automatically derive the unexpected_rows_pk from.
 436        Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to
 437        be provided. It is mandatory to provide one of these arguments when using
  438        tag_source_data as True. When tag_source_data is False, this is not
 439        mandatory, but still recommended.
 440    - sort_processed_keys - when using the `prisma` `dq_type`, a column `processed_keys`
 441        is automatically added to give observability over the PK values that were
 442        processed during a run. This parameter (`sort_processed_keys`) controls whether
 443        the processed keys column value should be sorted or not. Default: False.
 444    - gx_result_format - great expectations result format. Default: "COMPLETE".
 445    - tag_source_data - when set to true, this will ensure that the DQ process ends by
 446        tagging the source data with an additional column with information about the
  447        DQ results. This column makes it possible to identify if the DQ run
  448        succeeded in general and, if not, which specific rows made the DQ
  449        validations fail and why. Default: False.
 450        Note: it only works if result_sink_explode is True, gx_result_format is
 451        COMPLETE, fail_on_error is False (which is done automatically when
 452        you specify tag_source_data as True) and tbl_to_derive_pk or
 453        unexpected_rows_pk is configured.
 454    - store_backend - which store_backend to use (e.g. s3 or file_system).
 455    - local_fs_root_dir - path of the root directory. Note: only applicable for
 456        store_backend file_system.
 457    - data_docs_local_fs - the path for data docs only for store_backend
 458        file_system.
 459    - bucket - the bucket name to consider for the store_backend (store DQ artefacts).
 460        Note: only applicable for store_backend s3.
 461    - data_docs_bucket - the bucket name for data docs only. When defined, it will
 462        supersede bucket parameter. Note: only applicable for store_backend s3.
 463    - expectations_store_prefix - prefix where to store expectations' data. Note: only
 464        applicable for store_backend s3.
 465    - validations_store_prefix - prefix where to store validations' data. Note: only
 466        applicable for store_backend s3.
 467    - data_docs_prefix - prefix where to store data_docs' data.
 468    - checkpoint_store_prefix - prefix where to store checkpoints' data. Note: only
 469        applicable for store_backend s3.
 470    - data_asset_name - name of the data asset to consider when configuring the great
 471        expectations' data source.
 472    - expectation_suite_name - name to consider for great expectations' suite.
 473    - result_sink_db_table - db.table_name indicating the database and table in which
 474        to save the results of the DQ process.
 475    - result_sink_location - file system location in which to save the results of the
 476        DQ process.
 477    - data_product_name - name of the data product.
 478    - result_sink_partitions - the list of partitions to consider.
 479    - result_sink_format - format of the result table (e.g. delta, parquet, kafka...).
 480    - result_sink_options - extra spark options for configuring the result sink.
 481        E.g: can be used to configure a Kafka sink if result_sink_format is kafka.
 482    - result_sink_explode - flag to determine if the output table/location should have
 483        the columns exploded (as True) or not (as False). Default: True.
 484    - result_sink_extra_columns - list of extra columns to be exploded (following
 485        the pattern "<name>.*") or columns to be selected. It is only used when
 486        result_sink_explode is set to True.
  487    - source - name of the data source, to make it easier to identify in analysis.
  488        If not specified, it defaults to <input_id>. This is only used
  489        when result_sink_explode is set to True.
 490    - fail_on_error - whether to fail the algorithm if the validations of your data in
 491        the DQ process failed.
 492    - cache_df - whether to cache the dataframe before running the DQ process or not.
 493    - critical_functions - functions that should not fail. When this argument is
 494        defined, fail_on_error is nullified.
 495    - max_percentage_failure - percentage of failure that should be allowed.
 496        This argument has priority over both fail_on_error and critical_functions.
 497    """
 498
 499    spec_id: str
 500    input_id: str
 501    dq_type: str
 502    dq_functions: Optional[List[DQFunctionSpec]] = None
 503    dq_db_table: Optional[str] = None
 504    dq_table_table_filter: Optional[str] = None
 505    dq_table_extra_filters: Optional[str] = None
 506    execution_point: Optional[str] = None
 507    unexpected_rows_pk: Optional[List[str]] = None
 508    tbl_to_derive_pk: Optional[str] = None
 509    sort_processed_keys: Optional[bool] = False
 510    gx_result_format: Optional[str] = "COMPLETE"
 511    tag_source_data: Optional[bool] = False
 512    store_backend: str = DQDefaults.STORE_BACKEND.value
 513    local_fs_root_dir: Optional[str] = None
 514    data_docs_local_fs: Optional[str] = None
 515    bucket: Optional[str] = None
 516    data_docs_bucket: Optional[str] = None
 517    expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value
 518    validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value
 519    data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value
 520    checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value
 521    data_asset_name: Optional[str] = None
 522    expectation_suite_name: Optional[str] = None
 523    result_sink_db_table: Optional[str] = None
 524    result_sink_location: Optional[str] = None
 525    data_product_name: Optional[str] = None
 526    result_sink_partitions: Optional[List[str]] = None
 527    result_sink_format: str = OutputFormat.DELTAFILES.value
 528    result_sink_options: Optional[dict] = None
 529    result_sink_explode: bool = True
 530    result_sink_extra_columns: Optional[List[str]] = None
 531    source: Optional[str] = None
 532    fail_on_error: bool = True
 533    cache_df: bool = False
 534    critical_functions: Optional[List[DQFunctionSpec]] = None
 535    max_percentage_failure: Optional[float] = None
 536
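# Usage sketch (documentation example, not module code): a validator DQSpec with
# two expectations. Ids, bucket and column names are hypothetical; expectation
# names follow the great_expectations API. Note that tag_source_data=True is
# combined here with fail_on_error=False and unexpected_rows_pk, as required by
# the docstring above.
example_dq_spec = DQSpec(
    spec_id="orders_dq",
    input_id="orders_transformed",
    dq_type=DQType.VALIDATOR.value,
    bucket="my-dq-artifacts-bucket",
    data_asset_name="orders",
    expectation_suite_name="orders_suite",
    unexpected_rows_pk=["order_id"],
    tag_source_data=True,
    fail_on_error=False,
    dq_functions=[
        DQFunctionSpec(
            function="expect_column_values_to_not_be_null",
            args={"column": "order_id"},
        ),
        DQFunctionSpec(
            function="expect_table_row_count_to_be_between",
            args={"min_value": 1},
        ),
    ],
)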
 537
 538@dataclass
 539class MergeOptions(object):
 540    """Options for a merge operation.
 541
 542    - merge_predicate: predicate to apply to the merge operation so that we can
 543        check if a new record corresponds to a record already included in the
 544        historical data.
 545    - insert_only: indicates if the merge should only insert data (e.g., deduplicate
 546        scenarios).
 547    - delete_predicate: predicate to apply to the delete operation.
 548    - update_predicate: predicate to apply to the update operation.
 549    - insert_predicate: predicate to apply to the insert operation.
 550    - update_column_set: rules to apply to the update operation which allows to
 551        set the value for each column to be updated.
 552        (e.g. {"data": "new.data", "count": "current.count + 1"} )
 553    - insert_column_set: rules to apply to the insert operation which allows to
 554        set the value for each column to be inserted.
 555        (e.g. {"date": "updates.date", "count": "1"} )
 556    """
 557
 558    merge_predicate: str
 559    insert_only: bool = False
 560    delete_predicate: Optional[str] = None
 561    update_predicate: Optional[str] = None
 562    insert_predicate: Optional[str] = None
 563    update_column_set: Optional[dict] = None
 564    insert_column_set: Optional[dict] = None
 565
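# Usage sketch (documentation example, not module code): merge options for an
# upsert keyed on order_id. The predicate columns and the `current`/`new` aliases
# are illustrative assumptions following the docstring examples above.
example_merge_opts = MergeOptions(
    merge_predicate="current.order_id = new.order_id",
    update_predicate="new.updated_at > current.updated_at",
)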
 566
 567@dataclass
 568class OutputSpec(object):
 569    """Specification of an algorithm output.
 570
 571    This is very aligned with the way the execution environment connects to the output
 572    systems (e.g., spark outputs).
 573
 574    - spec_id: id of the output specification.
 575    - input_id: id of the corresponding input specification.
 576    - write_type: type of write operation.
 577    - data_format: format of the output. Defaults to DELTA.
 578    - db_table: table name in the form of `<db>.<table>`.
  579    - location: uri that identifies where to write data in the specified format.
 580    - partitions: list of partition input_col names.
 581    - merge_opts: options to apply to the merge operation.
 582    - streaming_micro_batch_transformers: transformers to invoke for each streaming
 583        micro batch, before writing (i.e., in Spark's foreachBatch structured
 584        streaming function). Note: the lakehouse engine manages this for you, so
 585        you don't have to manually specify streaming transformations here, so we don't
 586        advise you to manually specify transformations through this parameter. Supply
 587        them as regular transformers in the transform_specs sections of an ACON.
 588    - streaming_once: if the streaming query is to be executed just once, or not,
 589        generating just one micro batch.
 590    - streaming_processing_time: if streaming query is to be kept alive, this indicates
 591        the processing time of each micro batch.
 592    - streaming_available_now: if set to True, set a trigger that processes all
 593        available data in multiple batches then terminates the query.
 594        When using streaming, this is the default trigger that the lakehouse-engine will
 595        use, unless you configure a different one.
 596    - streaming_continuous: set a trigger that runs a continuous query with a given
 597        checkpoint interval.
 598    - streaming_await_termination: whether to wait (True) for the termination of the
 599        streaming query (e.g. timeout or exception) or not (False). Default: True.
 600    - streaming_await_termination_timeout: a timeout to set to the
 601        streaming_await_termination. Default: None.
 602    - with_batch_id: whether to include the streaming batch id in the final data,
 603        or not. It only takes effect in streaming mode.
 604    - options: dict with other relevant options according to the execution environment
  605        (e.g., spark) possible outputs. E.g.: JDBC options, checkpoint location for
 606        streaming, etc.
 607    - streaming_micro_batch_dq_processors: similar to streaming_micro_batch_transformers
 608        but for the DQ functions to be executed. Used internally by the lakehouse
 609        engine, so you don't have to supply DQ functions through this parameter. Use the
 610        dq_specs of the acon instead.
 611    """
 612
 613    spec_id: str
 614    input_id: str
 615    write_type: str
 616    data_format: str = OutputFormat.DELTAFILES.value
 617    db_table: Optional[str] = None
 618    location: Optional[str] = None
 619    merge_opts: Optional[MergeOptions] = None
 620    partitions: Optional[List[str]] = None
 621    streaming_micro_batch_transformers: Optional[List[TransformerSpec]] = None
 622    streaming_once: Optional[bool] = None
 623    streaming_processing_time: Optional[str] = None
 624    streaming_available_now: bool = True
 625    streaming_continuous: Optional[str] = None
 626    streaming_await_termination: bool = True
 627    streaming_await_termination_timeout: Optional[int] = None
 628    with_batch_id: bool = False
 629    options: Optional[dict] = None
 630    streaming_micro_batch_dq_processors: Optional[List[DQSpec]] = None
 631
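# Usage sketch (documentation example, not module code): a merge write into a
# Delta table, reusing the merge options sketched above; the table name is
# hypothetical.
example_output_spec = OutputSpec(
    spec_id="orders_silver",
    input_id="orders_transformed",
    write_type=WriteType.MERGE.value,
    db_table="silver.orders",
    merge_opts=example_merge_opts,
)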
 632
 633@dataclass
 634class TerminatorSpec(object):
 635    """Terminator Specification.
 636
 637    I.e., the specification that defines a terminator operation to be executed. Examples
 638    are compute statistics, vacuum, optimize, etc.
 639
 640    - function: terminator function to execute.
 641    - args: arguments of the terminator function.
 642    - input_id: id of the corresponding output specification (Optional).
 643    """
 644
 645    function: str
 646    args: Optional[dict] = None
 647    input_id: Optional[str] = None
 648
 649
 650@dataclass
 651class ReconciliatorSpec(object):
 652    """Reconciliator Specification.
 653
 654    - metrics: list of metrics in the form of:
 655        [{
 656            metric: name of the column present in both truth and current datasets,
 657            aggregation: sum, avg, max, min, ...,
 658            type: percentage or absolute,
 659            yellow: value,
 660            red: value
 661        }].
 662    - recon_type: reconciliation type (percentage or absolute). Percentage calculates
 663        the difference between truth and current results as a percentage (x-y/x), and
 664        absolute calculates the raw difference (x - y).
 665    - truth_input_spec: input specification of the truth data.
 666    - current_input_spec: input specification of the current results data
 667    - truth_preprocess_query: additional query on top of the truth input data to
 668        preprocess the truth data before it gets fueled into the reconciliation process.
 669        Important note: you need to assume that the data out of
  670        the truth_input_spec is referenceable by a table called 'truth'.
 671    - truth_preprocess_query_args: optional dict having the functions/transformations to
 672        apply on top of the truth_preprocess_query and respective arguments. Note: cache
 673        is being applied on the Dataframe, by default. For turning the default behavior
 674        off, pass `"truth_preprocess_query_args": []`.
 675    - current_preprocess_query: additional query on top of the current results input
 676        data to preprocess the current results data before it gets fueled into the
 677        reconciliation process. Important note: you need to assume that the data out of
  678        the current_input_spec is referenceable by a table called 'current'.
 679    - current_preprocess_query_args: optional dict having the
 680        functions/transformations to apply on top of the current_preprocess_query
 681        and respective arguments. Note: cache is being applied on the Dataframe,
 682        by default. For turning the default behavior off, pass
 683        `"current_preprocess_query_args": []`.
  684    - ignore_empty_df: optional boolean to skip the reconciliation if the source and
  685        target dataframes are both empty; in that case it exits with success (passed).
 686    """
 687
 688    metrics: List[dict]
 689    truth_input_spec: InputSpec
 690    current_input_spec: InputSpec
 691    truth_preprocess_query: Optional[str] = None
 692    truth_preprocess_query_args: Optional[List[dict]] = None
 693    current_preprocess_query: Optional[str] = None
 694    current_preprocess_query_args: Optional[List[dict]] = None
 695    ignore_empty_df: Optional[bool] = False
 696
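# Usage sketch (documentation example, not module code): reconciling one summed
# metric with 5%/10% thresholds; the metric name, thresholds and locations are
# hypothetical values.
example_recon_spec = ReconciliatorSpec(
    metrics=[
        {
            "metric": "net_sales",
            "aggregation": "sum",
            "type": "percentage",
            "yellow": 0.05,
            "red": 0.10,
        }
    ],
    truth_input_spec=InputSpec(
        spec_id="truth",
        read_type=ReadType.BATCH.value,
        data_format=InputFormat.DELTAFILES.value,
        location="s3://my-bucket/truth/sales",
    ),
    current_input_spec=InputSpec(
        spec_id="current",
        read_type=ReadType.BATCH.value,
        data_format=InputFormat.DELTAFILES.value,
        location="s3://my-bucket/current/sales",
    ),
)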
 697
 698@dataclass
 699class DQValidatorSpec(object):
 700    """Data Quality Validator Specification.
 701
 702    - input_spec: input specification of the data to be checked/validated.
 703    - dq_spec: data quality specification.
 704    - restore_prev_version: specify if, having
 705        delta table/files as input, they should be restored to the
 706        previous version if the data quality process fails. Note: this
 707        is only considered if fail_on_error is kept as True.
 708    """
 709
 710    input_spec: InputSpec
 711    dq_spec: DQSpec
 712    restore_prev_version: Optional[bool] = False
 713
 714
 715class SQLDefinitions(Enum):
 716    """SQL definitions statements."""
 717
 718    compute_table_stats = "ANALYZE TABLE {} COMPUTE STATISTICS"
 719    drop_table_stmt = "DROP TABLE IF EXISTS"
 720    drop_view_stmt = "DROP VIEW IF EXISTS"
 721    truncate_stmt = "TRUNCATE TABLE"
 722    describe_stmt = "DESCRIBE TABLE"
 723    optimize_stmt = "OPTIMIZE"
 724    show_tbl_props_stmt = "SHOW TBLPROPERTIES"
 725    delete_where_stmt = "DELETE FROM {} WHERE {}"
 726
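# Usage sketch (documentation example, not module code): the enum values are plain
# SQL templates filled with str.format; table name and predicate are hypothetical.
delete_sql = SQLDefinitions.delete_where_stmt.value.format(
    "silver.orders", "order_date < '2020-01-01'"
)
# -> "DELETE FROM silver.orders WHERE order_date < '2020-01-01'"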
 727
 728class FileManagerAPIKeys(Enum):
 729    """File Manager s3 api keys."""
 730
 731    CONTENTS = "Contents"
 732    KEY = "Key"
 733    CONTINUATION = "NextContinuationToken"
 734    BUCKET = "Bucket"
 735    OBJECTS = "Objects"
 736
 737
 738@dataclass
 739class SensorSpec(object):
 740    """Sensor Specification.
 741
 742    - sensor_id: sensor id.
 743    - assets: a list of assets that are considered as available to
 744        consume downstream after this sensor has status
 745        PROCESSED_NEW_DATA.
 746    - control_db_table_name: db.table to store sensor metadata.
 747    - input_spec: input specification of the source to be checked for new data.
 748    - preprocess_query: SQL query to transform/filter the result from the
 749        upstream. Consider that we should refer to 'new_data' whenever
 750        we are referring to the input of the sensor. E.g.:
 751            "SELECT dummy_col FROM new_data WHERE ..."
 752    - checkpoint_location: optional location to store checkpoints to resume
  753        from. These checkpoints use the same strategy as Spark checkpoints.
 754        For Spark readers that do not support checkpoints, use the
 755        preprocess_query parameter to form a SQL query to filter the result
 756        from the upstream accordingly.
 757    - fail_on_empty_result: if the sensor should throw an error if there is no new
 758        data in the upstream. Default: True.
 759    """
 760
 761    sensor_id: str
 762    assets: List[str]
 763    control_db_table_name: str
 764    input_spec: InputSpec
 765    preprocess_query: Optional[str]
 766    checkpoint_location: Optional[str]
 767    fail_on_empty_result: bool = True
 768
 769    @classmethod
 770    def create_from_acon(cls, acon: dict):  # type: ignore
 771        """Create SensorSpec from acon.
 772
 773        Args:
 774            acon: sensor ACON.
 775        """
 776        checkpoint_location = acon.get("base_checkpoint_location")
 777        if checkpoint_location:
 778            checkpoint_location = (
 779                f"{checkpoint_location.rstrip('/')}/lakehouse_engine/"
 780                f"sensors/{acon['sensor_id']}"
 781            )
 782
 783        return cls(
 784            sensor_id=acon["sensor_id"],
 785            assets=acon["assets"],
 786            control_db_table_name=acon["control_db_table_name"],
 787            input_spec=InputSpec(**acon["input_spec"]),
 788            preprocess_query=acon.get("preprocess_query"),
 789            checkpoint_location=checkpoint_location,
 790            fail_on_empty_result=acon.get("fail_on_empty_result", True),
 791        )
 792
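# Usage sketch (documentation example, not module code): building a SensorSpec
# from an ACON dict; all values are hypothetical.
example_sensor_spec = SensorSpec.create_from_acon(
    {
        "sensor_id": "upstream_orders_sensor",
        "assets": ["orders"],
        "control_db_table_name": "control.sensors",
        "input_spec": {
            "spec_id": "upstream_orders",
            "read_type": ReadType.STREAMING.value,
            "data_format": InputFormat.DELTAFILES.value,
            "location": "s3://my-bucket/bronze/orders",
        },
        "base_checkpoint_location": "s3://my-bucket/checkpoints/",
    }
)
# checkpoint_location is derived as
# "s3://my-bucket/checkpoints/lakehouse_engine/sensors/upstream_orders_sensor"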
 793
 794class SensorStatus(Enum):
 795    """Status for a sensor."""
 796
 797    ACQUIRED_NEW_DATA = "ACQUIRED_NEW_DATA"
 798    PROCESSED_NEW_DATA = "PROCESSED_NEW_DATA"
 799
 800
 801SENSOR_SCHEMA = StructType(
 802    [
 803        StructField("sensor_id", StringType(), False),
 804        StructField("assets", ArrayType(StringType(), False), True),
 805        StructField("status", StringType(), False),
 806        StructField("status_change_timestamp", TimestampType(), False),
 807        StructField("checkpoint_location", StringType(), True),
 808        StructField("upstream_key", StringType(), True),
 809        StructField("upstream_value", StringType(), True),
 810    ]
 811)
 812
 813SENSOR_UPDATE_SET: dict = {
 814    "sensors.sensor_id": "updates.sensor_id",
 815    "sensors.status": "updates.status",
 816    "sensors.status_change_timestamp": "updates.status_change_timestamp",
 817}
 818
 819SENSOR_ALLOWED_DATA_FORMATS = {
 820    ReadType.STREAMING.value: [InputFormat.KAFKA.value, *FILE_INPUT_FORMATS],
 821    ReadType.BATCH.value: [
 822        InputFormat.DELTAFILES.value,
 823        InputFormat.JDBC.value,
 824    ],
 825}
 826
 827
 828class SAPLogchain(Enum):
 829    """Defaults used on consuming data from SAP Logchain."""
 830
 831    DBTABLE = "SAPPHA.RSPCLOGCHAIN"
 832    GREEN_STATUS = "G"
 833    ENGINE_TABLE = "sensor_new_data"
 834
 835
 836class RestoreType(Enum):
 837    """Archive types."""
 838
 839    BULK = "Bulk"
 840    STANDARD = "Standard"
 841    EXPEDITED = "Expedited"
 842
 843    @classmethod
 844    def values(cls):  # type: ignore
 845        """Generates a list containing all enum values.
 846
  847        Returns:
 848            A list with all enum values.
 849        """
 850        return (c.value for c in cls)
 851
 852    @classmethod
 853    def exists(cls, restore_type: str) -> bool:
 854        """Checks if the restore type exists in the enum values.
 855
 856        Args:
 857            restore_type: restore type to check if exists.
 858
  859        Returns:
 860            If the restore type exists in our enum.
 861        """
 862        return restore_type in cls.values()
 863
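# Usage sketch (documentation example, not module code): validating a requested
# restore tier before triggering a Glacier restore.
RestoreType.exists("Expedited")  # True
RestoreType.exists("expedited")  # False, values are case-sensitive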
 864
 865class RestoreStatus(Enum):
 866    """Archive types."""
 867
 868    NOT_STARTED = "not_started"
 869    ONGOING = "ongoing"
 870    RESTORED = "restored"
 871
 872
 873ARCHIVE_STORAGE_CLASS = [
 874    "GLACIER",
 875    "DEEP_ARCHIVE",
 876    "GLACIER_IR",
 877]
 878
 879
 880class SQLParser(Enum):
 881    """Defaults to use for parsing."""
 882
 883    DOUBLE_QUOTES = '"'
 884    SINGLE_QUOTES = "'"
 885    BACKSLASH = "\\"
 886    SINGLE_TRACE = "-"
 887    DOUBLE_TRACES = "--"
 888    SLASH = "/"
 889    OPENING_MULTIPLE_LINE_COMMENT = "/*"
 890    CLOSING_MULTIPLE_LINE_COMMENT = "*/"
 891    PARAGRAPH = "\n"
 892    STAR = "*"
 893
 894    MULTIPLE_LINE_COMMENT = [
 895        OPENING_MULTIPLE_LINE_COMMENT,
 896        CLOSING_MULTIPLE_LINE_COMMENT,
 897    ]
 898
 899
 900class GABDefaults(Enum):
 901    """Defaults used on the GAB process."""
 902
 903    DATE_FORMAT = "%Y-%m-%d"
 904    DIMENSIONS_DEFAULT_COLUMNS = ["from_date", "to_date"]
 905    DEFAULT_DIMENSION_CALENDAR_TABLE = "dim_calendar"
 906    DEFAULT_LOOKUP_QUERY_BUILDER_TABLE = "lkp_query_builder"
 907
 908
 909class GABStartOfWeek(Enum):
 910    """Representation of start of week values on GAB."""
 911
 912    SUNDAY = "S"
 913    MONDAY = "M"
 914
 915    @classmethod
 916    def get_start_of_week(cls) -> dict:
 917        """Get the start of week enum as a dict.
 918
 919        Returns:
 920            dict containing all enum entries as `{name:value}`.
 921        """
 922        return {
 923            start_of_week.name: start_of_week.value
 924            for start_of_week in list(GABStartOfWeek)
 925        }
 926
 927    @classmethod
 928    def get_values(cls) -> set[str]:
 929        """Get the start of week enum values as set.
 930
 931        Returns:
 932            set containing all possible values `{value}`.
 933        """
 934        return {start_of_week.value for start_of_week in list(GABStartOfWeek)}
 935
 936
 937@dataclass
 938class GABSpec(object):
 939    """Gab Specification.
 940
 941    query_label_filter: query use-case label to execute.
 942    queue_filter: queue to execute the job.
 943    cadence_filter: selected cadences to build the asset.
 944    target_database: target database to write.
  945    current_date: current date.
 946    start_date: period start date.
 947    end_date: period end date.
 948    rerun_flag: rerun flag.
 949    target_table: target table to write.
 950    source_database: source database.
 951    gab_base_path: base path to read the use cases.
 952    lookup_table: gab configuration table.
 953    calendar_table: gab calendar table.
 954    """
 955
 956    query_label_filter: list[str]
 957    queue_filter: list[str]
 958    cadence_filter: list[str]
 959    target_database: str
 960    current_date: datetime
 961    start_date: datetime
 962    end_date: datetime
 963    rerun_flag: str
 964    target_table: str
 965    source_database: str
 966    gab_base_path: str
 967    lookup_table: str
 968    calendar_table: str
 969
 970    @classmethod
 971    def create_from_acon(cls, acon: dict):  # type: ignore
 972        """Create GabSpec from acon.
 973
 974        Args:
 975            acon: gab ACON.
 976        """
 977        lookup_table = f"{acon['source_database']}." + (
 978            acon.get(
 979                "lookup_table", GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE.value
 980            )
 981        )
 982
 983        calendar_table = f"{acon['source_database']}." + (
 984            acon.get(
 985                "calendar_table", GABDefaults.DEFAULT_DIMENSION_CALENDAR_TABLE.value
 986            )
 987        )
 988
 989        def format_date(date_to_format: Union[datetime, str]) -> datetime:
 990            if isinstance(date_to_format, str):
 991                return datetime.strptime(date_to_format, GABDefaults.DATE_FORMAT.value)
 992            else:
 993                return date_to_format
 994
 995        return cls(
 996            query_label_filter=acon["query_label_filter"],
 997            queue_filter=acon["queue_filter"],
 998            cadence_filter=acon["cadence_filter"],
 999            target_database=acon["target_database"],
1000            current_date=datetime.now(),
1001            start_date=format_date(acon["start_date"]),
1002            end_date=format_date(acon["end_date"]),
1003            rerun_flag=acon["rerun_flag"],
1004            target_table=acon["target_table"],
1005            source_database=acon["source_database"],
1006            gab_base_path=acon["gab_base_path"],
1007            lookup_table=lookup_table,
1008            calendar_table=calendar_table,
1009        )
1010
1011
1012class GABCadence(Enum):
1013    """Representation of the supported cadences on GAB."""
1014
1015    DAY = 1
1016    WEEK = 2
1017    MONTH = 3
1018    QUARTER = 4
1019    YEAR = 5
1020
1021    @classmethod
1022    def get_ordered_cadences(cls) -> dict:
1023        """Get the cadences ordered by the value.
1024
1025        Returns:
1026            dict containing ordered cadences as `{name:value}`.
1027        """
1028        cadences = list(GABCadence)
1029        return {
1030            cadence.name: cadence.value
1031            for cadence in sorted(cadences, key=lambda gab_cadence: gab_cadence.value)
1032        }
1033
1034    @classmethod
1035    def get_cadences(cls) -> set[str]:
1036        """Get the cadences values as set.
1037
1038        Returns:
1039            set containing all possible cadence values as `{value}`.
1040        """
1041        return {cadence.name for cadence in list(GABCadence)}
1042
1043    @classmethod
1044    def order_cadences(cls, cadences_to_order: list[str]) -> list[str]:
1045        """Order a list of cadences by value.
1046
1047        Returns:
1048            ordered list containing the received cadences.
1049        """
1050        return sorted(
1051            cadences_to_order,
1052            key=lambda item: cls.get_ordered_cadences().get(item),  # type: ignore
1053        )
1054
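# Usage sketch (documentation example, not module code): ordering user-supplied
# cadence names from finest to coarsest.
GABCadence.order_cadences(["YEAR", "DAY", "MONTH"])  # -> ["DAY", "MONTH", "YEAR"]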
1055
1056class GABKeys:
1057    """Constants used to update pre-configured gab dict key."""
1058
1059    JOIN_SELECT = "join_select"
1060    PROJECT_START = "project_start"
1061    PROJECT_END = "project_end"
1062
1063
1064class GABReplaceableKeys:
1065    """Constants used to replace pre-configured gab dict values."""
1066
1067    CADENCE = "${cad}"
1068    DATE_COLUMN = "${date_column}"
1069    CONFIG_WEEK_START = "${config_week_start}"
1070    RECONCILIATION_CADENCE = "${rec_cadence}"
1071
1072
1073class GABCombinedConfiguration(Enum):
1074    """GAB combined configuration.
1075
1076    Based on the use case configuration return the values to override in the SQL file.
1077    This enum aims to exhaustively map each combination of `cadence`, `reconciliation`,
1078        `week_start` and `snap_flag` to the corresponding values `join_select`,
1079        `project_start` and `project_end` that replace the placeholders in the stages SQL file.
1080
1081    Return corresponding configuration (join_select, project_start, project_end) for
1082        each combination (cadence x recon x week_start x snap_flag).
1083    """
1084
1085    _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE = (
1086        "date(date_trunc('${cad}',${date_column}))"
1087    )
1088    _DEFAULT_PROJECT_START = "df_cal.cadence_start_date"
1089    _DEFAULT_PROJECT_END = "df_cal.cadence_end_date"
1090
1091    COMBINED_CONFIGURATION = {
1092        # Combination of:
1093        # - cadence: `DAY`
1094        # - reconciliation_window: `DAY`, `WEEK`, `MONTH`, `QUARTER`, `YEAR`
1095        # - week_start: `S`, `M`
1096        # - snapshot_flag: `Y`, `N`
1097        1: {
1098            "cadence": GABCadence.DAY.name,
1099            "recon": GABCadence.get_cadences(),
1100            "week_start": GABStartOfWeek.get_values(),
1101            "snap_flag": {"Y", "N"},
1102            "join_select": "",
1103            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1104            "project_end": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1105        },
1106        # Combination of:
1107        # - cadence: `WEEK`
1108        # - reconciliation_window: `DAY`
1109        # - week_start: `S`, `M`
1110        # - snapshot_flag: `Y`
1111        2: {
1112            "cadence": GABCadence.WEEK.name,
1113            "recon": GABCadence.DAY.name,
1114            "week_start": GABStartOfWeek.get_values(),
1115            "snap_flag": "Y",
1116            "join_select": """
1117            select distinct case
1118                when '${config_week_start}' = 'Monday' then weekstart_mon
1119                when '${config_week_start}' = 'Sunday' then weekstart_sun
1120            end as cadence_start_date,
1121            calendar_date as cadence_end_date
1122        """,
1123            "project_start": _DEFAULT_PROJECT_START,
1124            "project_end": _DEFAULT_PROJECT_END,
1125        },
1126        # Combination of:
1127        # - cadence: `WEEK`
1128        # - reconciliation_window: `DAY`, `MONTH`, `QUARTER`, `YEAR`
1129        # - week_start: `M`
1130        # - snapshot_flag: `Y`, `N`
1131        3: {
1132            "cadence": GABCadence.WEEK.name,
1133            "recon": {
1134                GABCadence.DAY.name,
1135                GABCadence.MONTH.name,
1136                GABCadence.QUARTER.name,
1137                GABCadence.YEAR.name,
1138            },
1139            "week_start": "M",
1140            "snap_flag": {"Y", "N"},
1141            "join_select": """
1142            select distinct case
1143                when '${config_week_start}'  = 'Monday' then weekstart_mon
1144                when '${config_week_start}' = 'Sunday' then weekstart_sun
1145            end as cadence_start_date,
1146            case
1147                when '${config_week_start}' = 'Monday' then weekend_mon
1148                when '${config_week_start}' = 'Sunday' then weekend_sun
1149            end as cadence_end_date""",
1150            "project_start": _DEFAULT_PROJECT_START,
1151            "project_end": _DEFAULT_PROJECT_END,
1152        },
1153        4: {
1154            "cadence": GABCadence.MONTH.name,
1155            "recon": GABCadence.DAY.name,
1156            "week_start": GABStartOfWeek.get_values(),
1157            "snap_flag": "Y",
1158            "join_select": """
1159            select distinct month_start as cadence_start_date,
1160            calendar_date as cadence_end_date
1161        """,
1162            "project_start": _DEFAULT_PROJECT_START,
1163            "project_end": _DEFAULT_PROJECT_END,
1164        },
1165        5: {
1166            "cadence": GABCadence.MONTH.name,
1167            "recon": GABCadence.WEEK.name,
1168            "week_start": GABStartOfWeek.MONDAY.value,
1169            "snap_flag": "Y",
1170            "join_select": """
1171            select distinct month_start as cadence_start_date,
1172            case
1173                when date(
1174                    date_trunc('MONTH',add_months(calendar_date, 1))
1175                )-1 < weekend_mon
1176                    then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1
1177                else weekend_mon
1178            end as cadence_end_date""",
1179            "project_start": _DEFAULT_PROJECT_START,
1180            "project_end": _DEFAULT_PROJECT_END,
1181        },
1182        6: {
1183            "cadence": GABCadence.MONTH.name,
1184            "recon": GABCadence.WEEK.name,
1185            "week_start": GABStartOfWeek.SUNDAY.value,
1186            "snap_flag": "Y",
1187            "join_select": """
1188            select distinct month_start as cadence_start_date,
1189            case
1190                when date(
1191                    date_trunc('MONTH',add_months(calendar_date, 1))
1192                )-1 < weekend_sun
1193                    then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1
1194                else weekend_sun
1195            end as cadence_end_date""",
1196            "project_start": _DEFAULT_PROJECT_START,
1197            "project_end": _DEFAULT_PROJECT_END,
1198        },
1199        7: {
1200            "cadence": GABCadence.MONTH.name,
1201            "recon": GABCadence.get_cadences(),
1202            "week_start": GABStartOfWeek.get_values(),
1203            "snap_flag": {"Y", "N"},
1204            "join_select": "",
1205            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1206            "project_end": "date(date_trunc('MONTH',add_months(${date_column}, 1)))-1",
1207        },
1208        8: {
1209            "cadence": GABCadence.QUARTER.name,
1210            "recon": GABCadence.DAY.name,
1211            "week_start": GABStartOfWeek.get_values(),
1212            "snap_flag": "Y",
1213            "join_select": """
1214            select distinct quarter_start as cadence_start_date,
1215            calendar_date as cadence_end_date
1216        """,
1217            "project_start": _DEFAULT_PROJECT_START,
1218            "project_end": _DEFAULT_PROJECT_END,
1219        },
1220        9: {
1221            "cadence": GABCadence.QUARTER.name,
1222            "recon": GABCadence.WEEK.name,
1223            "week_start": GABStartOfWeek.MONDAY.value,
1224            "snap_flag": "Y",
1225            "join_select": """
1226            select distinct quarter_start as cadence_start_date,
1227            case
1228                when weekend_mon > date(
1229                    date_trunc('QUARTER',add_months(calendar_date, 3))
1230                )-1
1231                    then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1
1232                else weekend_mon
1233            end as cadence_end_date""",
1234            "project_start": _DEFAULT_PROJECT_START,
1235            "project_end": _DEFAULT_PROJECT_END,
1236        },
1237        10: {
1238            "cadence": GABCadence.QUARTER.name,
1239            "recon": GABCadence.WEEK.name,
1240            "week_start": GABStartOfWeek.SUNDAY.value,
1241            "snap_flag": "Y",
1242            "join_select": """
1243            select distinct quarter_start as cadence_start_date,
1244            case
1245                when weekend_sun > date(
1246                    date_trunc('QUARTER',add_months(calendar_date, 3))
1247                )-1
1248                    then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1
1249                else weekend_sun
1250            end as cadence_end_date""",
1251            "project_start": _DEFAULT_PROJECT_START,
1252            "project_end": _DEFAULT_PROJECT_END,
1253        },
1254        11: {
1255            "cadence": GABCadence.QUARTER.name,
1256            "recon": GABCadence.MONTH.name,
1257            "week_start": GABStartOfWeek.get_values(),
1258            "snap_flag": "Y",
1259            "join_select": """
1260            select distinct quarter_start as cadence_start_date,
1261            month_end as cadence_end_date
1262        """,
1263            "project_start": _DEFAULT_PROJECT_START,
1264            "project_end": _DEFAULT_PROJECT_END,
1265        },
1266        12: {
1267            "cadence": GABCadence.QUARTER.name,
1268            "recon": GABCadence.YEAR.name,
1269            "week_start": GABStartOfWeek.get_values(),
1270            "snap_flag": "N",
1271            "join_select": "",
1272            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1273            "project_end": """
1274            date(
1275                date_trunc(
1276                    '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 3)
1277                )
1278            )-1
1279        """,
1280        },
1281        13: {
1282            "cadence": GABCadence.QUARTER.name,
1283            "recon": GABCadence.get_cadences(),
1284            "week_start": GABStartOfWeek.get_values(),
1285            "snap_flag": "N",
1286            "join_select": "",
1287            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1288            "project_end": """
1289            date(
1290                date_trunc(
1291                    '${cad}',add_months( date(date_trunc('${cad}',${date_column})), 3)
1292                )
1293            )-1
1294        """,
1295        },
1296        14: {
1297            "cadence": GABCadence.YEAR.name,
1298            "recon": GABCadence.WEEK.name,
1299            "week_start": GABStartOfWeek.MONDAY.value,
1300            "snap_flag": "Y",
1301            "join_select": """
1302            select distinct year_start as cadence_start_date,
1303            case
1304                when weekend_mon > date(
1305                    date_trunc('YEAR',add_months(calendar_date, 12))
1306                )-1
1307                    then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1
1308                else weekend_mon
1309            end as cadence_end_date""",
1310            "project_start": _DEFAULT_PROJECT_START,
1311            "project_end": _DEFAULT_PROJECT_END,
1312        },
1313        15: {
1314            "cadence": GABCadence.YEAR.name,
1315            "recon": GABCadence.WEEK.name,
1316            "week_start": GABStartOfWeek.SUNDAY.value,
1317            "snap_flag": "Y",
1318            "join_select": """
1319            select distinct year_start as cadence_start_date,
1320            case
1321                when weekend_sun > date(
1322                    date_trunc('YEAR',add_months(calendar_date, 12))
1323                )-1
1324                    then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1
1325                else weekend_sun
1326            end as cadence_end_date""",
1327            "project_start": _DEFAULT_PROJECT_START,
1328            "project_end": _DEFAULT_PROJECT_END,
1329        },
1330        16: {
1331            "cadence": GABCadence.YEAR.name,
1332            "recon": GABCadence.get_cadences(),
1333            "week_start": GABStartOfWeek.get_values(),
1334            "snap_flag": "N",
1335            "inverse_flag": "Y",
1336            "join_select": "",
1337            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1338            "project_end": """
1339            date(
1340                date_trunc(
1341                    '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 12)
1342                )
1343            )-1
1344        """,
1345        },
1346        17: {
1347            "cadence": GABCadence.YEAR.name,
1348            "recon": {
1349                GABCadence.DAY.name,
1350                GABCadence.MONTH.name,
1351                GABCadence.QUARTER.name,
1352            },
1353            "week_start": GABStartOfWeek.get_values(),
1354            "snap_flag": "Y",
1355            "join_select": """
1356            select distinct year_start as cadence_start_date,
1357            case
1358                when '${rec_cadence}' = 'DAY' then calendar_date
1359                when '${rec_cadence}' = 'MONTH' then month_end
1360                when '${rec_cadence}' = 'QUARTER' then quarter_end
1361            end as cadence_end_date
1362        """,
1363            "project_start": _DEFAULT_PROJECT_START,
1364            "project_end": _DEFAULT_PROJECT_END,
1365        },
1366        18: {
1367            "cadence": GABCadence.get_cadences(),
1368            "recon": GABCadence.get_cadences(),
1369            "week_start": GABStartOfWeek.get_values(),
1370            "snap_flag": {"Y", "N"},
1371            "join_select": """
1372            select distinct
1373            case
1374                when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday'
1375                    then weekstart_mon
1376                when  '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday'
1377                    then weekstart_sun
1378                else
1379                    date(date_trunc('${cad}',calendar_date))
1380            end as cadence_start_date,
1381            case
1382                when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday'
1383                    then weekend_mon
1384                when  '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday'
1385                    then weekend_sun
1386                when '${cad}' = 'DAY'
1387                    then date(date_trunc('${cad}',calendar_date))
1388                when '${cad}' = 'MONTH'
1389                    then date(
1390                        date_trunc(
1391                            'MONTH',
1392                            add_months(date(date_trunc('${cad}',calendar_date)), 1)
1393                        )
1394                    )-1
1395                when '${cad}' = 'QUARTER'
1396                    then date(
1397                        date_trunc(
1398                            'QUARTER',
1399                            add_months(date(date_trunc('${cad}',calendar_date)) , 3)
1400                        )
1401                    )-1
1402                when '${cad}' = 'YEAR'
1403                    then date(
1404                        date_trunc(
1405                            'YEAR',
1406                            add_months(date(date_trunc('${cad}',calendar_date)), 12)
1407                        )
1408                    )-1
1409            end as cadence_end_date
1410        """,
1411            "project_start": _DEFAULT_PROJECT_START,
1412            "project_end": _DEFAULT_PROJECT_END,
1413        },
1414    }
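
The SQL snippets in the combinations above are templates whose '${...}' placeholders (e.g. ${cad}, ${date_column}, ${rec_cadence}, ${config_week_start}) appear to be substituted at runtime. A minimal sketch, assuming plain string templating (this is not the engine's actual rendering code), of filling in the project_end template of combination 16 with placeholder values:

    from string import Template

    # Hypothetical rendering of the combination 16 project_end template; the cadence
    # and date column values below are placeholders chosen for illustration only.
    project_end_template = Template(
        "date(date_trunc('${cad}', add_months(date(date_trunc('${cad}', ${date_column})), 12))) - 1"
    )
    print(project_end_template.substitute(cad="YEAR", date_column="order_date"))
    # date(date_trunc('YEAR', add_months(date(date_trunc('YEAR', order_date)), 12))) - 1
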
class CollectEngineUsage(enum.Enum):
20class CollectEngineUsage(Enum):
21    """Options for collecting engine usage stats.
22
23    - enabled, enables the collection and storage of Lakehouse Engine
24    usage statistics for any environment.
25    - prod_only, enables the collection and storage of Lakehouse Engine
26    usage statistics for production environment only.
27    - disabled, disables the collection and storage of Lakehouse Engine
28    usage statistics, for all environments.
29    """
30
31    ENABLED = "enabled"
32    PROD_ONLY = "prod_only"
33    DISABLED = "disabled"

Options for collecting engine usage stats.

  • enabled, enables the collection and storage of Lakehouse Engine usage statistics for any environment.
  • prod_only, enables the collection and storage of Lakehouse Engine usage statistics for production environment only.
  • disabled, disables the collection and storage of Lakehouse Engine usage statistics, for all environments.
ENABLED = <CollectEngineUsage.ENABLED: 'enabled'>
PROD_ONLY = <CollectEngineUsage.PROD_ONLY: 'prod_only'>
DISABLED = <CollectEngineUsage.DISABLED: 'disabled'>
@dataclass
class EngineConfig:
36@dataclass
37class EngineConfig(object):
38    """Definitions that can come from the Engine Config file.
39
40    - dq_bucket: S3 prod bucket used to store data quality related artifacts.
41    - dq_dev_bucket: S3 dev bucket used to store data quality related artifacts.
42    - notif_disallowed_email_servers: email servers not allowed to be used
43        for sending notifications.
44    - engine_usage_path: path where the engine prod usage stats are stored.
45    - engine_dev_usage_path: path where the engine dev usage stats are stored.
46    - collect_engine_usage: whether to enable the collection of lakehouse
47        engine usage stats or not.
48    - dq_functions_column_list: list of columns to be added to the meta argument
49        of GX when using PRISMA.
50    """
51
52    dq_bucket: Optional[str] = None
53    dq_dev_bucket: Optional[str] = None
54    notif_disallowed_email_servers: Optional[list] = None
55    engine_usage_path: Optional[str] = None
56    engine_dev_usage_path: Optional[str] = None
57    collect_engine_usage: str = CollectEngineUsage.ENABLED.value
58    dq_functions_column_list: Optional[list] = None

Definitions that can come from the Engine Config file.

  • dq_bucket: S3 prod bucket used to store data quality related artifacts.
  • dq_dev_bucket: S3 dev bucket used to store data quality related artifacts.
  • notif_disallowed_email_servers: email servers not allowed to be used for sending notifications.
  • engine_usage_path: path where the engine prod usage stats are stored.
  • engine_dev_usage_path: path where the engine dev usage stats are stored.
  • collect_engine_usage: whether to enable the collection of lakehouse engine usage stats or not.
  • dq_functions_column_list: list of columns to be added to the meta argument of GX when using PRISMA.
EngineConfig( dq_bucket: Optional[str] = None, dq_dev_bucket: Optional[str] = None, notif_disallowed_email_servers: Optional[list] = None, engine_usage_path: Optional[str] = None, engine_dev_usage_path: Optional[str] = None, collect_engine_usage: str = 'enabled', dq_functions_column_list: Optional[list] = None)
dq_bucket: Optional[str] = None
dq_dev_bucket: Optional[str] = None
notif_disallowed_email_servers: Optional[list] = None
engine_usage_path: Optional[str] = None
engine_dev_usage_path: Optional[str] = None
collect_engine_usage: str = 'enabled'
dq_functions_column_list: Optional[list] = None
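
A minimal sketch of building an EngineConfig that only collects usage statistics in production; the bucket names are placeholder values, not real defaults:

    from lakehouse_engine.core.definitions import CollectEngineUsage, EngineConfig

    engine_config = EngineConfig(
        dq_bucket="s3://my-prod-dq-bucket",      # placeholder bucket name
        dq_dev_bucket="s3://my-dev-dq-bucket",   # placeholder bucket name
        collect_engine_usage=CollectEngineUsage.PROD_ONLY.value,
    )
    print(engine_config.collect_engine_usage)  # prod_only
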
class EngineStats(enum.Enum):
61class EngineStats(Enum):
62    """Definitions for collection of Lakehouse Engine Stats.
63
64    .. note::
65        Note: whenever the value comes from a key inside a Spark Config
66        that returns an array, it can be specified with a '#' so that it
67        is adequately processed.
68    """
69
70    CLUSTER_USAGE_TAGS = "spark.databricks.clusterUsageTags"
71    DEF_SPARK_CONFS = {
72        "dp_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#accountName",
73        "environment": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#environment",
74        "workspace_id": f"{CLUSTER_USAGE_TAGS}.orgId",
75        "job_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#JobId",
76        "job_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#RunName",
77        "run_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#ClusterName",
78    }

Definitions for collection of Lakehouse Engine Stats.

Note: whenever the value comes from a key inside a Spark Config that returns an array, it can be specified with a '#' so that it is adequately processed.

CLUSTER_USAGE_TAGS = <EngineStats.CLUSTER_USAGE_TAGS: 'spark.databricks.clusterUsageTags'>
DEF_SPARK_CONFS = <EngineStats.DEF_SPARK_CONFS: {'dp_name': 'spark.databricks.clusterUsageTags.clusterAllTags#accountName', 'environment': 'spark.databricks.clusterUsageTags.clusterAllTags#environment', 'workspace_id': 'spark.databricks.clusterUsageTags.orgId', 'job_id': 'spark.databricks.clusterUsageTags.clusterAllTags#JobId', 'job_name': 'spark.databricks.clusterUsageTags.clusterAllTags#RunName', 'run_id': 'spark.databricks.clusterUsageTags.clusterAllTags#ClusterName'}>
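
As a rough illustration of the '#' convention in DEF_SPARK_CONFS (an assumption about the naming convention only, not the engine's parsing code), the part before '#' can be read as the Spark conf key and the part after it as the entry to pick from the array-valued conf:

    from lakehouse_engine.core.definitions import EngineStats

    for stat_name, conf_key in EngineStats.DEF_SPARK_CONFS.value.items():
        spark_conf, _, array_entry = conf_key.partition("#")
        # e.g. dp_name -> spark.databricks.clusterUsageTags.clusterAllTags / accountName
        print(stat_name, spark_conf, array_entry or "<whole value>")
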
class InputFormat(enum.Enum):
 81class InputFormat(Enum):
 82    """Formats of algorithm input."""
 83
 84    JDBC = "jdbc"
 85    AVRO = "avro"
 86    JSON = "json"
 87    CSV = "csv"
 88    PARQUET = "parquet"
 89    DELTAFILES = "delta"
 90    CLOUDFILES = "cloudfiles"
 91    KAFKA = "kafka"
 92    SQL = "sql"
 93    SAP_BW = "sap_bw"
 94    SAP_B4 = "sap_b4"
 95    DATAFRAME = "dataframe"
 96    SFTP = "sftp"
 97
 98    @classmethod
 99    def values(cls):  # type: ignore
100        """Generates a list containing all enum values.
101
102        Return:
103            A list with all enum values.
104        """
105        return (c.value for c in cls)
106
107    @classmethod
108    def exists(cls, input_format: str) -> bool:
109        """Checks if the input format exists in the enum values.
110
111        Args:
112            input_format: format to check if exists.
113
114        Return:
115            If the input format exists in our enum.
116        """
117        return input_format in cls.values()

Formats of algorithm input.

JDBC = <InputFormat.JDBC: 'jdbc'>
AVRO = <InputFormat.AVRO: 'avro'>
JSON = <InputFormat.JSON: 'json'>
CSV = <InputFormat.CSV: 'csv'>
PARQUET = <InputFormat.PARQUET: 'parquet'>
DELTAFILES = <InputFormat.DELTAFILES: 'delta'>
CLOUDFILES = <InputFormat.CLOUDFILES: 'cloudfiles'>
KAFKA = <InputFormat.KAFKA: 'kafka'>
SQL = <InputFormat.SQL: 'sql'>
SAP_BW = <InputFormat.SAP_BW: 'sap_bw'>
SAP_B4 = <InputFormat.SAP_B4: 'sap_b4'>
DATAFRAME = <InputFormat.DATAFRAME: 'dataframe'>
SFTP = <InputFormat.SFTP: 'sftp'>
@classmethod
def values(cls):
 98    @classmethod
 99    def values(cls):  # type: ignore
100        """Generates a list containing all enum values.
101
102        Return:
103            A list with all enum values.
104        """
105        return (c.value for c in cls)

Generates a list containing all enum values.

Return:

A list with all enum values.

@classmethod
def exists(cls, input_format: str) -> bool:
107    @classmethod
108    def exists(cls, input_format: str) -> bool:
109        """Checks if the input format exists in the enum values.
110
111        Args:
112            input_format: format to check if exists.
113
114        Return:
115            If the input format exists in our enum.
116        """
117        return input_format in cls.values()

Checks if the input format exists in the enum values.

Arguments:
  • input_format: format to check if exists.
Return:

If the input format exists in our enum.

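
A small sketch of using InputFormat.exists to validate a user-provided format before building a spec:

    from lakehouse_engine.core.definitions import InputFormat

    requested_format = "csv"
    if not InputFormat.exists(requested_format):
        raise ValueError(f"Unsupported input format: {requested_format}")
    print(f"{requested_format} is a supported input format")
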
FILE_INPUT_FORMATS = ['avro', 'json', 'parquet', 'csv', 'delta', 'cloudfiles']
class OutputFormat(enum.Enum):
131class OutputFormat(Enum):
132    """Formats of algorithm output."""
133
134    JDBC = "jdbc"
135    AVRO = "avro"
136    JSON = "json"
137    CSV = "csv"
138    PARQUET = "parquet"
139    DELTAFILES = "delta"
140    KAFKA = "kafka"
141    CONSOLE = "console"
142    NOOP = "noop"
143    DATAFRAME = "dataframe"
144    REST_API = "rest_api"
145    FILE = "file"  # Internal use only
146    TABLE = "table"  # Internal use only
147
148    @classmethod
149    def values(cls):  # type: ignore
150        """Generates a list containing all enum values.
151
152        Return:
153            A list with all enum values.
154        """
155        return (c.value for c in cls)
156
157    @classmethod
158    def exists(cls, output_format: str) -> bool:
159        """Checks if the output format exists in the enum values.
160
161        Args:
162            output_format: format to check if exists.
163
164        Return:
165            If the output format exists in our enum.
166        """
167        return output_format in cls.values()

Formats of algorithm output.

JDBC = <OutputFormat.JDBC: 'jdbc'>
AVRO = <OutputFormat.AVRO: 'avro'>
JSON = <OutputFormat.JSON: 'json'>
CSV = <OutputFormat.CSV: 'csv'>
PARQUET = <OutputFormat.PARQUET: 'parquet'>
DELTAFILES = <OutputFormat.DELTAFILES: 'delta'>
KAFKA = <OutputFormat.KAFKA: 'kafka'>
CONSOLE = <OutputFormat.CONSOLE: 'console'>
NOOP = <OutputFormat.NOOP: 'noop'>
DATAFRAME = <OutputFormat.DATAFRAME: 'dataframe'>
REST_API = <OutputFormat.REST_API: 'rest_api'>
FILE = <OutputFormat.FILE: 'file'>
TABLE = <OutputFormat.TABLE: 'table'>
@classmethod
def values(cls):
148    @classmethod
149    def values(cls):  # type: ignore
150        """Generates a list containing all enum values.
151
152        Return:
153            A list with all enum values.
154        """
155        return (c.value for c in cls)

Generates a list containing all enum values.

Return:

A list with all enum values.

@classmethod
def exists(cls, output_format: str) -> bool:
157    @classmethod
158    def exists(cls, output_format: str) -> bool:
159        """Checks if the output format exists in the enum values.
160
161        Args:
162            output_format: format to check if exists.
163
164        Return:
165            If the output format exists in our enum.
166        """
167        return output_format in cls.values()

Checks if the output format exists in the enum values.

Arguments:
  • output_format: format to check if exists.
Return:

If the output format exists in our enum.

FILE_OUTPUT_FORMATS = ['avro', 'json', 'parquet', 'csv', 'delta']
class NotifierType(enum.Enum):
180class NotifierType(Enum):
181    """Type of notifier available."""
182
183    EMAIL = "email"

Type of notifier available.

EMAIL = <NotifierType.EMAIL: 'email'>
class NotificationRuntimeParameters(enum.Enum):
186class NotificationRuntimeParameters(Enum):
187    """Parameters to be replaced in runtime."""
188
189    DATABRICKS_JOB_NAME = "databricks_job_name"
190    DATABRICKS_WORKSPACE_ID = "databricks_workspace_id"

Parameters to be replaced in runtime.

DATABRICKS_JOB_NAME = <NotificationRuntimeParameters.DATABRICKS_JOB_NAME: 'databricks_job_name'>
DATABRICKS_WORKSPACE_ID = <NotificationRuntimeParameters.DATABRICKS_WORKSPACE_ID: 'databricks_workspace_id'>
NOTIFICATION_RUNTIME_PARAMETERS = ['databricks_job_name', 'databricks_workspace_id']
class ReadType(enum.Enum):
199class ReadType(Enum):
200    """Define the types of read operations.
201
202    - BATCH - read the data in batch mode (e.g., Spark batch).
203    - STREAMING - read the data in streaming mode (e.g., Spark streaming).
204    """
205
206    BATCH = "batch"
207    STREAMING = "streaming"

Define the types of read operations.

  • BATCH - read the data in batch mode (e.g., Spark batch).
  • STREAMING - read the data in streaming mode (e.g., Spark streaming).
BATCH = <ReadType.BATCH: 'batch'>
STREAMING = <ReadType.STREAMING: 'streaming'>
class ReadMode(enum.Enum):
210class ReadMode(Enum):
211    """Different modes that control how we handle compliance to the provided schema.
212
213    These read modes map to Spark's read modes at the moment.
214    """
215
216    PERMISSIVE = "PERMISSIVE"
217    FAILFAST = "FAILFAST"
218    DROPMALFORMED = "DROPMALFORMED"

Different modes that control how we handle compliance with the provided schema.

These read modes map to Spark's read modes at the moment.

PERMISSIVE = <ReadMode.PERMISSIVE: 'PERMISSIVE'>
FAILFAST = <ReadMode.FAILFAST: 'FAILFAST'>
DROPMALFORMED = <ReadMode.DROPMALFORMED: 'DROPMALFORMED'>
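
Purely as an illustration of where these values end up (not engine code): ReadMode maps onto Spark's "mode" read option, while ReadType distinguishes a batch read from a streaming one. The path below is a placeholder:

    from pyspark.sql import SparkSession

    from lakehouse_engine.core.definitions import ReadMode, ReadType

    spark = SparkSession.builder.getOrCreate()

    read_type = ReadType.BATCH.value
    reader = spark.read if read_type == ReadType.BATCH.value else spark.readStream
    df = (
        reader.format("csv")
        .option("header", "true")
        .option("mode", ReadMode.FAILFAST.value)  # fail fast on malformed records
        .load("/tmp/placeholder/path")
    )
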
class DQDefaults(enum.Enum):
221class DQDefaults(Enum):
222    """Defaults used on the data quality process."""
223
224    FILE_SYSTEM_STORE = "file_system"
225    FILE_SYSTEM_S3_STORE = "s3"
226    DQ_BATCH_IDENTIFIERS = ["spec_id", "input_id", "timestamp"]
227    DATASOURCE_CLASS_NAME = "Datasource"
228    DATASOURCE_EXECUTION_ENGINE = "SparkDFExecutionEngine"
229    DATA_CONNECTORS_CLASS_NAME = "RuntimeDataConnector"
230    DATA_CONNECTORS_MODULE_NAME = "great_expectations.datasource.data_connector"
231    DATA_CHECKPOINTS_CLASS_NAME = "SimpleCheckpoint"
232    DATA_CHECKPOINTS_CONFIG_VERSION = 1.0
233    STORE_BACKEND = "s3"
234    EXPECTATIONS_STORE_PREFIX = "dq/expectations/"
235    VALIDATIONS_STORE_PREFIX = "dq/validations/"
236    DATA_DOCS_PREFIX = "dq/data_docs/site/"
237    CHECKPOINT_STORE_PREFIX = "dq/checkpoints/"
238    VALIDATION_COLUMN_IDENTIFIER = "validationresultidentifier"
239    CUSTOM_EXPECTATION_LIST = [
240        "expect_column_values_to_be_date_not_older_than",
241        "expect_column_pair_a_to_be_smaller_or_equal_than_b",
242        "expect_multicolumn_column_a_must_equal_b_or_c",
243        "expect_queried_column_agg_value_to_be",
244        "expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b",
245        "expect_column_pair_a_to_be_not_equal_to_b",
246    ]
247    DQ_VALIDATIONS_SCHEMA = StructType(
248        [
249            StructField(
250                "dq_validations",
251                StructType(
252                    [
253                        StructField("run_name", StringType()),
254                        StructField("run_success", BooleanType()),
255                        StructField("raised_exceptions", BooleanType()),
256                        StructField("run_row_success", BooleanType()),
257                        StructField(
258                            "dq_failure_details",
259                            ArrayType(
260                                StructType(
261                                    [
262                                        StructField("expectation_type", StringType()),
263                                        StructField("kwargs", StringType()),
264                                    ]
265                                ),
266                            ),
267                        ),
268                    ]
269                ),
270            )
271        ]
272    )

Defaults used on the data quality process.

FILE_SYSTEM_STORE = <DQDefaults.FILE_SYSTEM_STORE: 'file_system'>
FILE_SYSTEM_S3_STORE = <DQDefaults.FILE_SYSTEM_S3_STORE: 's3'>
DQ_BATCH_IDENTIFIERS = <DQDefaults.DQ_BATCH_IDENTIFIERS: ['spec_id', 'input_id', 'timestamp']>
DATASOURCE_CLASS_NAME = <DQDefaults.DATASOURCE_CLASS_NAME: 'Datasource'>
DATASOURCE_EXECUTION_ENGINE = <DQDefaults.DATASOURCE_EXECUTION_ENGINE: 'SparkDFExecutionEngine'>
DATA_CONNECTORS_CLASS_NAME = <DQDefaults.DATA_CONNECTORS_CLASS_NAME: 'RuntimeDataConnector'>
DATA_CONNECTORS_MODULE_NAME = <DQDefaults.DATA_CONNECTORS_MODULE_NAME: 'great_expectations.datasource.data_connector'>
DATA_CHECKPOINTS_CLASS_NAME = <DQDefaults.DATA_CHECKPOINTS_CLASS_NAME: 'SimpleCheckpoint'>
DATA_CHECKPOINTS_CONFIG_VERSION = <DQDefaults.DATA_CHECKPOINTS_CONFIG_VERSION: 1.0>
STORE_BACKEND = <DQDefaults.FILE_SYSTEM_S3_STORE: 's3'>
EXPECTATIONS_STORE_PREFIX = <DQDefaults.EXPECTATIONS_STORE_PREFIX: 'dq/expectations/'>
VALIDATIONS_STORE_PREFIX = <DQDefaults.VALIDATIONS_STORE_PREFIX: 'dq/validations/'>
DATA_DOCS_PREFIX = <DQDefaults.DATA_DOCS_PREFIX: 'dq/data_docs/site/'>
CHECKPOINT_STORE_PREFIX = <DQDefaults.CHECKPOINT_STORE_PREFIX: 'dq/checkpoints/'>
VALIDATION_COLUMN_IDENTIFIER = <DQDefaults.VALIDATION_COLUMN_IDENTIFIER: 'validationresultidentifier'>
CUSTOM_EXPECTATION_LIST = <DQDefaults.CUSTOM_EXPECTATION_LIST: ['expect_column_values_to_be_date_not_older_than', 'expect_column_pair_a_to_be_smaller_or_equal_than_b', 'expect_multicolumn_column_a_must_equal_b_or_c', 'expect_queried_column_agg_value_to_be', 'expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b', 'expect_column_pair_a_to_be_not_equal_to_b']>
DQ_VALIDATIONS_SCHEMA = <DQDefaults.DQ_VALIDATIONS_SCHEMA: StructType([StructField('dq_validations', StructType([StructField('run_name', StringType(), True), StructField('run_success', BooleanType(), True), StructField('raised_exceptions', BooleanType(), True), StructField('run_row_success', BooleanType(), True), StructField('dq_failure_details', ArrayType(StructType([StructField('expectation_type', StringType(), True), StructField('kwargs', StringType(), True)]), True), True)]), True)])>
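
A short sketch to inspect the nested layout of DQ_VALIDATIONS_SCHEMA (run_name, run_success, dq_failure_details, ...) by materialising an empty DataFrame with it:

    from pyspark.sql import SparkSession

    from lakehouse_engine.core.definitions import DQDefaults

    spark = SparkSession.builder.getOrCreate()

    empty_validations = spark.createDataFrame([], DQDefaults.DQ_VALIDATIONS_SCHEMA.value)
    empty_validations.printSchema()
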
class WriteType(enum.Enum):
275class WriteType(Enum):
276    """Types of write operations."""
277
278    OVERWRITE = "overwrite"
279    COMPLETE = "complete"
280    APPEND = "append"
281    UPDATE = "update"
282    MERGE = "merge"
283    ERROR_IF_EXISTS = "error"
284    IGNORE_IF_EXISTS = "ignore"

Types of write operations.

OVERWRITE = <WriteType.OVERWRITE: 'overwrite'>
COMPLETE = <WriteType.COMPLETE: 'complete'>
APPEND = <WriteType.APPEND: 'append'>
UPDATE = <WriteType.UPDATE: 'update'>
MERGE = <WriteType.MERGE: 'merge'>
ERROR_IF_EXISTS = <WriteType.ERROR_IF_EXISTS: 'error'>
IGNORE_IF_EXISTS = <WriteType.IGNORE_IF_EXISTS: 'ignore'>
@dataclass
class InputSpec:
287@dataclass
288class InputSpec(object):
289    """Specification of an algorithm input.
290
291    This is very aligned with the way the execution environment connects to the sources
292    (e.g., spark sources).
293
294    - spec_id: spec_id of the input specification.
295    - read_type: type of read operation (ReadType).
296    - data_format: format of the input.
297    - sftp_files_format: format of the files (csv, fwf, json, xml...) in a sftp
298        directory.
299    - df_name: dataframe name.
300    - db_table: table name in the form of `<db>.<table>`.
301    - location: uri that identifies from where to read data in the specified format.
302    - enforce_schema_from_table: if we want to enforce the table schema or not, by
303        providing a table name in the form of `<db>.<table>`.
304    - query: sql query to execute and return the dataframe. Use it if you do not want to
305        read from a file system nor from a table, but rather from a sql query instead.
306    - schema: dict representation of a schema of the input (e.g., Spark struct type
307        schema).
308    - schema_path: path to a file with a representation of a schema of the input (e.g.,
309        Spark struct type schema).
310    - disable_dbfs_retry: optional flag to disable file storage dbfs.
311    - with_filepath: if we want to include the path of the file that is being read. Only
312        works with the file reader (batch and streaming modes are supported).
313    - options: dict with other relevant options according to the execution
314        environment (e.g., spark) possible sources.
315    - calculate_upper_bound: when to calculate upper bound to extract from SAP BW
316        or not.
317    - calc_upper_bound_schema: specific schema for the calculated upper_bound.
318    - generate_predicates: when to generate predicates to extract from SAP BW or not.
319    - predicates_add_null: if we want to include is null on partition by predicates.
320    - temp_view: optional name of a view to point to the input dataframe to be used
321        to create or replace a temp view on top of the dataframe.
322    """
323
324    spec_id: str
325    read_type: str
326    data_format: Optional[str] = None
327    sftp_files_format: Optional[str] = None
328    df_name: Optional[DataFrame] = None
329    db_table: Optional[str] = None
330    location: Optional[str] = None
331    query: Optional[str] = None
332    enforce_schema_from_table: Optional[str] = None
333    schema: Optional[dict] = None
334    schema_path: Optional[str] = None
335    disable_dbfs_retry: bool = False
336    with_filepath: bool = False
337    options: Optional[dict] = None
338    jdbc_args: Optional[dict] = None
339    calculate_upper_bound: bool = False
340    calc_upper_bound_schema: Optional[str] = None
341    generate_predicates: bool = False
342    predicates_add_null: bool = True
343    temp_view: Optional[str] = None

Specification of an algorithm input.

This is very aligned with the way the execution environment connects to the sources (e.g., spark sources).

  • spec_id: spec_id of the input specification.
  • read_type: type of read operation (ReadType).
  • data_format: format of the input.
  • sftp_files_format: format of the files (csv, fwf, json, xml...) in a sftp directory.
  • df_name: dataframe name.
  • db_table: table name in the form of <db>.<table>.
  • location: uri that identifies from where to read data in the specified format.
  • enforce_schema_from_table: if we want to enforce the table schema or not, by providing a table name in the form of <db>.<table>.
  • query: sql query to execute and return the dataframe. Use it if you do not want to read from a file system nor from a table, but rather from a sql query instead.
  • schema: dict representation of a schema of the input (e.g., Spark struct type schema).
  • schema_path: path to a file with a representation of a schema of the input (e.g., Spark struct type schema).
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
  • with_filepath: if we want to include the path of the file that is being read. Only works with the file reader (batch and streaming modes are supported).
  • options: dict with other relevant options according to the execution environment (e.g., spark) possible sources.
  • calculate_upper_bound: whether to calculate the upper bound to extract from SAP BW or not.
  • calc_upper_bound_schema: specific schema for the calculated upper_bound.
  • generate_predicates: whether to generate predicates to extract from SAP BW or not.
  • predicates_add_null: whether to include an is null condition in the partition by predicates.
  • temp_view: optional name of a view to point to the input dataframe to be used to create or replace a temp view on top of the dataframe.
InputSpec( spec_id: str, read_type: str, data_format: Optional[str] = None, sftp_files_format: Optional[str] = None, df_name: Optional[pyspark.sql.dataframe.DataFrame] = None, db_table: Optional[str] = None, location: Optional[str] = None, query: Optional[str] = None, enforce_schema_from_table: Optional[str] = None, schema: Optional[dict] = None, schema_path: Optional[str] = None, disable_dbfs_retry: bool = False, with_filepath: bool = False, options: Optional[dict] = None, jdbc_args: Optional[dict] = None, calculate_upper_bound: bool = False, calc_upper_bound_schema: Optional[str] = None, generate_predicates: bool = False, predicates_add_null: bool = True, temp_view: Optional[str] = None)
spec_id: str
read_type: str
data_format: Optional[str] = None
sftp_files_format: Optional[str] = None
df_name: Optional[pyspark.sql.dataframe.DataFrame] = None
db_table: Optional[str] = None
location: Optional[str] = None
query: Optional[str] = None
enforce_schema_from_table: Optional[str] = None
schema: Optional[dict] = None
schema_path: Optional[str] = None
disable_dbfs_retry: bool = False
with_filepath: bool = False
options: Optional[dict] = None
jdbc_args: Optional[dict] = None
calculate_upper_bound: bool = False
calc_upper_bound_schema: Optional[str] = None
generate_predicates: bool = False
predicates_add_null: bool = True
temp_view: Optional[str] = None
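
A minimal sketch of a batch delta input specification; the spec id and location are placeholders:

    from lakehouse_engine.core.definitions import InputFormat, InputSpec, ReadType

    orders_input = InputSpec(
        spec_id="orders_bronze",                  # placeholder id
        read_type=ReadType.BATCH.value,
        data_format=InputFormat.DELTAFILES.value,
        location="s3://my-bucket/bronze/orders",  # placeholder location
    )
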
@dataclass
class TransformerSpec:
346@dataclass
347class TransformerSpec(object):
348    """Transformer Specification, i.e., a single transformation amongst many.
349
350    - function: name of the function (or callable function) to be executed.
351    - args: (not applicable if using a callable function) dict with the arguments
352        to pass to the function `<k,v>` pairs with the name of the parameter of
353        the function and the respective value.
354    """
355
356    function: str
357    args: dict

Transformer Specification, i.e., a single transformation amongst many.

  • function: name of the function (or callable function) to be executed.
  • args: (not applicable if using a callable function) dict with the arguments to pass to the function, as <k,v> pairs with the name of the parameter of the function and the respective value.
TransformerSpec(function: str, args: dict)
function: str
args: dict
@dataclass
class TransformSpec:
360@dataclass
361class TransformSpec(object):
362    """Transformation Specification.
363
364    I.e., the specification that defines the many transformations to be done to the data
365    that was read.
366
367    - spec_id: id of the transform specification.
368    - input_id: id of the corresponding input
369        specification.
370    - transformers: list of transformers to execute.
371    - force_streaming_foreach_batch_processing: sometimes, when using streaming, we want
372        to force the transform to be executed in the foreachBatch function to ensure
373        non-supported streaming operations can be properly executed.
374    """
375
376    spec_id: str
377    input_id: str
378    transformers: List[TransformerSpec]
379    force_streaming_foreach_batch_processing: bool = False

Transformation Specification.

I.e., the specification that defines the many transformations to be done to the data that was read.

  • spec_id: id of the transform specification.
  • input_id: id of the corresponding input specification.
  • transformers: list of transformers to execute.
  • force_streaming_foreach_batch_processing: sometimes, when using streaming, we want to force the transform to be executed in the foreachBatch function to ensure non-supported streaming operations can be properly executed.
TransformSpec( spec_id: str, input_id: str, transformers: List[TransformerSpec], force_streaming_foreach_batch_processing: bool = False)
spec_id: str
input_id: str
transformers: List[TransformerSpec]
force_streaming_foreach_batch_processing: bool = False
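
A minimal sketch of a transform specification with a single transformer; the transformer name and its argument are illustrative and not necessarily ones shipped with the engine:

    from lakehouse_engine.core.definitions import TransformerSpec, TransformSpec

    orders_transform = TransformSpec(
        spec_id="orders_transformed",
        input_id="orders_bronze",
        transformers=[
            TransformerSpec(function="repartition", args={"num_partitions": 10})
        ],
    )
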
class DQType(enum.Enum):
382class DQType(Enum):
383    """Available data quality tasks."""
384
385    VALIDATOR = "validator"
386    PRISMA = "prisma"

Available data quality tasks.

VALIDATOR = <DQType.VALIDATOR: 'validator'>
PRISMA = <DQType.PRISMA: 'prisma'>
class DQExecutionPoint(enum.Enum):
389class DQExecutionPoint(Enum):
390    """Available data quality execution points."""
391
392    IN_MOTION = "in_motion"
393    AT_REST = "at_rest"

Available data quality execution points.

IN_MOTION = <DQExecutionPoint.IN_MOTION: 'in_motion'>
AT_REST = <DQExecutionPoint.AT_REST: 'at_rest'>
class DQTableBaseParameters(enum.Enum):
396class DQTableBaseParameters(Enum):
397    """Base parameters for importing DQ rules from a table."""
398
399    PRISMA_BASE_PARAMETERS = ["arguments", "dq_tech_function"]

Base parameters for importing DQ rules from a table.

PRISMA_BASE_PARAMETERS = <DQTableBaseParameters.PRISMA_BASE_PARAMETERS: ['arguments', 'dq_tech_function']>
@dataclass
class DQFunctionSpec:
402@dataclass
403class DQFunctionSpec(object):
404    """Defines a data quality function specification.
405
406    - function - name of the data quality function (expectation) to execute.
407    It follows the great_expectations api https://greatexpectations.io/expectations/.
408    - args - args of the function (expectation). Follow the same api as above.
409    """
410
411    function: str
412    args: Optional[dict] = None

Defines a data quality function specification.

  • function - name of the data quality function (expectation) to execute. It follows the great_expectations api https://greatexpectations.io/expectations/.
  • args - args of the function (expectation). Follow the same api as above.
DQFunctionSpec(function: str, args: Optional[dict] = None)
function: str
args: Optional[dict] = None
@dataclass
class DQSpec:
415@dataclass
416class DQSpec(object):
417    """Data quality overall specification.
418
419    - spec_id - id of the specification.
420    - input_id - id of the input specification.
421    - dq_type - type of DQ process to execute (e.g. validator).
422    - dq_functions - list of function specifications to execute.
423    - dq_db_table - name of table to derive the dq functions from.
424    - dq_table_table_filter - name of the table which rules are to be applied in the
425        validations (Only used when deriving dq functions).
426    - dq_table_extra_filters - extra filters to be used when deriving dq functions.
427        This is a sql expression to be applied to the dq_db_table.
428    - execution_point - execution point of the dq functions. [at_rest, in_motion].
429        This is set during the load_data or dq_validator functions.
430    - unexpected_rows_pk - the list of columns composing the primary key of the
431        source data to identify the rows failing the DQ validations. Note: only one
432        of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It
433        is mandatory to provide one of these arguments when using tag_source_data
434        as True. When tag_source_data is False, this is not mandatory, but still
435        recommended.
436    - tbl_to_derive_pk - db.table to automatically derive the unexpected_rows_pk from.
437        Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to
438        be provided. It is mandatory to provide one of these arguments when using
439        tag_source_data as True. When tag_source_data is False, this is not
440        mandatory, but still recommended.
441    - sort_processed_keys - when using the `prisma` `dq_type`, a column `processed_keys`
442        is automatically added to give observability over the PK values that were
443        processed during a run. This parameter (`sort_processed_keys`) controls whether
444        the processed keys column value should be sorted or not. Default: False.
445    - gx_result_format - great expectations result format. Default: "COMPLETE".
446    - tag_source_data - when set to true, this will ensure that the DQ process ends by
447        tagging the source data with an additional column with information about the
448        DQ results. This column makes it possible to identify if the DQ run
449        succeeded in general and, if not, it unlocks the insights to know what
450        specific rows have made the DQ validations fail and why. Default: False.
451        Note: it only works if result_sink_explode is True, gx_result_format is
452        COMPLETE, fail_on_error is False (which is done automatically when
453        you specify tag_source_data as True) and tbl_to_derive_pk or
454        unexpected_rows_pk is configured.
455    - store_backend - which store_backend to use (e.g. s3 or file_system).
456    - local_fs_root_dir - path of the root directory. Note: only applicable for
457        store_backend file_system.
458    - data_docs_local_fs - the path for data docs only for store_backend
459        file_system.
460    - bucket - the bucket name to consider for the store_backend (store DQ artefacts).
461        Note: only applicable for store_backend s3.
462    - data_docs_bucket - the bucket name for data docs only. When defined, it will
463        supersede bucket parameter. Note: only applicable for store_backend s3.
464    - expectations_store_prefix - prefix where to store expectations' data. Note: only
465        applicable for store_backend s3.
466    - validations_store_prefix - prefix where to store validations' data. Note: only
467        applicable for store_backend s3.
468    - data_docs_prefix - prefix where to store data_docs' data.
469    - checkpoint_store_prefix - prefix where to store checkpoints' data. Note: only
470        applicable for store_backend s3.
471    - data_asset_name - name of the data asset to consider when configuring the great
472        expectations' data source.
473    - expectation_suite_name - name to consider for great expectations' suite.
474    - result_sink_db_table - db.table_name indicating the database and table in which
475        to save the results of the DQ process.
476    - result_sink_location - file system location in which to save the results of the
477        DQ process.
478    - data_product_name - name of the data product.
479    - result_sink_partitions - the list of partitions to consider.
480    - result_sink_format - format of the result table (e.g. delta, parquet, kafka...).
481    - result_sink_options - extra spark options for configuring the result sink.
482        E.g: can be used to configure a Kafka sink if result_sink_format is kafka.
483    - result_sink_explode - flag to determine if the output table/location should have
484        the columns exploded (as True) or not (as False). Default: True.
485    - result_sink_extra_columns - list of extra columns to be exploded (following
486        the pattern "<name>.*") or columns to be selected. It is only used when
487        result_sink_explode is set to True.
488    - source - name of data source, to be easier to identify in analysis. If not
489        specified, it is set as default <input_id>. This will be only used
490        when result_sink_explode is set to True.
491    - fail_on_error - whether to fail the algorithm if the validations of your data in
492        the DQ process failed.
493    - cache_df - whether to cache the dataframe before running the DQ process or not.
494    - critical_functions - functions that should not fail. When this argument is
495        defined, fail_on_error is nullified.
496    - max_percentage_failure - percentage of failure that should be allowed.
497        This argument has priority over both fail_on_error and critical_functions.
498    """
499
500    spec_id: str
501    input_id: str
502    dq_type: str
503    dq_functions: Optional[List[DQFunctionSpec]] = None
504    dq_db_table: Optional[str] = None
505    dq_table_table_filter: Optional[str] = None
506    dq_table_extra_filters: Optional[str] = None
507    execution_point: Optional[str] = None
508    unexpected_rows_pk: Optional[List[str]] = None
509    tbl_to_derive_pk: Optional[str] = None
510    sort_processed_keys: Optional[bool] = False
511    gx_result_format: Optional[str] = "COMPLETE"
512    tag_source_data: Optional[bool] = False
513    store_backend: str = DQDefaults.STORE_BACKEND.value
514    local_fs_root_dir: Optional[str] = None
515    data_docs_local_fs: Optional[str] = None
516    bucket: Optional[str] = None
517    data_docs_bucket: Optional[str] = None
518    expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value
519    validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value
520    data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value
521    checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value
522    data_asset_name: Optional[str] = None
523    expectation_suite_name: Optional[str] = None
524    result_sink_db_table: Optional[str] = None
525    result_sink_location: Optional[str] = None
526    data_product_name: Optional[str] = None
527    result_sink_partitions: Optional[List[str]] = None
528    result_sink_format: str = OutputFormat.DELTAFILES.value
529    result_sink_options: Optional[dict] = None
530    result_sink_explode: bool = True
531    result_sink_extra_columns: Optional[List[str]] = None
532    source: Optional[str] = None
533    fail_on_error: bool = True
534    cache_df: bool = False
535    critical_functions: Optional[List[DQFunctionSpec]] = None
536    max_percentage_failure: Optional[float] = None

Data quality overall specification.

  • spec_id - id of the specification.
  • input_id - id of the input specification.
  • dq_type - type of DQ process to execute (e.g. validator).
  • dq_functions - list of function specifications to execute.
  • dq_db_table - name of table to derive the dq functions from.
  • dq_table_table_filter - name of the table whose rules are to be applied in the validations (only used when deriving dq functions).
  • dq_table_extra_filters - extra filters to be used when deriving dq functions. This is a sql expression to be applied to the dq_db_table.
  • execution_point - execution point of the dq functions. [at_rest, in_motion]. This is set during the load_data or dq_validator functions.
  • unexpected_rows_pk - the list of columns composing the primary key of the source data to identify the rows failing the DQ validations. Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It is mandatory to provide one of these arguments when using tag_source_data as True. When tag_source_data is False, this is not mandatory, but still recommended.
  • tbl_to_derive_pk - db.table to automatically derive the unexpected_rows_pk from. Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It is mandatory to provide one of these arguments when using tag_source_data as True. When tag_source_data is False, this is not mandatory, but still recommended.
  • sort_processed_keys - when using the prisma dq_type, a column processed_keys is automatically added to give observability over the PK values that were processed during a run. This parameter (sort_processed_keys) controls whether the processed keys column value should be sorted or not. Default: False.
  • gx_result_format - great expectations result format. Default: "COMPLETE".
  • tag_source_data - when set to true, this will ensure that the DQ process ends by tagging the source data with an additional column with information about the DQ results. This column makes it possible to identify if the DQ run succeeded in general and, if not, it unlocks the insights to know which specific rows made the DQ validations fail and why. Default: False. Note: it only works if result_sink_explode is True, gx_result_format is COMPLETE, fail_on_error is False (which is done automatically when you specify tag_source_data as True) and tbl_to_derive_pk or unexpected_rows_pk is configured.
  • store_backend - which store_backend to use (e.g. s3 or file_system).
  • local_fs_root_dir - path of the root directory. Note: only applicable for store_backend file_system.
  • data_docs_local_fs - the path for data docs only for store_backend file_system.
  • bucket - the bucket name to consider for the store_backend (store DQ artefacts). Note: only applicable for store_backend s3.
  • data_docs_bucket - the bucket name for data docs only. When defined, it will supersede bucket parameter. Note: only applicable for store_backend s3.
  • expectations_store_prefix - prefix where to store expectations' data. Note: only applicable for store_backend s3.
  • validations_store_prefix - prefix where to store validations' data. Note: only applicable for store_backend s3.
  • data_docs_prefix - prefix where to store data_docs' data.
  • checkpoint_store_prefix - prefix where to store checkpoints' data. Note: only applicable for store_backend s3.
  • data_asset_name - name of the data asset to consider when configuring the great expectations' data source.
  • expectation_suite_name - name to consider for great expectations' suite.
  • result_sink_db_table - db.table_name indicating the database and table in which to save the results of the DQ process.
  • result_sink_location - file system location in which to save the results of the DQ process.
  • data_product_name - name of the data product.
  • result_sink_partitions - the list of partitions to consider.
  • result_sink_format - format of the result table (e.g. delta, parquet, kafka...).
  • result_sink_options - extra spark options for configuring the result sink. E.g: can be used to configure a Kafka sink if result_sink_format is kafka.
  • result_sink_explode - flag to determine if the output table/location should have the columns exploded (as True) or not (as False). Default: True.
  • result_sink_extra_columns - list of extra columns to be exploded (following the pattern "<name>.*") or columns to be selected. It is only used when result_sink_explode is set to True.
  • source - name of the data source, to make it easier to identify in analysis. If not specified, it defaults to <input_id>. This will only be used when result_sink_explode is set to True.
  • fail_on_error - whether to fail the algorithm if the validations of your data in the DQ process failed.
  • cache_df - whether to cache the dataframe before running the DQ process or not.
  • critical_functions - functions that should not fail. When this argument is defined, fail_on_error is nullified.
  • max_percentage_failure - percentage of failure that should be allowed. This argument has priority over both fail_on_error and critical_functions.
DQSpec( spec_id: str, input_id: str, dq_type: str, dq_functions: Optional[List[DQFunctionSpec]] = None, dq_db_table: Optional[str] = None, dq_table_table_filter: Optional[str] = None, dq_table_extra_filters: Optional[str] = None, execution_point: Optional[str] = None, unexpected_rows_pk: Optional[List[str]] = None, tbl_to_derive_pk: Optional[str] = None, sort_processed_keys: Optional[bool] = False, gx_result_format: Optional[str] = 'COMPLETE', tag_source_data: Optional[bool] = False, store_backend: str = 's3', local_fs_root_dir: Optional[str] = None, data_docs_local_fs: Optional[str] = None, bucket: Optional[str] = None, data_docs_bucket: Optional[str] = None, expectations_store_prefix: str = 'dq/expectations/', validations_store_prefix: str = 'dq/validations/', data_docs_prefix: str = 'dq/data_docs/site/', checkpoint_store_prefix: str = 'dq/checkpoints/', data_asset_name: Optional[str] = None, expectation_suite_name: Optional[str] = None, result_sink_db_table: Optional[str] = None, result_sink_location: Optional[str] = None, data_product_name: Optional[str] = None, result_sink_partitions: Optional[List[str]] = None, result_sink_format: str = 'delta', result_sink_options: Optional[dict] = None, result_sink_explode: bool = True, result_sink_extra_columns: Optional[List[str]] = None, source: Optional[str] = None, fail_on_error: bool = True, cache_df: bool = False, critical_functions: Optional[List[DQFunctionSpec]] = None, max_percentage_failure: Optional[float] = None)
spec_id: str
input_id: str
dq_type: str
dq_functions: Optional[List[DQFunctionSpec]] = None
dq_db_table: Optional[str] = None
dq_table_table_filter: Optional[str] = None
dq_table_extra_filters: Optional[str] = None
execution_point: Optional[str] = None
unexpected_rows_pk: Optional[List[str]] = None
tbl_to_derive_pk: Optional[str] = None
sort_processed_keys: Optional[bool] = False
gx_result_format: Optional[str] = 'COMPLETE'
tag_source_data: Optional[bool] = False
store_backend: str = 's3'
local_fs_root_dir: Optional[str] = None
data_docs_local_fs: Optional[str] = None
bucket: Optional[str] = None
data_docs_bucket: Optional[str] = None
expectations_store_prefix: str = 'dq/expectations/'
validations_store_prefix: str = 'dq/validations/'
data_docs_prefix: str = 'dq/data_docs/site/'
checkpoint_store_prefix: str = 'dq/checkpoints/'
data_asset_name: Optional[str] = None
expectation_suite_name: Optional[str] = None
result_sink_db_table: Optional[str] = None
result_sink_location: Optional[str] = None
data_product_name: Optional[str] = None
result_sink_partitions: Optional[List[str]] = None
result_sink_format: str = 'delta'
result_sink_options: Optional[dict] = None
result_sink_explode: bool = True
result_sink_extra_columns: Optional[List[str]] = None
source: Optional[str] = None
fail_on_error: bool = True
cache_df: bool = False
critical_functions: Optional[List[DQFunctionSpec]] = None
max_percentage_failure: Optional[float] = None
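
A minimal sketch of a validator DQ spec with one expectation; the spec ids, column name and result sink table are placeholders:

    from lakehouse_engine.core.definitions import DQFunctionSpec, DQSpec, DQType

    orders_dq = DQSpec(
        spec_id="orders_dq",
        input_id="orders_transformed",
        dq_type=DQType.VALIDATOR.value,
        dq_functions=[
            DQFunctionSpec(
                function="expect_column_values_to_not_be_null",
                args={"column": "order_id"},  # placeholder column
            )
        ],
        result_sink_db_table="my_db.dq_results",  # placeholder table
    )
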
@dataclass
class MergeOptions:
539@dataclass
540class MergeOptions(object):
541    """Options for a merge operation.
542
543    - merge_predicate: predicate to apply to the merge operation so that we can
544        check if a new record corresponds to a record already included in the
545        historical data.
546    - insert_only: indicates if the merge should only insert data (e.g., deduplicate
547        scenarios).
548    - delete_predicate: predicate to apply to the delete operation.
549    - update_predicate: predicate to apply to the update operation.
550    - insert_predicate: predicate to apply to the insert operation.
551    - update_column_set: rules to apply to the update operation which allows to
552        set the value for each column to be updated.
553        (e.g. {"data": "new.data", "count": "current.count + 1"} )
554    - insert_column_set: rules to apply to the insert operation which allows to
555        set the value for each column to be inserted.
556        (e.g. {"date": "updates.date", "count": "1"} )
557    """
558
559    merge_predicate: str
560    insert_only: bool = False
561    delete_predicate: Optional[str] = None
562    update_predicate: Optional[str] = None
563    insert_predicate: Optional[str] = None
564    update_column_set: Optional[dict] = None
565    insert_column_set: Optional[dict] = None

Options for a merge operation.

  • merge_predicate: predicate to apply to the merge operation so that we can check if a new record corresponds to a record already included in the historical data.
  • insert_only: indicates if the merge should only insert data (e.g., deduplicate scenarios).
  • delete_predicate: predicate to apply to the delete operation.
  • update_predicate: predicate to apply to the update operation.
  • insert_predicate: predicate to apply to the insert operation.
  • update_column_set: rules to apply to the update operation, which allow setting the value for each column to be updated (e.g. {"data": "new.data", "count": "current.count + 1"}).
  • insert_column_set: rules to apply to the insert operation, which allow setting the value for each column to be inserted (e.g. {"date": "updates.date", "count": "1"}).
MergeOptions( merge_predicate: str, insert_only: bool = False, delete_predicate: Optional[str] = None, update_predicate: Optional[str] = None, insert_predicate: Optional[str] = None, update_column_set: Optional[dict] = None, insert_column_set: Optional[dict] = None)
merge_predicate: str
insert_only: bool = False
delete_predicate: Optional[str] = None
update_predicate: Optional[str] = None
insert_predicate: Optional[str] = None
update_column_set: Optional[dict] = None
insert_column_set: Optional[dict] = None
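
A minimal sketch of merge options that match on a business key and only update when the incoming record is newer; the "current"/"new" aliases follow the examples in the docstring above, and the column names are placeholders:

    from lakehouse_engine.core.definitions import MergeOptions

    orders_merge_opts = MergeOptions(
        merge_predicate="current.order_id = new.order_id",
        update_predicate="new.updated_at > current.updated_at",
    )
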
@dataclass
class OutputSpec:
568@dataclass
569class OutputSpec(object):
570    """Specification of an algorithm output.
571
572    This is very aligned with the way the execution environment connects to the output
573    systems (e.g., spark outputs).
574
575    - spec_id: id of the output specification.
576    - input_id: id of the corresponding input specification.
577    - write_type: type of write operation.
578    - data_format: format of the output. Defaults to DELTA.
579    - db_table: table name in the form of `<db>.<table>`.
580    - location: uri that identifies from where to write data in the specified format.
581    - partitions: list of partition input_col names.
582    - merge_opts: options to apply to the merge operation.
583    - streaming_micro_batch_transformers: transformers to invoke for each streaming
584        micro batch, before writing (i.e., in Spark's foreachBatch structured
585        streaming function). Note: the lakehouse engine manages this for you, so
586        you don't have to manually specify streaming transformations here, so we don't
587        advise you to manually specify transformations through this parameter. Supply
588        them as regular transformers in the transform_specs sections of an ACON.
589    - streaming_once: if the streaming query is to be executed just once, or not,
590        generating just one micro batch.
591    - streaming_processing_time: if streaming query is to be kept alive, this indicates
592        the processing time of each micro batch.
593    - streaming_available_now: if set to True, set a trigger that processes all
594        available data in multiple batches then terminates the query.
595        When using streaming, this is the default trigger that the lakehouse-engine will
596        use, unless you configure a different one.
597    - streaming_continuous: set a trigger that runs a continuous query with a given
598        checkpoint interval.
599    - streaming_await_termination: whether to wait (True) for the termination of the
600        streaming query (e.g. timeout or exception) or not (False). Default: True.
601    - streaming_await_termination_timeout: a timeout to set to the
602        streaming_await_termination. Default: None.
603    - with_batch_id: whether to include the streaming batch id in the final data,
604        or not. It only takes effect in streaming mode.
605    - options: dict with other relevant options according to the execution environment
606        (e.g., spark) possible outputs.  E.g.,: JDBC options, checkpoint location for
607        streaming, etc.
608    - streaming_micro_batch_dq_processors: similar to streaming_micro_batch_transformers
609        but for the DQ functions to be executed. Used internally by the lakehouse
610        engine, so you don't have to supply DQ functions through this parameter. Use the
611        dq_specs of the acon instead.
612    """
613
614    spec_id: str
615    input_id: str
616    write_type: str
617    data_format: str = OutputFormat.DELTAFILES.value
618    db_table: Optional[str] = None
619    location: Optional[str] = None
620    merge_opts: Optional[MergeOptions] = None
621    partitions: Optional[List[str]] = None
622    streaming_micro_batch_transformers: Optional[List[TransformerSpec]] = None
623    streaming_once: Optional[bool] = None
624    streaming_processing_time: Optional[str] = None
625    streaming_available_now: bool = True
626    streaming_continuous: Optional[str] = None
627    streaming_await_termination: bool = True
628    streaming_await_termination_timeout: Optional[int] = None
629    with_batch_id: bool = False
630    options: Optional[dict] = None
631    streaming_micro_batch_dq_processors: Optional[List[DQSpec]] = None

Specification of an algorithm output.

This is very aligned with the way the execution environment connects to the output systems (e.g., spark outputs).

  • spec_id: id of the output specification.
  • input_id: id of the corresponding input specification.
  • write_type: type of write operation.
  • data_format: format of the output. Defaults to DELTA.
  • db_table: table name in the form of <db>.<table>.
  • location: uri that identifies from where to write data in the specified format.
  • partitions: list of partition input_col names.
  • merge_opts: options to apply to the merge operation.
  • streaming_micro_batch_transformers: transformers to invoke for each streaming micro batch, before writing (i.e., in Spark's foreachBatch structured streaming function). Note: the lakehouse engine manages this for you, so you don't have to (and we don't advise you to) manually specify streaming transformations through this parameter. Supply them as regular transformers in the transform_specs sections of an ACON.
  • streaming_once: if the streaming query is to be executed just once, or not, generating just one micro batch.
  • streaming_processing_time: if streaming query is to be kept alive, this indicates the processing time of each micro batch.
  • streaming_available_now: if set to True, set a trigger that processes all available data in multiple batches then terminates the query. When using streaming, this is the default trigger that the lakehouse-engine will use, unless you configure a different one.
  • streaming_continuous: set a trigger that runs a continuous query with a given checkpoint interval.
  • streaming_await_termination: whether to wait (True) for the termination of the streaming query (e.g. timeout or exception) or not (False). Default: True.
  • streaming_await_termination_timeout: a timeout to set to the streaming_await_termination. Default: None.
  • with_batch_id: whether to include the streaming batch id in the final data, or not. It only takes effect in streaming mode.
  • options: dict with other relevant options according to the execution environment (e.g., spark) possible outputs. E.g.,: JDBC options, checkpoint location for streaming, etc.
  • streaming_micro_batch_dq_processors: similar to streaming_micro_batch_transformers but for the DQ functions to be executed. Used internally by the lakehouse engine, so you don't have to supply DQ functions through this parameter. Use the dq_specs of the acon instead.
OutputSpec( spec_id: str, input_id: str, write_type: str, data_format: str = 'delta', db_table: Optional[str] = None, location: Optional[str] = None, merge_opts: Optional[MergeOptions] = None, partitions: Optional[List[str]] = None, streaming_micro_batch_transformers: Optional[List[TransformerSpec]] = None, streaming_once: Optional[bool] = None, streaming_processing_time: Optional[str] = None, streaming_available_now: bool = True, streaming_continuous: Optional[str] = None, streaming_await_termination: bool = True, streaming_await_termination_timeout: Optional[int] = None, with_batch_id: bool = False, options: Optional[dict] = None, streaming_micro_batch_dq_processors: Optional[List[DQSpec]] = None)
spec_id: str
input_id: str
write_type: str
data_format: str = 'delta'
db_table: Optional[str] = None
location: Optional[str] = None
merge_opts: Optional[MergeOptions] = None
partitions: Optional[List[str]] = None
streaming_micro_batch_transformers: Optional[List[TransformerSpec]] = None
streaming_once: Optional[bool] = None
streaming_processing_time: Optional[str] = None
streaming_available_now: bool = True
streaming_continuous: Optional[str] = None
streaming_await_termination: bool = True
streaming_await_termination_timeout: Optional[int] = None
with_batch_id: bool = False
options: Optional[dict] = None
streaming_micro_batch_dq_processors: Optional[List[DQSpec]] = None
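
A minimal sketch of an append output spec writing delta files partitioned by a date column; the ids, location and partition column are placeholders:

    from lakehouse_engine.core.definitions import OutputFormat, OutputSpec, WriteType

    orders_output = OutputSpec(
        spec_id="orders_silver",
        input_id="orders_transformed",
        write_type=WriteType.APPEND.value,
        data_format=OutputFormat.DELTAFILES.value,
        location="s3://my-bucket/silver/orders",  # placeholder location
        partitions=["order_date"],                # placeholder partition column
    )
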
@dataclass
class TerminatorSpec:
634@dataclass
635class TerminatorSpec(object):
636    """Terminator Specification.
637
638    I.e., the specification that defines a terminator operation to be executed. Examples
639    are compute statistics, vacuum, optimize, etc.
640
641    - function: terminator function to execute.
642    - args: arguments of the terminator function.
643    - input_id: id of the corresponding output specification (Optional).
644    """
645
646    function: str
647    args: Optional[dict] = None
648    input_id: Optional[str] = None

Terminator Specification.

I.e., the specification that defines a terminator operation to be executed. Examples are compute statistics, vacuum, optimize, etc.

  • function: terminator function to execute.
  • args: arguments of the terminator function.
  • input_id: id of the corresponding output specification (Optional).
TerminatorSpec( function: str, args: Optional[dict] = None, input_id: Optional[str] = None)
function: str
args: Optional[dict] = None
input_id: Optional[str] = None
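For illustration, a hedged sketch of a TerminatorSpec that would run a post-write housekeeping step on the output above; the function name and arguments are assumptions chosen only to show the shape of the dataclass, not a documented contract.

from lakehouse_engine.core.definitions import TerminatorSpec

terminator_spec = TerminatorSpec(
    function="optimize_dataset",              # assumed terminator function name
    args={"db_table": "my_db.sales_silver"},  # assumed arguments
    input_id="sales_silver",                  # id of the corresponding output spec
)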
@dataclass
class ReconciliatorSpec:
651@dataclass
652class ReconciliatorSpec(object):
653    """Reconciliator Specification.
654
655    - metrics: list of metrics in the form of:
656        [{
657            metric: name of the column present in both truth and current datasets,
658            aggregation: sum, avg, max, min, ...,
659            type: percentage or absolute,
660            yellow: value,
661            red: value
662        }].
663    - recon_type: reconciliation type (percentage or absolute). Percentage calculates
664        the difference between truth and current results as a percentage (x-y/x), and
665        absolute calculates the raw difference (x - y).
666    - truth_input_spec: input specification of the truth data.
667    - current_input_spec: input specification of the current results data
668    - truth_preprocess_query: additional query on top of the truth input data to
669        preprocess the truth data before it gets fueled into the reconciliation process.
670        Important note: you need to assume that the data out of
671        the truth_input_spec is referencable by a table called 'truth'.
672    - truth_preprocess_query_args: optional dict having the functions/transformations to
673        apply on top of the truth_preprocess_query and respective arguments. Note: cache
674        is being applied on the Dataframe, by default. For turning the default behavior
675        off, pass `"truth_preprocess_query_args": []`.
676    - current_preprocess_query: additional query on top of the current results input
677        data to preprocess the current results data before it gets fueled into the
678        reconciliation process. Important note: you need to assume that the data out of
679        the current_results_input_spec is referencable by a table called 'current'.
680    - current_preprocess_query_args: optional dict having the
681        functions/transformations to apply on top of the current_preprocess_query
682        and respective arguments. Note: cache is being applied on the Dataframe,
683        by default. For turning the default behavior off, pass
684        `"current_preprocess_query_args": []`.
685    - ignore_empty_df: optional boolean, to ignore the recon process if source & target
686       dataframes are empty, recon will exit success code (passed)
687    """
688
689    metrics: List[dict]
690    truth_input_spec: InputSpec
691    current_input_spec: InputSpec
692    truth_preprocess_query: Optional[str] = None
693    truth_preprocess_query_args: Optional[List[dict]] = None
694    current_preprocess_query: Optional[str] = None
695    current_preprocess_query_args: Optional[List[dict]] = None
696    ignore_empty_df: Optional[bool] = False

Reconciliator Specification.

  • metrics: list of metrics in the form of: [{ metric: name of the column present in both truth and current datasets, aggregation: sum, avg, max, min, ..., type: percentage or absolute, yellow: value, red: value }].
  • recon_type: reconciliation type (percentage or absolute). Percentage calculates the difference between truth and current results as a percentage ((x - y)/x), and absolute calculates the raw difference (x - y).
  • truth_input_spec: input specification of the truth data.
  • current_input_spec: input specification of the current results data.
  • truth_preprocess_query: additional query on top of the truth input data to preprocess the truth data before it is fed into the reconciliation process. Important note: you need to assume that the data out of the truth_input_spec is referenceable by a table called 'truth'.
  • truth_preprocess_query_args: optional dict with the functions/transformations to apply on top of the truth_preprocess_query and their respective arguments. Note: cache is applied on the DataFrame by default. To turn the default behavior off, pass "truth_preprocess_query_args": [].
  • current_preprocess_query: additional query on top of the current results input data to preprocess the current results data before it is fed into the reconciliation process. Important note: you need to assume that the data out of the current_input_spec is referenceable by a table called 'current'.
  • current_preprocess_query_args: optional dict with the functions/transformations to apply on top of the current_preprocess_query and their respective arguments. Note: cache is applied on the DataFrame by default. To turn the default behavior off, pass "current_preprocess_query_args": [].
  • ignore_empty_df: optional boolean to skip the reconciliation process when the source and target dataframes are empty; in that case the reconciliation exits with a success (passed) status. A hedged construction example follows the field list below.
ReconciliatorSpec( metrics: List[dict], truth_input_spec: InputSpec, current_input_spec: InputSpec, truth_preprocess_query: Optional[str] = None, truth_preprocess_query_args: Optional[List[dict]] = None, current_preprocess_query: Optional[str] = None, current_preprocess_query_args: Optional[List[dict]] = None, ignore_empty_df: Optional[bool] = False)
metrics: List[dict]
truth_input_spec: InputSpec
current_input_spec: InputSpec
truth_preprocess_query: Optional[str] = None
truth_preprocess_query_args: Optional[List[dict]] = None
current_preprocess_query: Optional[str] = None
current_preprocess_query_args: Optional[List[dict]] = None
ignore_empty_df: Optional[bool] = False
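The sketch below shows how a ReconciliatorSpec could be assembled; the InputSpec keyword arguments (spec_id, read_type, data_format, db_table) should be checked against the InputSpec documentation above, and all table names and thresholds are placeholders.

from lakehouse_engine.core.definitions import InputSpec, ReconciliatorSpec

recon_spec = ReconciliatorSpec(
    metrics=[
        {
            "metric": "net_sales",      # column present in both datasets
            "aggregation": "sum",
            "type": "percentage",
            "yellow": 0.01,             # warning threshold (placeholder)
            "red": 0.05,                # failure threshold (placeholder)
        }
    ],
    truth_input_spec=InputSpec(
        spec_id="truth",
        read_type="batch",
        data_format="delta",
        db_table="my_db.sales_truth",   # placeholder
    ),
    current_input_spec=InputSpec(
        spec_id="current",
        read_type="batch",
        data_format="delta",
        db_table="my_db.sales",         # placeholder
    ),
    truth_preprocess_query="SELECT * FROM truth WHERE order_date >= '2024-01-01'",
)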
@dataclass
class DQValidatorSpec:
699@dataclass
700class DQValidatorSpec(object):
701    """Data Quality Validator Specification.
702
703    - input_spec: input specification of the data to be checked/validated.
704    - dq_spec: data quality specification.
705    - restore_prev_version: specify if, having
706        delta table/files as input, they should be restored to the
707        previous version if the data quality process fails. Note: this
708        is only considered if fail_on_error is kept as True.
709    """
710
711    input_spec: InputSpec
712    dq_spec: DQSpec
713    restore_prev_version: Optional[bool] = False

Data Quality Validator Specification.

  • input_spec: input specification of the data to be checked/validated.
  • dq_spec: data quality specification.
  • restore_prev_version: specify whether a delta table/files given as input should be restored to the previous version if the data quality process fails. Note: this is only considered if fail_on_error is kept as True.
DQValidatorSpec( input_spec: InputSpec, dq_spec: DQSpec, restore_prev_version: Optional[bool] = False)
input_spec: InputSpec
dq_spec: DQSpec
restore_prev_version: Optional[bool] = False
class SQLDefinitions(enum.Enum):
716class SQLDefinitions(Enum):
717    """SQL definitions statements."""
718
719    compute_table_stats = "ANALYZE TABLE {} COMPUTE STATISTICS"
720    drop_table_stmt = "DROP TABLE IF EXISTS"
721    drop_view_stmt = "DROP VIEW IF EXISTS"
722    truncate_stmt = "TRUNCATE TABLE"
723    describe_stmt = "DESCRIBE TABLE"
724    optimize_stmt = "OPTIMIZE"
725    show_tbl_props_stmt = "SHOW TBLPROPERTIES"
726    delete_where_stmt = "DELETE FROM {} WHERE {}"

SQL definitions statements.

compute_table_stats = <SQLDefinitions.compute_table_stats: 'ANALYZE TABLE {} COMPUTE STATISTICS'>
drop_table_stmt = <SQLDefinitions.drop_table_stmt: 'DROP TABLE IF EXISTS'>
drop_view_stmt = <SQLDefinitions.drop_view_stmt: 'DROP VIEW IF EXISTS'>
truncate_stmt = <SQLDefinitions.truncate_stmt: 'TRUNCATE TABLE'>
describe_stmt = <SQLDefinitions.describe_stmt: 'DESCRIBE TABLE'>
optimize_stmt = <SQLDefinitions.optimize_stmt: 'OPTIMIZE'>
show_tbl_props_stmt = <SQLDefinitions.show_tbl_props_stmt: 'SHOW TBLPROPERTIES'>
delete_where_stmt = <SQLDefinitions.delete_where_stmt: 'DELETE FROM {} WHERE {}'>
Inherited Members
enum.Enum
name
value
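As a quick illustration of how these templated statements are meant to be filled in (the table name and predicate below are placeholders):

from lakehouse_engine.core.definitions import SQLDefinitions

stats_sql = SQLDefinitions.compute_table_stats.value.format("my_db.sales")
# -> "ANALYZE TABLE my_db.sales COMPUTE STATISTICS"

delete_sql = SQLDefinitions.delete_where_stmt.value.format(
    "my_db.sales", "order_date < '2020-01-01'"
)
# -> "DELETE FROM my_db.sales WHERE order_date < '2020-01-01'"

drop_sql = f"{SQLDefinitions.drop_table_stmt.value} my_db.sales"
# -> "DROP TABLE IF EXISTS my_db.sales"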
class FileManagerAPIKeys(enum.Enum):
729class FileManagerAPIKeys(Enum):
730    """File Manager s3 api keys."""
731
732    CONTENTS = "Contents"
733    KEY = "Key"
734    CONTINUATION = "NextContinuationToken"
735    BUCKET = "Bucket"
736    OBJECTS = "Objects"

File Manager s3 api keys.

CONTENTS = <FileManagerAPIKeys.CONTENTS: 'Contents'>
KEY = <FileManagerAPIKeys.KEY: 'Key'>
CONTINUATION = <FileManagerAPIKeys.CONTINUATION: 'NextContinuationToken'>
BUCKET = <FileManagerAPIKeys.BUCKET: 'Bucket'>
OBJECTS = <FileManagerAPIKeys.OBJECTS: 'Objects'>
Inherited Members
enum.Enum
name
value
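A hedged sketch of how these keys map onto a boto3 list_objects_v2 response; the bucket name and prefix are placeholders and the client setup is an assumption.

import boto3

from lakehouse_engine.core.definitions import FileManagerAPIKeys

s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="my-bucket", Prefix="silver/sales/")

# The enum values are the dictionary keys of the S3 response.
keys = [
    obj[FileManagerAPIKeys.KEY.value]
    for obj in response.get(FileManagerAPIKeys.CONTENTS.value, [])
]
continuation_token = response.get(FileManagerAPIKeys.CONTINUATION.value)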
@dataclass
class SensorSpec:
739@dataclass
740class SensorSpec(object):
741    """Sensor Specification.
742
743    - sensor_id: sensor id.
744    - assets: a list of assets that are considered as available to
745        consume downstream after this sensor has status
746        PROCESSED_NEW_DATA.
747    - control_db_table_name: db.table to store sensor metadata.
748    - input_spec: input specification of the source to be checked for new data.
749    - preprocess_query: SQL query to transform/filter the result from the
750        upstream. Consider that we should refer to 'new_data' whenever
751        we are referring to the input of the sensor. E.g.:
752            "SELECT dummy_col FROM new_data WHERE ..."
753    - checkpoint_location: optional location to store checkpoints to resume
754        from. These checkpoints use the same as Spark checkpoint strategy.
755        For Spark readers that do not support checkpoints, use the
756        preprocess_query parameter to form a SQL query to filter the result
757        from the upstream accordingly.
758    - fail_on_empty_result: if the sensor should throw an error if there is no new
759        data in the upstream. Default: True.
760    """
761
762    sensor_id: str
763    assets: List[str]
764    control_db_table_name: str
765    input_spec: InputSpec
766    preprocess_query: Optional[str]
767    checkpoint_location: Optional[str]
768    fail_on_empty_result: bool = True
769
770    @classmethod
771    def create_from_acon(cls, acon: dict):  # type: ignore
772        """Create SensorSpec from acon.
773
774        Args:
775            acon: sensor ACON.
776        """
777        checkpoint_location = acon.get("base_checkpoint_location")
778        if checkpoint_location:
779            checkpoint_location = (
780                f"{checkpoint_location.rstrip('/')}/lakehouse_engine/"
781                f"sensors/{acon['sensor_id']}"
782            )
783
784        return cls(
785            sensor_id=acon["sensor_id"],
786            assets=acon["assets"],
787            control_db_table_name=acon["control_db_table_name"],
788            input_spec=InputSpec(**acon["input_spec"]),
789            preprocess_query=acon.get("preprocess_query"),
790            checkpoint_location=checkpoint_location,
791            fail_on_empty_result=acon.get("fail_on_empty_result", True),
792        )

Sensor Specification.

  • sensor_id: sensor id.
  • assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.
  • control_db_table_name: db.table to store sensor metadata.
  • input_spec: input specification of the source to be checked for new data.
  • preprocess_query: SQL query to transform/filter the result from the upstream. Note that the input of the sensor should be referred to as 'new_data'. E.g.: "SELECT dummy_col FROM new_data WHERE ..."
  • checkpoint_location: optional location to store checkpoints to resume from. These checkpoints follow the same strategy as Spark checkpoints. For Spark readers that do not support checkpoints, use the preprocess_query parameter to form a SQL query that filters the result from the upstream accordingly.
  • fail_on_empty_result: if the sensor should throw an error if there is no new data in the upstream. Default: True.
SensorSpec( sensor_id: str, assets: List[str], control_db_table_name: str, input_spec: InputSpec, preprocess_query: Optional[str], checkpoint_location: Optional[str], fail_on_empty_result: bool = True)
sensor_id: str
assets: List[str]
control_db_table_name: str
input_spec: InputSpec
preprocess_query: Optional[str]
checkpoint_location: Optional[str]
fail_on_empty_result: bool = True
@classmethod
def create_from_acon(cls, acon: dict):
770    @classmethod
771    def create_from_acon(cls, acon: dict):  # type: ignore
772        """Create SensorSpec from acon.
773
774        Args:
775            acon: sensor ACON.
776        """
777        checkpoint_location = acon.get("base_checkpoint_location")
778        if checkpoint_location:
779            checkpoint_location = (
780                f"{checkpoint_location.rstrip('/')}/lakehouse_engine/"
781                f"sensors/{acon['sensor_id']}"
782            )
783
784        return cls(
785            sensor_id=acon["sensor_id"],
786            assets=acon["assets"],
787            control_db_table_name=acon["control_db_table_name"],
788            input_spec=InputSpec(**acon["input_spec"]),
789            preprocess_query=acon.get("preprocess_query"),
790            checkpoint_location=checkpoint_location,
791            fail_on_empty_result=acon.get("fail_on_empty_result", True),
792        )

Create SensorSpec from acon.

Arguments:
  • acon: sensor ACON.
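A hedged sketch of the ACON shape accepted by create_from_acon; the input_spec keyword arguments are assumptions to be checked against the InputSpec documentation above, and the ids, tables and paths are placeholders.

from lakehouse_engine.core.definitions import SensorSpec

acon = {
    "sensor_id": "sales_upstream_sensor",             # placeholder
    "assets": ["sales_bronze"],                       # placeholder
    "control_db_table_name": "my_db.sensor_control",  # placeholder
    "input_spec": {                                   # assumed InputSpec kwargs
        "spec_id": "sensor_input",
        "read_type": "streaming",
        "data_format": "delta",
        "location": "s3://my-bucket/bronze/sales",
    },
    "preprocess_query": "SELECT * FROM new_data WHERE order_date > '2024-01-01'",
    "base_checkpoint_location": "s3://my-bucket/checkpoints",
    "fail_on_empty_result": False,
}

sensor_spec = SensorSpec.create_from_acon(acon)
# sensor_spec.checkpoint_location ->
#   "s3://my-bucket/checkpoints/lakehouse_engine/sensors/sales_upstream_sensor"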
class SensorStatus(enum.Enum):
795class SensorStatus(Enum):
796    """Status for a sensor."""
797
798    ACQUIRED_NEW_DATA = "ACQUIRED_NEW_DATA"
799    PROCESSED_NEW_DATA = "PROCESSED_NEW_DATA"

Status for a sensor.

ACQUIRED_NEW_DATA = <SensorStatus.ACQUIRED_NEW_DATA: 'ACQUIRED_NEW_DATA'>
PROCESSED_NEW_DATA = <SensorStatus.PROCESSED_NEW_DATA: 'PROCESSED_NEW_DATA'>
Inherited Members
enum.Enum
name
value
SENSOR_SCHEMA = StructType([StructField('sensor_id', StringType(), False), StructField('assets', ArrayType(StringType(), False), True), StructField('status', StringType(), False), StructField('status_change_timestamp', TimestampType(), False), StructField('checkpoint_location', StringType(), True), StructField('upstream_key', StringType(), True), StructField('upstream_value', StringType(), True)])
SENSOR_UPDATE_SET: dict = {'sensors.sensor_id': 'updates.sensor_id', 'sensors.status': 'updates.status', 'sensors.status_change_timestamp': 'updates.status_change_timestamp'}
SENSOR_ALLOWED_DATA_FORMATS = {'streaming': ['kafka', 'avro', 'json', 'parquet', 'csv', 'delta', 'cloudfiles'], 'batch': ['delta', 'jdbc']}
class SAPLogchain(enum.Enum):
829class SAPLogchain(Enum):
830    """Defaults used on consuming data from SAP Logchain."""
831
832    DBTABLE = "SAPPHA.RSPCLOGCHAIN"
833    GREEN_STATUS = "G"
834    ENGINE_TABLE = "sensor_new_data"

Defaults used on consuming data from SAP Logchain.

DBTABLE = <SAPLogchain.DBTABLE: 'SAPPHA.RSPCLOGCHAIN'>
GREEN_STATUS = <SAPLogchain.GREEN_STATUS: 'G'>
ENGINE_TABLE = <SAPLogchain.ENGINE_TABLE: 'sensor_new_data'>
Inherited Members
enum.Enum
name
value
class RestoreType(enum.Enum):
837class RestoreType(Enum):
838    """Archive types."""
839
840    BULK = "Bulk"
841    STANDARD = "Standard"
842    EXPEDITED = "Expedited"
843
844    @classmethod
845    def values(cls):  # type: ignore
846        """Generates a list containing all enum values.
847
848        Return:
849            A list with all enum values.
850        """
851        return (c.value for c in cls)
852
853    @classmethod
854    def exists(cls, restore_type: str) -> bool:
855        """Checks if the restore type exists in the enum values.
856
857        Args:
858            restore_type: restore type to check if exists.
859
860        Return:
861            If the restore type exists in our enum.
862        """
863        return restore_type in cls.values()

Archive types.

BULK = <RestoreType.BULK: 'Bulk'>
STANDARD = <RestoreType.STANDARD: 'Standard'>
EXPEDITED = <RestoreType.EXPEDITED: 'Expedited'>
@classmethod
def values(cls):
844    @classmethod
845    def values(cls):  # type: ignore
846        """Generates a list containing all enum values.
847
848        Return:
849            A list with all enum values.
850        """
851        return (c.value for c in cls)

Generates a list containing all enum values.

Return:

A list with all enum values.

@classmethod
def exists(cls, restore_type: str) -> bool:
853    @classmethod
854    def exists(cls, restore_type: str) -> bool:
855        """Checks if the restore type exists in the enum values.
856
857        Args:
858            restore_type: restore type to check if exists.
859
860        Return:
861            If the restore type exists in our enum.
862        """
863        return restore_type in cls.values()

Checks if the restore type exists in the enum values.

Arguments:
  • restore_type: restore type to check if exists.
Return:

If the restore type exists in our enum.

Inherited Members
enum.Enum
name
value
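For instance, RestoreType.exists can be used to validate a user-provided restore tier before triggering a restore (note that the comparison is case-sensitive):

from lakehouse_engine.core.definitions import RestoreType

RestoreType.exists("Expedited")   # -> True
RestoreType.exists("expedited")   # -> False (values are case-sensitive)
list(RestoreType.values())        # -> ['Bulk', 'Standard', 'Expedited']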
class RestoreStatus(enum.Enum):
866class RestoreStatus(Enum):
867    """Archive types."""
868
869    NOT_STARTED = "not_started"
870    ONGOING = "ongoing"
871    RESTORED = "restored"

Statuses of a restore operation on archived data.

NOT_STARTED = <RestoreStatus.NOT_STARTED: 'not_started'>
ONGOING = <RestoreStatus.ONGOING: 'ongoing'>
RESTORED = <RestoreStatus.RESTORED: 'restored'>
Inherited Members
enum.Enum
name
value
ARCHIVE_STORAGE_CLASS = ['GLACIER', 'DEEP_ARCHIVE', 'GLACIER_IR']
class SQLParser(enum.Enum):
881class SQLParser(Enum):
882    """Defaults to use for parsing."""
883
884    DOUBLE_QUOTES = '"'
885    SINGLE_QUOTES = "'"
886    BACKSLASH = "\\"
887    SINGLE_TRACE = "-"
888    DOUBLE_TRACES = "--"
889    SLASH = "/"
890    OPENING_MULTIPLE_LINE_COMMENT = "/*"
891    CLOSING_MULTIPLE_LINE_COMMENT = "*/"
892    PARAGRAPH = "\n"
893    STAR = "*"
894
895    MULTIPLE_LINE_COMMENT = [
896        OPENING_MULTIPLE_LINE_COMMENT,
897        CLOSING_MULTIPLE_LINE_COMMENT,
898    ]

Defaults to use for parsing.

DOUBLE_QUOTES = <SQLParser.DOUBLE_QUOTES: '"'>
SINGLE_QUOTES = <SQLParser.SINGLE_QUOTES: "'">
BACKSLASH = <SQLParser.BACKSLASH: '\\'>
SINGLE_TRACE = <SQLParser.SINGLE_TRACE: '-'>
DOUBLE_TRACES = <SQLParser.DOUBLE_TRACES: '--'>
SLASH = <SQLParser.SLASH: '/'>
OPENING_MULTIPLE_LINE_COMMENT = <SQLParser.OPENING_MULTIPLE_LINE_COMMENT: '/*'>
CLOSING_MULTIPLE_LINE_COMMENT = <SQLParser.CLOSING_MULTIPLE_LINE_COMMENT: '*/'>
PARAGRAPH = <SQLParser.PARAGRAPH: '\n'>
STAR = <SQLParser.STAR: '*'>
MULTIPLE_LINE_COMMENT = <SQLParser.MULTIPLE_LINE_COMMENT: ['/*', '*/']>
Inherited Members
enum.Enum
name
value
class GABDefaults(enum.Enum):
901class GABDefaults(Enum):
902    """Defaults used on the GAB process."""
903
904    DATE_FORMAT = "%Y-%m-%d"
905    DIMENSIONS_DEFAULT_COLUMNS = ["from_date", "to_date"]
906    DEFAULT_DIMENSION_CALENDAR_TABLE = "dim_calendar"
907    DEFAULT_LOOKUP_QUERY_BUILDER_TABLE = "lkp_query_builder"

Defaults used on the GAB process.

DATE_FORMAT = <GABDefaults.DATE_FORMAT: '%Y-%m-%d'>
DIMENSIONS_DEFAULT_COLUMNS = <GABDefaults.DIMENSIONS_DEFAULT_COLUMNS: ['from_date', 'to_date']>
DEFAULT_DIMENSION_CALENDAR_TABLE = <GABDefaults.DEFAULT_DIMENSION_CALENDAR_TABLE: 'dim_calendar'>
DEFAULT_LOOKUP_QUERY_BUILDER_TABLE = <GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE: 'lkp_query_builder'>
Inherited Members
enum.Enum
name
value
class GABStartOfWeek(enum.Enum):
910class GABStartOfWeek(Enum):
911    """Representation of start of week values on GAB."""
912
913    SUNDAY = "S"
914    MONDAY = "M"
915
916    @classmethod
917    def get_start_of_week(cls) -> dict:
918        """Get the start of week enum as a dict.
919
920        Returns:
921            dict containing all enum entries as `{name:value}`.
922        """
923        return {
924            start_of_week.name: start_of_week.value
925            for start_of_week in list(GABStartOfWeek)
926        }
927
928    @classmethod
929    def get_values(cls) -> set[str]:
930        """Get the start of week enum values as set.
931
932        Returns:
933            set containing all possible values `{value}`.
934        """
935        return {start_of_week.value for start_of_week in list(GABStartOfWeek)}

Representation of start of week values on GAB.

SUNDAY = <GABStartOfWeek.SUNDAY: 'S'>
MONDAY = <GABStartOfWeek.MONDAY: 'M'>
@classmethod
def get_start_of_week(cls) -> dict:
916    @classmethod
917    def get_start_of_week(cls) -> dict:
918        """Get the start of week enum as a dict.
919
920        Returns:
921            dict containing all enum entries as `{name:value}`.
922        """
923        return {
924            start_of_week.name: start_of_week.value
925            for start_of_week in list(GABStartOfWeek)
926        }

Get the start of week enum as a dict.

Returns:

dict containing all enum entries as {name:value}.

@classmethod
def get_values(cls) -> set[str]:
928    @classmethod
929    def get_values(cls) -> set[str]:
930        """Get the start of week enum values as set.
931
932        Returns:
933            set containing all possible values `{value}`.
934        """
935        return {start_of_week.value for start_of_week in list(GABStartOfWeek)}

Get the start of week enum values as set.

Returns:

set containing all possible values {value}.

Inherited Members
enum.Enum
name
value
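Both helpers simply reshape the enum, as the short example below shows:

from lakehouse_engine.core.definitions import GABStartOfWeek

GABStartOfWeek.get_start_of_week()  # -> {'SUNDAY': 'S', 'MONDAY': 'M'}
GABStartOfWeek.get_values()         # -> {'S', 'M'}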
@dataclass
class GABSpec:
 938@dataclass
 939class GABSpec(object):
 940    """Gab Specification.
 941
 942    query_label_filter: query use-case label to execute.
 943    queue_filter: queue to execute the job.
 944    cadence_filter: selected cadences to build the asset.
 945    target_database: target database to write.
 946    curr_date: current date.
 947    start_date: period start date.
 948    end_date: period end date.
 949    rerun_flag: rerun flag.
 950    target_table: target table to write.
 951    source_database: source database.
 952    gab_base_path: base path to read the use cases.
 953    lookup_table: gab configuration table.
 954    calendar_table: gab calendar table.
 955    """
 956
 957    query_label_filter: list[str]
 958    queue_filter: list[str]
 959    cadence_filter: list[str]
 960    target_database: str
 961    current_date: datetime
 962    start_date: datetime
 963    end_date: datetime
 964    rerun_flag: str
 965    target_table: str
 966    source_database: str
 967    gab_base_path: str
 968    lookup_table: str
 969    calendar_table: str
 970
 971    @classmethod
 972    def create_from_acon(cls, acon: dict):  # type: ignore
 973        """Create GabSpec from acon.
 974
 975        Args:
 976            acon: gab ACON.
 977        """
 978        lookup_table = f"{acon['source_database']}." + (
 979            acon.get(
 980                "lookup_table", GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE.value
 981            )
 982        )
 983
 984        calendar_table = f"{acon['source_database']}." + (
 985            acon.get(
 986                "calendar_table", GABDefaults.DEFAULT_DIMENSION_CALENDAR_TABLE.value
 987            )
 988        )
 989
 990        def format_date(date_to_format: Union[datetime, str]) -> datetime:
 991            if isinstance(date_to_format, str):
 992                return datetime.strptime(date_to_format, GABDefaults.DATE_FORMAT.value)
 993            else:
 994                return date_to_format
 995
 996        return cls(
 997            query_label_filter=acon["query_label_filter"],
 998            queue_filter=acon["queue_filter"],
 999            cadence_filter=acon["cadence_filter"],
1000            target_database=acon["target_database"],
1001            current_date=datetime.now(),
1002            start_date=format_date(acon["start_date"]),
1003            end_date=format_date(acon["end_date"]),
1004            rerun_flag=acon["rerun_flag"],
1005            target_table=acon["target_table"],
1006            source_database=acon["source_database"],
1007            gab_base_path=acon["gab_base_path"],
1008            lookup_table=lookup_table,
1009            calendar_table=calendar_table,
1010        )

Gab Specification.

  • query_label_filter: query use-case label to execute.
  • queue_filter: queue to execute the job.
  • cadence_filter: selected cadences to build the asset.
  • target_database: target database to write.
  • current_date: current date.
  • start_date: period start date.
  • end_date: period end date.
  • rerun_flag: rerun flag.
  • target_table: target table to write.
  • source_database: source database.
  • gab_base_path: base path to read the use cases.
  • lookup_table: gab configuration table.
  • calendar_table: gab calendar table.

GABSpec( query_label_filter: list[str], queue_filter: list[str], cadence_filter: list[str], target_database: str, current_date: datetime.datetime, start_date: datetime.datetime, end_date: datetime.datetime, rerun_flag: str, target_table: str, source_database: str, gab_base_path: str, lookup_table: str, calendar_table: str)
query_label_filter: list[str]
queue_filter: list[str]
cadence_filter: list[str]
target_database: str
current_date: datetime.datetime
start_date: datetime.datetime
end_date: datetime.datetime
rerun_flag: str
target_table: str
source_database: str
gab_base_path: str
lookup_table: str
calendar_table: str
@classmethod
def create_from_acon(cls, acon: dict):
 971    @classmethod
 972    def create_from_acon(cls, acon: dict):  # type: ignore
 973        """Create GabSpec from acon.
 974
 975        Args:
 976            acon: gab ACON.
 977        """
 978        lookup_table = f"{acon['source_database']}." + (
 979            acon.get(
 980                "lookup_table", GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE.value
 981            )
 982        )
 983
 984        calendar_table = f"{acon['source_database']}." + (
 985            acon.get(
 986                "calendar_table", GABDefaults.DEFAULT_DIMENSION_CALENDAR_TABLE.value
 987            )
 988        )
 989
 990        def format_date(date_to_format: Union[datetime, str]) -> datetime:
 991            if isinstance(date_to_format, str):
 992                return datetime.strptime(date_to_format, GABDefaults.DATE_FORMAT.value)
 993            else:
 994                return date_to_format
 995
 996        return cls(
 997            query_label_filter=acon["query_label_filter"],
 998            queue_filter=acon["queue_filter"],
 999            cadence_filter=acon["cadence_filter"],
1000            target_database=acon["target_database"],
1001            current_date=datetime.now(),
1002            start_date=format_date(acon["start_date"]),
1003            end_date=format_date(acon["end_date"]),
1004            rerun_flag=acon["rerun_flag"],
1005            target_table=acon["target_table"],
1006            source_database=acon["source_database"],
1007            gab_base_path=acon["gab_base_path"],
1008            lookup_table=lookup_table,
1009            calendar_table=calendar_table,
1010        )

Create GabSpec from acon.

Arguments:
  • acon: gab ACON.
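A hedged sketch of a GAB ACON as read by create_from_acon; all names, paths and dates are placeholders, and the optional lookup_table/calendar_table keys are omitted so the GABDefaults values apply.

from lakehouse_engine.core.definitions import GABSpec

acon = {
    "query_label_filter": ["sales_kpis"],    # placeholder use case label
    "queue_filter": ["default"],             # placeholder queue
    "cadence_filter": ["DAY", "WEEK"],
    "target_database": "my_gold_db",         # placeholder
    "start_date": "2024-01-01",              # parsed with GABDefaults.DATE_FORMAT
    "end_date": "2024-01-31",
    "rerun_flag": "N",
    "target_table": "gab_use_case_results",  # placeholder
    "source_database": "my_silver_db",       # placeholder
    "gab_base_path": "s3://my-bucket/gab/use_cases",
}

gab_spec = GABSpec.create_from_acon(acon)
# gab_spec.lookup_table   -> "my_silver_db.lkp_query_builder"
# gab_spec.calendar_table -> "my_silver_db.dim_calendar"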
class GABCadence(enum.Enum):
1013class GABCadence(Enum):
1014    """Representation of the supported cadences on GAB."""
1015
1016    DAY = 1
1017    WEEK = 2
1018    MONTH = 3
1019    QUARTER = 4
1020    YEAR = 5
1021
1022    @classmethod
1023    def get_ordered_cadences(cls) -> dict:
1024        """Get the cadences ordered by the value.
1025
1026        Returns:
1027            dict containing ordered cadences as `{name:value}`.
1028        """
1029        cadences = list(GABCadence)
1030        return {
1031            cadence.name: cadence.value
1032            for cadence in sorted(cadences, key=lambda gab_cadence: gab_cadence.value)
1033        }
1034
1035    @classmethod
1036    def get_cadences(cls) -> set[str]:
1037        """Get the cadences values as set.
1038
1039        Returns:
1040            set containing all possible cadence values as `{value}`.
1041        """
1042        return {cadence.name for cadence in list(GABCadence)}
1043
1044    @classmethod
1045    def order_cadences(cls, cadences_to_order: list[str]) -> list[str]:
1046        """Order a list of cadences by value.
1047
1048        Returns:
1049            ordered set containing the received cadences.
1050        """
1051        return sorted(
1052            cadences_to_order,
1053            key=lambda item: cls.get_ordered_cadences().get(item),  # type: ignore
1054        )

Representation of the supported cadences on GAB.

DAY = <GABCadence.DAY: 1>
WEEK = <GABCadence.WEEK: 2>
MONTH = <GABCadence.MONTH: 3>
QUARTER = <GABCadence.QUARTER: 4>
YEAR = <GABCadence.YEAR: 5>
@classmethod
def get_ordered_cadences(cls) -> dict:
1022    @classmethod
1023    def get_ordered_cadences(cls) -> dict:
1024        """Get the cadences ordered by the value.
1025
1026        Returns:
1027            dict containing ordered cadences as `{name:value}`.
1028        """
1029        cadences = list(GABCadence)
1030        return {
1031            cadence.name: cadence.value
1032            for cadence in sorted(cadences, key=lambda gab_cadence: gab_cadence.value)
1033        }

Get the cadences ordered by the value.

Returns:

dict containing ordered cadences as {name:value}.

@classmethod
def get_cadences(cls) -> set[str]:
1035    @classmethod
1036    def get_cadences(cls) -> set[str]:
1037        """Get the cadences values as set.
1038
1039        Returns:
1040            set containing all possible cadence values as `{value}`.
1041        """
1042        return {cadence.name for cadence in list(GABCadence)}

Get the cadences values as set.

Returns:

set containing all possible cadence values as {value}.

@classmethod
def order_cadences(cls, cadences_to_order: list[str]) -> list[str]:
1044    @classmethod
1045    def order_cadences(cls, cadences_to_order: list[str]) -> list[str]:
1046        """Order a list of cadences by value.
1047
1048        Returns:
1049            ordered set containing the received cadences.
1050        """
1051        return sorted(
1052            cadences_to_order,
1053            key=lambda item: cls.get_ordered_cadences().get(item),  # type: ignore
1054        )

Order a list of cadences by value.

Returns:

ordered set containing the received cadences.

Inherited Members
enum.Enum
name
value
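The helpers order cadences by their enum value, e.g.:

from lakehouse_engine.core.definitions import GABCadence

GABCadence.get_ordered_cadences()
# -> {'DAY': 1, 'WEEK': 2, 'MONTH': 3, 'QUARTER': 4, 'YEAR': 5}

GABCadence.order_cadences(["YEAR", "DAY", "MONTH"])
# -> ['DAY', 'MONTH', 'YEAR']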
class GABKeys:
1057class GABKeys:
1058    """Constants used to update pre-configured gab dict key."""
1059
1060    JOIN_SELECT = "join_select"
1061    PROJECT_START = "project_start"
1062    PROJECT_END = "project_end"

Constants used to update pre-configured gab dict key.

JOIN_SELECT = 'join_select'
PROJECT_START = 'project_start'
PROJECT_END = 'project_end'
class GABReplaceableKeys:
1065class GABReplaceableKeys:
1066    """Constants used to replace pre-configured gab dict values."""
1067
1068    CADENCE = "${cad}"
1069    DATE_COLUMN = "${date_column}"
1070    CONFIG_WEEK_START = "${config_week_start}"
1071    RECONCILIATION_CADENCE = "${rec_cadence}"

Constants used to replace pre-configured gab dict values.

CADENCE = '${cad}'
DATE_COLUMN = '${date_column}'
CONFIG_WEEK_START = '${config_week_start}'
RECONCILIATION_CADENCE = '${rec_cadence}'
class GABCombinedConfiguration(enum.Enum):
1074class GABCombinedConfiguration(Enum):
1075    """GAB combined configuration.
1076
1077    Based on the use case configuration return the values to override in the SQL file.
1078    This enum aims to exhaustively map each combination of `cadence`, `reconciliation`,
1079        `week_start` and `snap_flag` return the corresponding values `join_select`,
1080        `project_start` and `project_end` to replace this values in the stages SQL file.
1081
1082    Return corresponding configuration (join_select, project_start, project_end) for
1083        each combination (cadence x recon x week_start x snap_flag).
1084    """
1085
1086    _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE = (
1087        "date(date_trunc('${cad}',${date_column}))"
1088    )
1089    _DEFAULT_PROJECT_START = "df_cal.cadence_start_date"
1090    _DEFAULT_PROJECT_END = "df_cal.cadence_end_date"
1091
1092    COMBINED_CONFIGURATION = {
1093        # Combination of:
1094        # - cadence: `DAY`
1095        # - reconciliation_window: `DAY`, `WEEK`, `MONTH`, `QUARTER`, `YEAR`
1096        # - week_start: `S`, `M`
1097        # - snapshot_flag: `Y`, `N`
1098        1: {
1099            "cadence": GABCadence.DAY.name,
1100            "recon": GABCadence.get_cadences(),
1101            "week_start": GABStartOfWeek.get_values(),
1102            "snap_flag": {"Y", "N"},
1103            "join_select": "",
1104            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1105            "project_end": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1106        },
1107        # Combination of:
1108        # - cadence: `WEEK`
1109        # - reconciliation_window: `DAY`
1110        # - week_start: `S`, `M`
1111        # - snapshot_flag: `Y`
1112        2: {
1113            "cadence": GABCadence.WEEK.name,
1114            "recon": GABCadence.DAY.name,
1115            "week_start": GABStartOfWeek.get_values(),
1116            "snap_flag": "Y",
1117            "join_select": """
1118            select distinct case
1119                when '${config_week_start}' = 'Monday' then weekstart_mon
1120                when '${config_week_start}' = 'Sunday' then weekstart_sun
1121            end as cadence_start_date,
1122            calendar_date as cadence_end_date
1123        """,
1124            "project_start": _DEFAULT_PROJECT_START,
1125            "project_end": _DEFAULT_PROJECT_END,
1126        },
1127        # Combination of:
1128        # - cadence: `WEEK`
1129        # - reconciliation_window: `DAY, `MONTH`, `QUARTER`, `YEAR`
1130        # - week_start: `M`
1131        # - snapshot_flag: `Y`, `N`
1132        3: {
1133            "cadence": GABCadence.WEEK.name,
1134            "recon": {
1135                GABCadence.DAY.name,
1136                GABCadence.MONTH.name,
1137                GABCadence.QUARTER.name,
1138                GABCadence.YEAR.name,
1139            },
1140            "week_start": "M",
1141            "snap_flag": {"Y", "N"},
1142            "join_select": """
1143            select distinct case
1144                when '${config_week_start}'  = 'Monday' then weekstart_mon
1145                when '${config_week_start}' = 'Sunday' then weekstart_sun
1146            end as cadence_start_date,
1147            case
1148                when '${config_week_start}' = 'Monday' then weekend_mon
1149                when '${config_week_start}' = 'Sunday' then weekend_sun
1150            end as cadence_end_date""",
1151            "project_start": _DEFAULT_PROJECT_START,
1152            "project_end": _DEFAULT_PROJECT_END,
1153        },
1154        4: {
1155            "cadence": GABCadence.MONTH.name,
1156            "recon": GABCadence.DAY.name,
1157            "week_start": GABStartOfWeek.get_values(),
1158            "snap_flag": "Y",
1159            "join_select": """
1160            select distinct month_start as cadence_start_date,
1161            calendar_date as cadence_end_date
1162        """,
1163            "project_start": _DEFAULT_PROJECT_START,
1164            "project_end": _DEFAULT_PROJECT_END,
1165        },
1166        5: {
1167            "cadence": GABCadence.MONTH.name,
1168            "recon": GABCadence.WEEK.name,
1169            "week_start": GABStartOfWeek.MONDAY.value,
1170            "snap_flag": "Y",
1171            "join_select": """
1172            select distinct month_start as cadence_start_date,
1173            case
1174                when date(
1175                    date_trunc('MONTH',add_months(calendar_date, 1))
1176                )-1 < weekend_mon
1177                    then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1
1178                else weekend_mon
1179            end as cadence_end_date""",
1180            "project_start": _DEFAULT_PROJECT_START,
1181            "project_end": _DEFAULT_PROJECT_END,
1182        },
1183        6: {
1184            "cadence": GABCadence.MONTH.name,
1185            "recon": GABCadence.WEEK.name,
1186            "week_start": GABStartOfWeek.SUNDAY.value,
1187            "snap_flag": "Y",
1188            "join_select": """
1189            select distinct month_start as cadence_start_date,
1190            case
1191                when date(
1192                    date_trunc('MONTH',add_months(calendar_date, 1))
1193                )-1 < weekend_sun
1194                    then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1
1195                else weekend_sun
1196            end as cadence_end_date""",
1197            "project_start": _DEFAULT_PROJECT_START,
1198            "project_end": _DEFAULT_PROJECT_END,
1199        },
1200        7: {
1201            "cadence": GABCadence.MONTH.name,
1202            "recon": GABCadence.get_cadences(),
1203            "week_start": GABStartOfWeek.get_values(),
1204            "snap_flag": {"Y", "N"},
1205            "join_select": "",
1206            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1207            "project_end": "date(date_trunc('MONTH',add_months(${date_column}, 1)))-1",
1208        },
1209        8: {
1210            "cadence": GABCadence.QUARTER.name,
1211            "recon": GABCadence.DAY.name,
1212            "week_start": GABStartOfWeek.get_values(),
1213            "snap_flag": "Y",
1214            "join_select": """
1215            select distinct quarter_start as cadence_start_date,
1216            calendar_date as cadence_end_date
1217        """,
1218            "project_start": _DEFAULT_PROJECT_START,
1219            "project_end": _DEFAULT_PROJECT_END,
1220        },
1221        9: {
1222            "cadence": GABCadence.QUARTER.name,
1223            "recon": GABCadence.WEEK.name,
1224            "week_start": GABStartOfWeek.MONDAY.value,
1225            "snap_flag": "Y",
1226            "join_select": """
1227            select distinct quarter_start as cadence_start_date,
1228            case
1229                when weekend_mon > date(
1230                    date_trunc('QUARTER',add_months(calendar_date, 3))
1231                )-1
1232                    then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1
1233                else weekend_mon
1234            end as cadence_end_date""",
1235            "project_start": _DEFAULT_PROJECT_START,
1236            "project_end": _DEFAULT_PROJECT_END,
1237        },
1238        10: {
1239            "cadence": GABCadence.QUARTER.name,
1240            "recon": GABCadence.WEEK.name,
1241            "week_start": GABStartOfWeek.SUNDAY.value,
1242            "snap_flag": "Y",
1243            "join_select": """
1244            select distinct quarter_start as cadence_start_date,
1245            case
1246                when weekend_sun > date(
1247                    date_trunc('QUARTER',add_months(calendar_date, 3))
1248                )-1
1249                    then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1
1250                else weekend_sun
1251            end as cadence_end_date""",
1252            "project_start": _DEFAULT_PROJECT_START,
1253            "project_end": _DEFAULT_PROJECT_END,
1254        },
1255        11: {
1256            "cadence": GABCadence.QUARTER.name,
1257            "recon": GABCadence.MONTH.name,
1258            "week_start": GABStartOfWeek.get_values(),
1259            "snap_flag": "Y",
1260            "join_select": """
1261            select distinct quarter_start as cadence_start_date,
1262            month_end as cadence_end_date
1263        """,
1264            "project_start": _DEFAULT_PROJECT_START,
1265            "project_end": _DEFAULT_PROJECT_END,
1266        },
1267        12: {
1268            "cadence": GABCadence.QUARTER.name,
1269            "recon": GABCadence.YEAR.name,
1270            "week_start": GABStartOfWeek.get_values(),
1271            "snap_flag": "N",
1272            "join_select": "",
1273            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1274            "project_end": """
1275            date(
1276                date_trunc(
1277                    '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 3)
1278                )
1279            )-1
1280        """,
1281        },
1282        13: {
1283            "cadence": GABCadence.QUARTER.name,
1284            "recon": GABCadence.get_cadences(),
1285            "week_start": GABStartOfWeek.get_values(),
1286            "snap_flag": "N",
1287            "join_select": "",
1288            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1289            "project_end": """
1290            date(
1291                date_trunc(
1292                    '${cad}',add_months( date(date_trunc('${cad}',${date_column})), 3)
1293                )
1294            )-1
1295        """,
1296        },
1297        14: {
1298            "cadence": GABCadence.YEAR.name,
1299            "recon": GABCadence.WEEK.name,
1300            "week_start": GABStartOfWeek.MONDAY.value,
1301            "snap_flag": "Y",
1302            "join_select": """
1303            select distinct year_start as cadence_start_date,
1304            case
1305                when weekend_mon > date(
1306                    date_trunc('YEAR',add_months(calendar_date, 12))
1307                )-1
1308                    then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1
1309                else weekend_mon
1310            end as cadence_end_date""",
1311            "project_start": _DEFAULT_PROJECT_START,
1312            "project_end": _DEFAULT_PROJECT_END,
1313        },
1314        15: {
1315            "cadence": GABCadence.YEAR.name,
1316            "recon": GABCadence.WEEK.name,
1317            "week_start": GABStartOfWeek.SUNDAY.value,
1318            "snap_flag": "Y",
1319            "join_select": """
1320            select distinct year_start as cadence_start_date,
1321            case
1322                when weekend_sun > date(
1323                    date_trunc('YEAR',add_months(calendar_date, 12))
1324                )-1
1325                    then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1
1326                else weekend_sun
1327            end as cadence_end_date""",
1328            "project_start": _DEFAULT_PROJECT_START,
1329            "project_end": _DEFAULT_PROJECT_END,
1330        },
1331        16: {
1332            "cadence": GABCadence.YEAR.name,
1333            "recon": GABCadence.get_cadences(),
1334            "week_start": GABStartOfWeek.get_values(),
1335            "snap_flag": "N",
1336            "inverse_flag": "Y",
1337            "join_select": "",
1338            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
1339            "project_end": """
1340            date(
1341                date_trunc(
1342                    '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 12)
1343                )
1344            )-1
1345        """,
1346        },
1347        17: {
1348            "cadence": GABCadence.YEAR.name,
1349            "recon": {
1350                GABCadence.DAY.name,
1351                GABCadence.MONTH.name,
1352                GABCadence.QUARTER.name,
1353            },
1354            "week_start": GABStartOfWeek.get_values(),
1355            "snap_flag": "Y",
1356            "join_select": """
1357            select distinct year_start as cadence_start_date,
1358            case
1359                when '${rec_cadence}' = 'DAY' then calendar_date
1360                when '${rec_cadence}' = 'MONTH' then month_end
1361                when '${rec_cadence}' = 'QUARTER' then quarter_end
1362            end as cadence_end_date
1363        """,
1364            "project_start": _DEFAULT_PROJECT_START,
1365            "project_end": _DEFAULT_PROJECT_END,
1366        },
1367        18: {
1368            "cadence": GABCadence.get_cadences(),
1369            "recon": GABCadence.get_cadences(),
1370            "week_start": GABStartOfWeek.get_values(),
1371            "snap_flag": {"Y", "N"},
1372            "join_select": """
1373            select distinct
1374            case
1375                when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday'
1376                    then weekstart_mon
1377                when  '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday'
1378                    then weekstart_sun
1379                else
1380                    date(date_trunc('${cad}',calendar_date))
1381            end as cadence_start_date,
1382            case
1383                when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday'
1384                    then weekend_mon
1385                when  '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday'
1386                    then weekend_sun
1387                when '${cad}' = 'DAY'
1388                    then date(date_trunc('${cad}',calendar_date))
1389                when '${cad}' = 'MONTH'
1390                    then date(
1391                        date_trunc(
1392                            'MONTH',
1393                            add_months(date(date_trunc('${cad}',calendar_date)), 1)
1394                        )
1395                    )-1
1396                when '${cad}' = 'QUARTER'
1397                    then date(
1398                        date_trunc(
1399                            'QUARTER',
1400                            add_months(date(date_trunc('${cad}',calendar_date)) , 3)
1401                        )
1402                    )-1
1403                when '${cad}' = 'YEAR'
1404                    then date(
1405                        date_trunc(
1406                            'YEAR',
1407                            add_months(date(date_trunc('${cad}',calendar_date)), 12)
1408                        )
1409                    )-1
1410            end as cadence_end_date
1411        """,
1412            "project_start": _DEFAULT_PROJECT_START,
1413            "project_end": _DEFAULT_PROJECT_END,
1414        },
1415    }

GAB combined configuration.

Based on the use case configuration, return the values to override in the SQL file. This enum aims to exhaustively map each combination of cadence, reconciliation, week_start and snap_flag to the corresponding join_select, project_start and project_end values, which replace the placeholders in the stages SQL file.

In other words, it returns the corresponding configuration (join_select, project_start, project_end) for each combination (cadence x recon x week_start x snap_flag). A small placeholder-substitution sketch is given at the end of this section.

COMBINED_CONFIGURATION = <GABCombinedConfiguration.COMBINED_CONFIGURATION: {…}> (the full 18-entry mapping is reproduced verbatim in the source listing above)
Inherited Members
enum.Enum
name
value
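To show how a selected combination is meant to be consumed, here is a hedged sketch that substitutes the GABReplaceableKeys placeholders inside one entry; the actual resolution logic lives in the engine's GAB modules and may differ, so this only illustrates the template mechanics.

from lakehouse_engine.core.definitions import (
    GABCombinedConfiguration,
    GABKeys,
    GABReplaceableKeys,
)

# Pick combination 1 (cadence DAY) and render its project_start template
# for a hypothetical date column.
combination = GABCombinedConfiguration.COMBINED_CONFIGURATION.value[1]

project_start = (
    combination[GABKeys.PROJECT_START]
    .replace(GABReplaceableKeys.CADENCE, "DAY")
    .replace(GABReplaceableKeys.DATE_COLUMN, "order_date")
)
# -> "date(date_trunc('DAY',order_date))"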