lakehouse_engine.core.definitions
Definitions of standard values and structures for core components.
1"""Definitions of standard values and structures for core components.""" 2 3from dataclasses import dataclass 4from datetime import datetime 5from enum import Enum 6from typing import List, Optional, Union 7 8from pyspark.sql import DataFrame 9from pyspark.sql.types import ( 10 ArrayType, 11 BooleanType, 12 StringType, 13 StructField, 14 StructType, 15 TimestampType, 16) 17 18 19class CollectEngineUsage(Enum): 20 """Options for collecting engine usage stats. 21 22 - enabled, enables the collection and storage of Lakehouse Engine 23 usage statistics for any environment. 24 - prod_only, enables the collection and storage of Lakehouse Engine 25 usage statistics for production environment only. 26 - disabled, disables the collection and storage of Lakehouse Engine 27 usage statistics, for all environments. 28 """ 29 30 ENABLED = "enabled" 31 PROD_ONLY = "prod_only" 32 DISABLED = "disabled" 33 34 35@dataclass 36class EngineConfig(object): 37 """Definitions that can come from the Engine Config file. 38 39 - dq_bucket: S3 prod bucket used to store data quality related artifacts. 40 - dq_dev_bucket: S3 dev bucket used to store data quality related artifacts. 41 - notif_disallowed_email_servers: email servers not allowed to be used 42 for sending notifications. 43 - engine_usage_path: path where the engine prod usage stats are stored. 44 - engine_dev_usage_path: path where the engine dev usage stats are stored. 45 - collect_engine_usage: whether to enable the collection of lakehouse 46 engine usage stats or not. 47 - dq_functions_column_list: list of columns to be added to the meta argument 48 of GX when using PRISMA. 49 """ 50 51 dq_bucket: Optional[str] = None 52 dq_dev_bucket: Optional[str] = None 53 notif_disallowed_email_servers: Optional[list] = None 54 engine_usage_path: Optional[str] = None 55 engine_dev_usage_path: Optional[str] = None 56 collect_engine_usage: str = CollectEngineUsage.ENABLED.value 57 dq_functions_column_list: Optional[list] = None 58 59 60class EngineStats(Enum): 61 """Definitions for collection of Lakehouse Engine Stats. 62 63 .. note:: 64 Note: whenever the value comes from a key inside a Spark Config 65 that returns an array, it can be specified with a '#' so that it 66 is adequately processed. 67 """ 68 69 CLUSTER_USAGE_TAGS = "spark.databricks.clusterUsageTags" 70 DEF_SPARK_CONFS = { 71 "dp_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#accountName", 72 "environment": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#environment", 73 "workspace_id": f"{CLUSTER_USAGE_TAGS}.orgId", 74 "job_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#JobId", 75 "job_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#RunName", 76 "run_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#ClusterName", 77 } 78 79 80class InputFormat(Enum): 81 """Formats of algorithm input.""" 82 83 JDBC = "jdbc" 84 AVRO = "avro" 85 JSON = "json" 86 CSV = "csv" 87 PARQUET = "parquet" 88 DELTAFILES = "delta" 89 CLOUDFILES = "cloudfiles" 90 KAFKA = "kafka" 91 SQL = "sql" 92 SAP_BW = "sap_bw" 93 SAP_B4 = "sap_b4" 94 DATAFRAME = "dataframe" 95 SFTP = "sftp" 96 97 @classmethod 98 def values(cls): # type: ignore 99 """Generates a list containing all enum values. 100 101 Return: 102 A list with all enum values. 103 """ 104 return (c.value for c in cls) 105 106 @classmethod 107 def exists(cls, input_format: str) -> bool: 108 """Checks if the input format exists in the enum values. 109 110 Args: 111 input_format: format to check if exists. 112 113 Return: 114 If the input format exists in our enum. 
115 """ 116 return input_format in cls.values() 117 118 119# Formats of input that are considered files. 120FILE_INPUT_FORMATS = [ 121 InputFormat.AVRO.value, 122 InputFormat.JSON.value, 123 InputFormat.PARQUET.value, 124 InputFormat.CSV.value, 125 InputFormat.DELTAFILES.value, 126 InputFormat.CLOUDFILES.value, 127] 128 129 130class OutputFormat(Enum): 131 """Formats of algorithm output.""" 132 133 JDBC = "jdbc" 134 AVRO = "avro" 135 JSON = "json" 136 CSV = "csv" 137 PARQUET = "parquet" 138 DELTAFILES = "delta" 139 KAFKA = "kafka" 140 CONSOLE = "console" 141 NOOP = "noop" 142 DATAFRAME = "dataframe" 143 REST_API = "rest_api" 144 FILE = "file" # Internal use only 145 TABLE = "table" # Internal use only 146 147 @classmethod 148 def values(cls): # type: ignore 149 """Generates a list containing all enum values. 150 151 Return: 152 A list with all enum values. 153 """ 154 return (c.value for c in cls) 155 156 @classmethod 157 def exists(cls, output_format: str) -> bool: 158 """Checks if the output format exists in the enum values. 159 160 Args: 161 output_format: format to check if exists. 162 163 Return: 164 If the output format exists in our enum. 165 """ 166 return output_format in cls.values() 167 168 169# Formats of output that are considered files. 170FILE_OUTPUT_FORMATS = [ 171 OutputFormat.AVRO.value, 172 OutputFormat.JSON.value, 173 OutputFormat.PARQUET.value, 174 OutputFormat.CSV.value, 175 OutputFormat.DELTAFILES.value, 176] 177 178 179class NotifierType(Enum): 180 """Type of notifier available.""" 181 182 EMAIL = "email" 183 184 185class NotificationRuntimeParameters(Enum): 186 """Parameters to be replaced in runtime.""" 187 188 DATABRICKS_JOB_NAME = "databricks_job_name" 189 DATABRICKS_WORKSPACE_ID = "databricks_workspace_id" 190 191 192NOTIFICATION_RUNTIME_PARAMETERS = [ 193 NotificationRuntimeParameters.DATABRICKS_JOB_NAME.value, 194 NotificationRuntimeParameters.DATABRICKS_WORKSPACE_ID.value, 195] 196 197 198class ReadType(Enum): 199 """Define the types of read operations. 200 201 - BATCH - read the data in batch mode (e.g., Spark batch). 202 - STREAMING - read the data in streaming mode (e.g., Spark streaming). 203 """ 204 205 BATCH = "batch" 206 STREAMING = "streaming" 207 208 209class ReadMode(Enum): 210 """Different modes that control how we handle compliance to the provided schema. 211 212 These read modes map to Spark's read modes at the moment. 
213 """ 214 215 PERMISSIVE = "PERMISSIVE" 216 FAILFAST = "FAILFAST" 217 DROPMALFORMED = "DROPMALFORMED" 218 219 220class DQDefaults(Enum): 221 """Defaults used on the data quality process.""" 222 223 FILE_SYSTEM_STORE = "file_system" 224 FILE_SYSTEM_S3_STORE = "s3" 225 DQ_BATCH_IDENTIFIERS = ["spec_id", "input_id", "timestamp"] 226 DATASOURCE_CLASS_NAME = "Datasource" 227 DATASOURCE_EXECUTION_ENGINE = "SparkDFExecutionEngine" 228 DATA_CONNECTORS_CLASS_NAME = "RuntimeDataConnector" 229 DATA_CONNECTORS_MODULE_NAME = "great_expectations.datasource.data_connector" 230 DATA_CHECKPOINTS_CLASS_NAME = "SimpleCheckpoint" 231 DATA_CHECKPOINTS_CONFIG_VERSION = 1.0 232 STORE_BACKEND = "s3" 233 EXPECTATIONS_STORE_PREFIX = "dq/expectations/" 234 VALIDATIONS_STORE_PREFIX = "dq/validations/" 235 DATA_DOCS_PREFIX = "dq/data_docs/site/" 236 CHECKPOINT_STORE_PREFIX = "dq/checkpoints/" 237 VALIDATION_COLUMN_IDENTIFIER = "validationresultidentifier" 238 CUSTOM_EXPECTATION_LIST = [ 239 "expect_column_values_to_be_date_not_older_than", 240 "expect_column_pair_a_to_be_smaller_or_equal_than_b", 241 "expect_multicolumn_column_a_must_equal_b_or_c", 242 "expect_queried_column_agg_value_to_be", 243 "expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b", 244 "expect_column_pair_a_to_be_not_equal_to_b", 245 ] 246 DQ_VALIDATIONS_SCHEMA = StructType( 247 [ 248 StructField( 249 "dq_validations", 250 StructType( 251 [ 252 StructField("run_name", StringType()), 253 StructField("run_success", BooleanType()), 254 StructField("raised_exceptions", BooleanType()), 255 StructField("run_row_success", BooleanType()), 256 StructField( 257 "dq_failure_details", 258 ArrayType( 259 StructType( 260 [ 261 StructField("expectation_type", StringType()), 262 StructField("kwargs", StringType()), 263 ] 264 ), 265 ), 266 ), 267 ] 268 ), 269 ) 270 ] 271 ) 272 273 274class WriteType(Enum): 275 """Types of write operations.""" 276 277 OVERWRITE = "overwrite" 278 COMPLETE = "complete" 279 APPEND = "append" 280 UPDATE = "update" 281 MERGE = "merge" 282 ERROR_IF_EXISTS = "error" 283 IGNORE_IF_EXISTS = "ignore" 284 285 286@dataclass 287class InputSpec(object): 288 """Specification of an algorithm input. 289 290 This is very aligned with the way the execution environment connects to the sources 291 (e.g., spark sources). 292 293 - spec_id: spec_id of the input specification read_type: ReadType type of read 294 operation. 295 - data_format: format of the input. 296 - sftp_files_format: format of the files (csv, fwf, json, xml...) in a sftp 297 directory. 298 - df_name: dataframe name. 299 - db_table: table name in the form of `<db>.<table>`. 300 - location: uri that identifies from where to read data in the specified format. 301 - enforce_schema_from_table: if we want to enforce the table schema or not, by 302 providing a table name in the form of `<db>.<table>`. 303 - query: sql query to execute and return the dataframe. Use it if you do not want to 304 read from a file system nor from a table, but rather from a sql query instead. 305 - schema: dict representation of a schema of the input (e.g., Spark struct type 306 schema). 307 - schema_path: path to a file with a representation of a schema of the input (e.g., 308 Spark struct type schema). 309 - disable_dbfs_retry: optional flag to disable file storage dbfs. 310 - with_filepath: if we want to include the path of the file that is being read. Only 311 works with the file reader (batch and streaming modes are supported). 
312 - options: dict with other relevant options according to the execution 313 environment (e.g., spark) possible sources. 314 - calculate_upper_bound: when to calculate upper bound to extract from SAP BW 315 or not. 316 - calc_upper_bound_schema: specific schema for the calculated upper_bound. 317 - generate_predicates: when to generate predicates to extract from SAP BW or not. 318 - predicates_add_null: if we want to include is null on partition by predicates. 319 - temp_view: optional name of a view to point to the input dataframe to be used 320 to create or replace a temp view on top of the dataframe. 321 """ 322 323 spec_id: str 324 read_type: str 325 data_format: Optional[str] = None 326 sftp_files_format: Optional[str] = None 327 df_name: Optional[DataFrame] = None 328 db_table: Optional[str] = None 329 location: Optional[str] = None 330 query: Optional[str] = None 331 enforce_schema_from_table: Optional[str] = None 332 schema: Optional[dict] = None 333 schema_path: Optional[str] = None 334 disable_dbfs_retry: bool = False 335 with_filepath: bool = False 336 options: Optional[dict] = None 337 jdbc_args: Optional[dict] = None 338 calculate_upper_bound: bool = False 339 calc_upper_bound_schema: Optional[str] = None 340 generate_predicates: bool = False 341 predicates_add_null: bool = True 342 temp_view: Optional[str] = None 343 344 345@dataclass 346class TransformerSpec(object): 347 """Transformer Specification, i.e., a single transformation amongst many. 348 349 - function: name of the function (or callable function) to be executed. 350 - args: (not applicable if using a callable function) dict with the arguments 351 to pass to the function `<k,v>` pairs with the name of the parameter of 352 the function and the respective value. 353 """ 354 355 function: str 356 args: dict 357 358 359@dataclass 360class TransformSpec(object): 361 """Transformation Specification. 362 363 I.e., the specification that defines the many transformations to be done to the data 364 that was read. 365 366 - spec_id: id of the terminate specification 367 - input_id: id of the corresponding input 368 specification. 369 - transformers: list of transformers to execute. 370 - force_streaming_foreach_batch_processing: sometimes, when using streaming, we want 371 to force the transform to be executed in the foreachBatch function to ensure 372 non-supported streaming operations can be properly executed. 373 """ 374 375 spec_id: str 376 input_id: str 377 transformers: List[TransformerSpec] 378 force_streaming_foreach_batch_processing: bool = False 379 380 381class DQType(Enum): 382 """Available data quality tasks.""" 383 384 VALIDATOR = "validator" 385 PRISMA = "prisma" 386 387 388class DQExecutionPoint(Enum): 389 """Available data quality execution points.""" 390 391 IN_MOTION = "in_motion" 392 AT_REST = "at_rest" 393 394 395class DQTableBaseParameters(Enum): 396 """Base parameters for importing DQ rules from a table.""" 397 398 PRISMA_BASE_PARAMETERS = ["arguments", "dq_tech_function"] 399 400 401@dataclass 402class DQFunctionSpec(object): 403 """Defines a data quality function specification. 404 405 - function - name of the data quality function (expectation) to execute. 406 It follows the great_expectations api https://greatexpectations.io/expectations/. 407 - args - args of the function (expectation). Follow the same api as above. 408 """ 409 410 function: str 411 args: Optional[dict] = None 412 413 414@dataclass 415class DQSpec(object): 416 """Data quality overall specification. 
417 418 - spec_id - id of the specification. 419 - input_id - id of the input specification. 420 - dq_type - type of DQ process to execute (e.g. validator). 421 - dq_functions - list of function specifications to execute. 422 - dq_db_table - name of table to derive the dq functions from. 423 - dq_table_table_filter - name of the table which rules are to be applied in the 424 validations (Only used when deriving dq functions). 425 - dq_table_extra_filters - extra filters to be used when deriving dq functions. 426 This is a sql expression to be applied to the dq_db_table. 427 - execution_point - execution point of the dq functions. [at_rest, in_motion]. 428 This is set during the load_data or dq_validator functions. 429 - unexpected_rows_pk - the list of columns composing the primary key of the 430 source data to identify the rows failing the DQ validations. Note: only one 431 of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It 432 is mandatory to provide one of these arguments when using tag_source_data 433 as True. When tag_source_data is False, this is not mandatory, but still 434 recommended. 435 - tbl_to_derive_pk - db.table to automatically derive the unexpected_rows_pk from. 436 Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to 437 be provided. It is mandatory to provide one of these arguments when using 438 tag_source_data as True. hen tag_source_data is False, this is not 439 mandatory, but still recommended. 440 - sort_processed_keys - when using the `prisma` `dq_type`, a column `processed_keys` 441 is automatically added to give observability over the PK values that were 442 processed during a run. This parameter (`sort_processed_keys`) controls whether 443 the processed keys column value should be sorted or not. Default: False. 444 - gx_result_format - great expectations result format. Default: "COMPLETE". 445 - tag_source_data - when set to true, this will ensure that the DQ process ends by 446 tagging the source data with an additional column with information about the 447 DQ results. This column makes it possible to identify if the DQ run was 448 succeeded in general and, if not, it unlocks the insights to know what 449 specific rows have made the DQ validations fail and why. Default: False. 450 Note: it only works if result_sink_explode is True, gx_result_format is 451 COMPLETE, fail_on_error is False (which is done automatically when 452 you specify tag_source_data as True) and tbl_to_derive_pk or 453 unexpected_rows_pk is configured. 454 - store_backend - which store_backend to use (e.g. s3 or file_system). 455 - local_fs_root_dir - path of the root directory. Note: only applicable for 456 store_backend file_system. 457 - data_docs_local_fs - the path for data docs only for store_backend 458 file_system. 459 - bucket - the bucket name to consider for the store_backend (store DQ artefacts). 460 Note: only applicable for store_backend s3. 461 - data_docs_bucket - the bucket name for data docs only. When defined, it will 462 supersede bucket parameter. Note: only applicable for store_backend s3. 463 - expectations_store_prefix - prefix where to store expectations' data. Note: only 464 applicable for store_backend s3. 465 - validations_store_prefix - prefix where to store validations' data. Note: only 466 applicable for store_backend s3. 467 - data_docs_prefix - prefix where to store data_docs' data. 468 - checkpoint_store_prefix - prefix where to store checkpoints' data. Note: only 469 applicable for store_backend s3. 
470 - data_asset_name - name of the data asset to consider when configuring the great 471 expectations' data source. 472 - expectation_suite_name - name to consider for great expectations' suite. 473 - result_sink_db_table - db.table_name indicating the database and table in which 474 to save the results of the DQ process. 475 - result_sink_location - file system location in which to save the results of the 476 DQ process. 477 - data_product_name - name of the data product. 478 - result_sink_partitions - the list of partitions to consider. 479 - result_sink_format - format of the result table (e.g. delta, parquet, kafka...). 480 - result_sink_options - extra spark options for configuring the result sink. 481 E.g: can be used to configure a Kafka sink if result_sink_format is kafka. 482 - result_sink_explode - flag to determine if the output table/location should have 483 the columns exploded (as True) or not (as False). Default: True. 484 - result_sink_extra_columns - list of extra columns to be exploded (following 485 the pattern "<name>.*") or columns to be selected. It is only used when 486 result_sink_explode is set to True. 487 - source - name of data source, to be easier to identify in analysis. If not 488 specified, it is set as default <input_id>. This will be only used 489 when result_sink_explode is set to True. 490 - fail_on_error - whether to fail the algorithm if the validations of your data in 491 the DQ process failed. 492 - cache_df - whether to cache the dataframe before running the DQ process or not. 493 - critical_functions - functions that should not fail. When this argument is 494 defined, fail_on_error is nullified. 495 - max_percentage_failure - percentage of failure that should be allowed. 496 This argument has priority over both fail_on_error and critical_functions. 
497 """ 498 499 spec_id: str 500 input_id: str 501 dq_type: str 502 dq_functions: Optional[List[DQFunctionSpec]] = None 503 dq_db_table: Optional[str] = None 504 dq_table_table_filter: Optional[str] = None 505 dq_table_extra_filters: Optional[str] = None 506 execution_point: Optional[str] = None 507 unexpected_rows_pk: Optional[List[str]] = None 508 tbl_to_derive_pk: Optional[str] = None 509 sort_processed_keys: Optional[bool] = False 510 gx_result_format: Optional[str] = "COMPLETE" 511 tag_source_data: Optional[bool] = False 512 store_backend: str = DQDefaults.STORE_BACKEND.value 513 local_fs_root_dir: Optional[str] = None 514 data_docs_local_fs: Optional[str] = None 515 bucket: Optional[str] = None 516 data_docs_bucket: Optional[str] = None 517 expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value 518 validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value 519 data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value 520 checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value 521 data_asset_name: Optional[str] = None 522 expectation_suite_name: Optional[str] = None 523 result_sink_db_table: Optional[str] = None 524 result_sink_location: Optional[str] = None 525 data_product_name: Optional[str] = None 526 result_sink_partitions: Optional[List[str]] = None 527 result_sink_format: str = OutputFormat.DELTAFILES.value 528 result_sink_options: Optional[dict] = None 529 result_sink_explode: bool = True 530 result_sink_extra_columns: Optional[List[str]] = None 531 source: Optional[str] = None 532 fail_on_error: bool = True 533 cache_df: bool = False 534 critical_functions: Optional[List[DQFunctionSpec]] = None 535 max_percentage_failure: Optional[float] = None 536 537 538@dataclass 539class MergeOptions(object): 540 """Options for a merge operation. 541 542 - merge_predicate: predicate to apply to the merge operation so that we can 543 check if a new record corresponds to a record already included in the 544 historical data. 545 - insert_only: indicates if the merge should only insert data (e.g., deduplicate 546 scenarios). 547 - delete_predicate: predicate to apply to the delete operation. 548 - update_predicate: predicate to apply to the update operation. 549 - insert_predicate: predicate to apply to the insert operation. 550 - update_column_set: rules to apply to the update operation which allows to 551 set the value for each column to be updated. 552 (e.g. {"data": "new.data", "count": "current.count + 1"} ) 553 - insert_column_set: rules to apply to the insert operation which allows to 554 set the value for each column to be inserted. 555 (e.g. {"date": "updates.date", "count": "1"} ) 556 """ 557 558 merge_predicate: str 559 insert_only: bool = False 560 delete_predicate: Optional[str] = None 561 update_predicate: Optional[str] = None 562 insert_predicate: Optional[str] = None 563 update_column_set: Optional[dict] = None 564 insert_column_set: Optional[dict] = None 565 566 567@dataclass 568class OutputSpec(object): 569 """Specification of an algorithm output. 570 571 This is very aligned with the way the execution environment connects to the output 572 systems (e.g., spark outputs). 573 574 - spec_id: id of the output specification. 575 - input_id: id of the corresponding input specification. 576 - write_type: type of write operation. 577 - data_format: format of the output. Defaults to DELTA. 578 - db_table: table name in the form of `<db>.<table>`. 579 - location: uri that identifies from where to write data in the specified format. 
580 - partitions: list of partition input_col names. 581 - merge_opts: options to apply to the merge operation. 582 - streaming_micro_batch_transformers: transformers to invoke for each streaming 583 micro batch, before writing (i.e., in Spark's foreachBatch structured 584 streaming function). Note: the lakehouse engine manages this for you, so 585 you don't have to manually specify streaming transformations here, so we don't 586 advise you to manually specify transformations through this parameter. Supply 587 them as regular transformers in the transform_specs sections of an ACON. 588 - streaming_once: if the streaming query is to be executed just once, or not, 589 generating just one micro batch. 590 - streaming_processing_time: if streaming query is to be kept alive, this indicates 591 the processing time of each micro batch. 592 - streaming_available_now: if set to True, set a trigger that processes all 593 available data in multiple batches then terminates the query. 594 When using streaming, this is the default trigger that the lakehouse-engine will 595 use, unless you configure a different one. 596 - streaming_continuous: set a trigger that runs a continuous query with a given 597 checkpoint interval. 598 - streaming_await_termination: whether to wait (True) for the termination of the 599 streaming query (e.g. timeout or exception) or not (False). Default: True. 600 - streaming_await_termination_timeout: a timeout to set to the 601 streaming_await_termination. Default: None. 602 - with_batch_id: whether to include the streaming batch id in the final data, 603 or not. It only takes effect in streaming mode. 604 - options: dict with other relevant options according to the execution environment 605 (e.g., spark) possible outputs. E.g.,: JDBC options, checkpoint location for 606 streaming, etc. 607 - streaming_micro_batch_dq_processors: similar to streaming_micro_batch_transformers 608 but for the DQ functions to be executed. Used internally by the lakehouse 609 engine, so you don't have to supply DQ functions through this parameter. Use the 610 dq_specs of the acon instead. 611 """ 612 613 spec_id: str 614 input_id: str 615 write_type: str 616 data_format: str = OutputFormat.DELTAFILES.value 617 db_table: Optional[str] = None 618 location: Optional[str] = None 619 merge_opts: Optional[MergeOptions] = None 620 partitions: Optional[List[str]] = None 621 streaming_micro_batch_transformers: Optional[List[TransformerSpec]] = None 622 streaming_once: Optional[bool] = None 623 streaming_processing_time: Optional[str] = None 624 streaming_available_now: bool = True 625 streaming_continuous: Optional[str] = None 626 streaming_await_termination: bool = True 627 streaming_await_termination_timeout: Optional[int] = None 628 with_batch_id: bool = False 629 options: Optional[dict] = None 630 streaming_micro_batch_dq_processors: Optional[List[DQSpec]] = None 631 632 633@dataclass 634class TerminatorSpec(object): 635 """Terminator Specification. 636 637 I.e., the specification that defines a terminator operation to be executed. Examples 638 are compute statistics, vacuum, optimize, etc. 639 640 - function: terminator function to execute. 641 - args: arguments of the terminator function. 642 - input_id: id of the corresponding output specification (Optional). 643 """ 644 645 function: str 646 args: Optional[dict] = None 647 input_id: Optional[str] = None 648 649 650@dataclass 651class ReconciliatorSpec(object): 652 """Reconciliator Specification. 
653 654 - metrics: list of metrics in the form of: 655 [{ 656 metric: name of the column present in both truth and current datasets, 657 aggregation: sum, avg, max, min, ..., 658 type: percentage or absolute, 659 yellow: value, 660 red: value 661 }]. 662 - recon_type: reconciliation type (percentage or absolute). Percentage calculates 663 the difference between truth and current results as a percentage (x-y/x), and 664 absolute calculates the raw difference (x - y). 665 - truth_input_spec: input specification of the truth data. 666 - current_input_spec: input specification of the current results data 667 - truth_preprocess_query: additional query on top of the truth input data to 668 preprocess the truth data before it gets fueled into the reconciliation process. 669 Important note: you need to assume that the data out of 670 the truth_input_spec is referencable by a table called 'truth'. 671 - truth_preprocess_query_args: optional dict having the functions/transformations to 672 apply on top of the truth_preprocess_query and respective arguments. Note: cache 673 is being applied on the Dataframe, by default. For turning the default behavior 674 off, pass `"truth_preprocess_query_args": []`. 675 - current_preprocess_query: additional query on top of the current results input 676 data to preprocess the current results data before it gets fueled into the 677 reconciliation process. Important note: you need to assume that the data out of 678 the current_results_input_spec is referencable by a table called 'current'. 679 - current_preprocess_query_args: optional dict having the 680 functions/transformations to apply on top of the current_preprocess_query 681 and respective arguments. Note: cache is being applied on the Dataframe, 682 by default. For turning the default behavior off, pass 683 `"current_preprocess_query_args": []`. 684 - ignore_empty_df: optional boolean, to ignore the recon process if source & target 685 dataframes are empty, recon will exit success code (passed) 686 """ 687 688 metrics: List[dict] 689 truth_input_spec: InputSpec 690 current_input_spec: InputSpec 691 truth_preprocess_query: Optional[str] = None 692 truth_preprocess_query_args: Optional[List[dict]] = None 693 current_preprocess_query: Optional[str] = None 694 current_preprocess_query_args: Optional[List[dict]] = None 695 ignore_empty_df: Optional[bool] = False 696 697 698@dataclass 699class DQValidatorSpec(object): 700 """Data Quality Validator Specification. 701 702 - input_spec: input specification of the data to be checked/validated. 703 - dq_spec: data quality specification. 704 - restore_prev_version: specify if, having 705 delta table/files as input, they should be restored to the 706 previous version if the data quality process fails. Note: this 707 is only considered if fail_on_error is kept as True. 
708 """ 709 710 input_spec: InputSpec 711 dq_spec: DQSpec 712 restore_prev_version: Optional[bool] = False 713 714 715class SQLDefinitions(Enum): 716 """SQL definitions statements.""" 717 718 compute_table_stats = "ANALYZE TABLE {} COMPUTE STATISTICS" 719 drop_table_stmt = "DROP TABLE IF EXISTS" 720 drop_view_stmt = "DROP VIEW IF EXISTS" 721 truncate_stmt = "TRUNCATE TABLE" 722 describe_stmt = "DESCRIBE TABLE" 723 optimize_stmt = "OPTIMIZE" 724 show_tbl_props_stmt = "SHOW TBLPROPERTIES" 725 delete_where_stmt = "DELETE FROM {} WHERE {}" 726 727 728class FileManagerAPIKeys(Enum): 729 """File Manager s3 api keys.""" 730 731 CONTENTS = "Contents" 732 KEY = "Key" 733 CONTINUATION = "NextContinuationToken" 734 BUCKET = "Bucket" 735 OBJECTS = "Objects" 736 737 738@dataclass 739class SensorSpec(object): 740 """Sensor Specification. 741 742 - sensor_id: sensor id. 743 - assets: a list of assets that are considered as available to 744 consume downstream after this sensor has status 745 PROCESSED_NEW_DATA. 746 - control_db_table_name: db.table to store sensor metadata. 747 - input_spec: input specification of the source to be checked for new data. 748 - preprocess_query: SQL query to transform/filter the result from the 749 upstream. Consider that we should refer to 'new_data' whenever 750 we are referring to the input of the sensor. E.g.: 751 "SELECT dummy_col FROM new_data WHERE ..." 752 - checkpoint_location: optional location to store checkpoints to resume 753 from. These checkpoints use the same as Spark checkpoint strategy. 754 For Spark readers that do not support checkpoints, use the 755 preprocess_query parameter to form a SQL query to filter the result 756 from the upstream accordingly. 757 - fail_on_empty_result: if the sensor should throw an error if there is no new 758 data in the upstream. Default: True. 759 """ 760 761 sensor_id: str 762 assets: List[str] 763 control_db_table_name: str 764 input_spec: InputSpec 765 preprocess_query: Optional[str] 766 checkpoint_location: Optional[str] 767 fail_on_empty_result: bool = True 768 769 @classmethod 770 def create_from_acon(cls, acon: dict): # type: ignore 771 """Create SensorSpec from acon. 772 773 Args: 774 acon: sensor ACON. 
775 """ 776 checkpoint_location = acon.get("base_checkpoint_location") 777 if checkpoint_location: 778 checkpoint_location = ( 779 f"{checkpoint_location.rstrip('/')}/lakehouse_engine/" 780 f"sensors/{acon['sensor_id']}" 781 ) 782 783 return cls( 784 sensor_id=acon["sensor_id"], 785 assets=acon["assets"], 786 control_db_table_name=acon["control_db_table_name"], 787 input_spec=InputSpec(**acon["input_spec"]), 788 preprocess_query=acon.get("preprocess_query"), 789 checkpoint_location=checkpoint_location, 790 fail_on_empty_result=acon.get("fail_on_empty_result", True), 791 ) 792 793 794class SensorStatus(Enum): 795 """Status for a sensor.""" 796 797 ACQUIRED_NEW_DATA = "ACQUIRED_NEW_DATA" 798 PROCESSED_NEW_DATA = "PROCESSED_NEW_DATA" 799 800 801SENSOR_SCHEMA = StructType( 802 [ 803 StructField("sensor_id", StringType(), False), 804 StructField("assets", ArrayType(StringType(), False), True), 805 StructField("status", StringType(), False), 806 StructField("status_change_timestamp", TimestampType(), False), 807 StructField("checkpoint_location", StringType(), True), 808 StructField("upstream_key", StringType(), True), 809 StructField("upstream_value", StringType(), True), 810 ] 811) 812 813SENSOR_UPDATE_SET: dict = { 814 "sensors.sensor_id": "updates.sensor_id", 815 "sensors.status": "updates.status", 816 "sensors.status_change_timestamp": "updates.status_change_timestamp", 817} 818 819SENSOR_ALLOWED_DATA_FORMATS = { 820 ReadType.STREAMING.value: [InputFormat.KAFKA.value, *FILE_INPUT_FORMATS], 821 ReadType.BATCH.value: [ 822 InputFormat.DELTAFILES.value, 823 InputFormat.JDBC.value, 824 ], 825} 826 827 828class SAPLogchain(Enum): 829 """Defaults used on consuming data from SAP Logchain.""" 830 831 DBTABLE = "SAPPHA.RSPCLOGCHAIN" 832 GREEN_STATUS = "G" 833 ENGINE_TABLE = "sensor_new_data" 834 835 836class RestoreType(Enum): 837 """Archive types.""" 838 839 BULK = "Bulk" 840 STANDARD = "Standard" 841 EXPEDITED = "Expedited" 842 843 @classmethod 844 def values(cls): # type: ignore 845 """Generates a list containing all enum values. 846 847 Return: 848 A list with all enum values. 849 """ 850 return (c.value for c in cls) 851 852 @classmethod 853 def exists(cls, restore_type: str) -> bool: 854 """Checks if the restore type exists in the enum values. 855 856 Args: 857 restore_type: restore type to check if exists. 858 859 Return: 860 If the restore type exists in our enum. 
861 """ 862 return restore_type in cls.values() 863 864 865class RestoreStatus(Enum): 866 """Archive types.""" 867 868 NOT_STARTED = "not_started" 869 ONGOING = "ongoing" 870 RESTORED = "restored" 871 872 873ARCHIVE_STORAGE_CLASS = [ 874 "GLACIER", 875 "DEEP_ARCHIVE", 876 "GLACIER_IR", 877] 878 879 880class SQLParser(Enum): 881 """Defaults to use for parsing.""" 882 883 DOUBLE_QUOTES = '"' 884 SINGLE_QUOTES = "'" 885 BACKSLASH = "\\" 886 SINGLE_TRACE = "-" 887 DOUBLE_TRACES = "--" 888 SLASH = "/" 889 OPENING_MULTIPLE_LINE_COMMENT = "/*" 890 CLOSING_MULTIPLE_LINE_COMMENT = "*/" 891 PARAGRAPH = "\n" 892 STAR = "*" 893 894 MULTIPLE_LINE_COMMENT = [ 895 OPENING_MULTIPLE_LINE_COMMENT, 896 CLOSING_MULTIPLE_LINE_COMMENT, 897 ] 898 899 900class GABDefaults(Enum): 901 """Defaults used on the GAB process.""" 902 903 DATE_FORMAT = "%Y-%m-%d" 904 DIMENSIONS_DEFAULT_COLUMNS = ["from_date", "to_date"] 905 DEFAULT_DIMENSION_CALENDAR_TABLE = "dim_calendar" 906 DEFAULT_LOOKUP_QUERY_BUILDER_TABLE = "lkp_query_builder" 907 908 909class GABStartOfWeek(Enum): 910 """Representation of start of week values on GAB.""" 911 912 SUNDAY = "S" 913 MONDAY = "M" 914 915 @classmethod 916 def get_start_of_week(cls) -> dict: 917 """Get the start of week enum as a dict. 918 919 Returns: 920 dict containing all enum entries as `{name:value}`. 921 """ 922 return { 923 start_of_week.name: start_of_week.value 924 for start_of_week in list(GABStartOfWeek) 925 } 926 927 @classmethod 928 def get_values(cls) -> set[str]: 929 """Get the start of week enum values as set. 930 931 Returns: 932 set containing all possible values `{value}`. 933 """ 934 return {start_of_week.value for start_of_week in list(GABStartOfWeek)} 935 936 937@dataclass 938class GABSpec(object): 939 """Gab Specification. 940 941 query_label_filter: query use-case label to execute. 942 queue_filter: queue to execute the job. 943 cadence_filter: selected cadences to build the asset. 944 target_database: target database to write. 945 curr_date: current date. 946 start_date: period start date. 947 end_date: period end date. 948 rerun_flag: rerun flag. 949 target_table: target table to write. 950 source_database: source database. 951 gab_base_path: base path to read the use cases. 952 lookup_table: gab configuration table. 953 calendar_table: gab calendar table. 954 """ 955 956 query_label_filter: list[str] 957 queue_filter: list[str] 958 cadence_filter: list[str] 959 target_database: str 960 current_date: datetime 961 start_date: datetime 962 end_date: datetime 963 rerun_flag: str 964 target_table: str 965 source_database: str 966 gab_base_path: str 967 lookup_table: str 968 calendar_table: str 969 970 @classmethod 971 def create_from_acon(cls, acon: dict): # type: ignore 972 """Create GabSpec from acon. 973 974 Args: 975 acon: gab ACON. 976 """ 977 lookup_table = f"{acon['source_database']}." + ( 978 acon.get( 979 "lookup_table", GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE.value 980 ) 981 ) 982 983 calendar_table = f"{acon['source_database']}." 
+ ( 984 acon.get( 985 "calendar_table", GABDefaults.DEFAULT_DIMENSION_CALENDAR_TABLE.value 986 ) 987 ) 988 989 def format_date(date_to_format: Union[datetime, str]) -> datetime: 990 if isinstance(date_to_format, str): 991 return datetime.strptime(date_to_format, GABDefaults.DATE_FORMAT.value) 992 else: 993 return date_to_format 994 995 return cls( 996 query_label_filter=acon["query_label_filter"], 997 queue_filter=acon["queue_filter"], 998 cadence_filter=acon["cadence_filter"], 999 target_database=acon["target_database"], 1000 current_date=datetime.now(), 1001 start_date=format_date(acon["start_date"]), 1002 end_date=format_date(acon["end_date"]), 1003 rerun_flag=acon["rerun_flag"], 1004 target_table=acon["target_table"], 1005 source_database=acon["source_database"], 1006 gab_base_path=acon["gab_base_path"], 1007 lookup_table=lookup_table, 1008 calendar_table=calendar_table, 1009 ) 1010 1011 1012class GABCadence(Enum): 1013 """Representation of the supported cadences on GAB.""" 1014 1015 DAY = 1 1016 WEEK = 2 1017 MONTH = 3 1018 QUARTER = 4 1019 YEAR = 5 1020 1021 @classmethod 1022 def get_ordered_cadences(cls) -> dict: 1023 """Get the cadences ordered by the value. 1024 1025 Returns: 1026 dict containing ordered cadences as `{name:value}`. 1027 """ 1028 cadences = list(GABCadence) 1029 return { 1030 cadence.name: cadence.value 1031 for cadence in sorted(cadences, key=lambda gab_cadence: gab_cadence.value) 1032 } 1033 1034 @classmethod 1035 def get_cadences(cls) -> set[str]: 1036 """Get the cadences values as set. 1037 1038 Returns: 1039 set containing all possible cadence values as `{value}`. 1040 """ 1041 return {cadence.name for cadence in list(GABCadence)} 1042 1043 @classmethod 1044 def order_cadences(cls, cadences_to_order: list[str]) -> list[str]: 1045 """Order a list of cadences by value. 1046 1047 Returns: 1048 ordered set containing the received cadences. 1049 """ 1050 return sorted( 1051 cadences_to_order, 1052 key=lambda item: cls.get_ordered_cadences().get(item), # type: ignore 1053 ) 1054 1055 1056class GABKeys: 1057 """Constants used to update pre-configured gab dict key.""" 1058 1059 JOIN_SELECT = "join_select" 1060 PROJECT_START = "project_start" 1061 PROJECT_END = "project_end" 1062 1063 1064class GABReplaceableKeys: 1065 """Constants used to replace pre-configured gab dict values.""" 1066 1067 CADENCE = "${cad}" 1068 DATE_COLUMN = "${date_column}" 1069 CONFIG_WEEK_START = "${config_week_start}" 1070 RECONCILIATION_CADENCE = "${rec_cadence}" 1071 1072 1073class GABCombinedConfiguration(Enum): 1074 """GAB combined configuration. 1075 1076 Based on the use case configuration return the values to override in the SQL file. 1077 This enum aims to exhaustively map each combination of `cadence`, `reconciliation`, 1078 `week_start` and `snap_flag` return the corresponding values `join_select`, 1079 `project_start` and `project_end` to replace this values in the stages SQL file. 1080 1081 Return corresponding configuration (join_select, project_start, project_end) for 1082 each combination (cadence x recon x week_start x snap_flag). 
1083 """ 1084 1085 _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE = ( 1086 "date(date_trunc('${cad}',${date_column}))" 1087 ) 1088 _DEFAULT_PROJECT_START = "df_cal.cadence_start_date" 1089 _DEFAULT_PROJECT_END = "df_cal.cadence_end_date" 1090 1091 COMBINED_CONFIGURATION = { 1092 # Combination of: 1093 # - cadence: `DAY` 1094 # - reconciliation_window: `DAY`, `WEEK`, `MONTH`, `QUARTER`, `YEAR` 1095 # - week_start: `S`, `M` 1096 # - snapshot_flag: `Y`, `N` 1097 1: { 1098 "cadence": GABCadence.DAY.name, 1099 "recon": GABCadence.get_cadences(), 1100 "week_start": GABStartOfWeek.get_values(), 1101 "snap_flag": {"Y", "N"}, 1102 "join_select": "", 1103 "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE, 1104 "project_end": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE, 1105 }, 1106 # Combination of: 1107 # - cadence: `WEEK` 1108 # - reconciliation_window: `DAY` 1109 # - week_start: `S`, `M` 1110 # - snapshot_flag: `Y` 1111 2: { 1112 "cadence": GABCadence.WEEK.name, 1113 "recon": GABCadence.DAY.name, 1114 "week_start": GABStartOfWeek.get_values(), 1115 "snap_flag": "Y", 1116 "join_select": """ 1117 select distinct case 1118 when '${config_week_start}' = 'Monday' then weekstart_mon 1119 when '${config_week_start}' = 'Sunday' then weekstart_sun 1120 end as cadence_start_date, 1121 calendar_date as cadence_end_date 1122 """, 1123 "project_start": _DEFAULT_PROJECT_START, 1124 "project_end": _DEFAULT_PROJECT_END, 1125 }, 1126 # Combination of: 1127 # - cadence: `WEEK` 1128 # - reconciliation_window: `DAY, `MONTH`, `QUARTER`, `YEAR` 1129 # - week_start: `M` 1130 # - snapshot_flag: `Y`, `N` 1131 3: { 1132 "cadence": GABCadence.WEEK.name, 1133 "recon": { 1134 GABCadence.DAY.name, 1135 GABCadence.MONTH.name, 1136 GABCadence.QUARTER.name, 1137 GABCadence.YEAR.name, 1138 }, 1139 "week_start": "M", 1140 "snap_flag": {"Y", "N"}, 1141 "join_select": """ 1142 select distinct case 1143 when '${config_week_start}' = 'Monday' then weekstart_mon 1144 when '${config_week_start}' = 'Sunday' then weekstart_sun 1145 end as cadence_start_date, 1146 case 1147 when '${config_week_start}' = 'Monday' then weekend_mon 1148 when '${config_week_start}' = 'Sunday' then weekend_sun 1149 end as cadence_end_date""", 1150 "project_start": _DEFAULT_PROJECT_START, 1151 "project_end": _DEFAULT_PROJECT_END, 1152 }, 1153 4: { 1154 "cadence": GABCadence.MONTH.name, 1155 "recon": GABCadence.DAY.name, 1156 "week_start": GABStartOfWeek.get_values(), 1157 "snap_flag": "Y", 1158 "join_select": """ 1159 select distinct month_start as cadence_start_date, 1160 calendar_date as cadence_end_date 1161 """, 1162 "project_start": _DEFAULT_PROJECT_START, 1163 "project_end": _DEFAULT_PROJECT_END, 1164 }, 1165 5: { 1166 "cadence": GABCadence.MONTH.name, 1167 "recon": GABCadence.WEEK.name, 1168 "week_start": GABStartOfWeek.MONDAY.value, 1169 "snap_flag": "Y", 1170 "join_select": """ 1171 select distinct month_start as cadence_start_date, 1172 case 1173 when date( 1174 date_trunc('MONTH',add_months(calendar_date, 1)) 1175 )-1 < weekend_mon 1176 then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1 1177 else weekend_mon 1178 end as cadence_end_date""", 1179 "project_start": _DEFAULT_PROJECT_START, 1180 "project_end": _DEFAULT_PROJECT_END, 1181 }, 1182 6: { 1183 "cadence": GABCadence.MONTH.name, 1184 "recon": GABCadence.WEEK.name, 1185 "week_start": GABStartOfWeek.SUNDAY.value, 1186 "snap_flag": "Y", 1187 "join_select": """ 1188 select distinct month_start as cadence_start_date, 1189 case 1190 when date( 1191 
date_trunc('MONTH',add_months(calendar_date, 1)) 1192 )-1 < weekend_sun 1193 then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1 1194 else weekend_sun 1195 end as cadence_end_date""", 1196 "project_start": _DEFAULT_PROJECT_START, 1197 "project_end": _DEFAULT_PROJECT_END, 1198 }, 1199 7: { 1200 "cadence": GABCadence.MONTH.name, 1201 "recon": GABCadence.get_cadences(), 1202 "week_start": GABStartOfWeek.get_values(), 1203 "snap_flag": {"Y", "N"}, 1204 "join_select": "", 1205 "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE, 1206 "project_end": "date(date_trunc('MONTH',add_months(${date_column}, 1)))-1", 1207 }, 1208 8: { 1209 "cadence": GABCadence.QUARTER.name, 1210 "recon": GABCadence.DAY.name, 1211 "week_start": GABStartOfWeek.get_values(), 1212 "snap_flag": "Y", 1213 "join_select": """ 1214 select distinct quarter_start as cadence_start_date, 1215 calendar_date as cadence_end_date 1216 """, 1217 "project_start": _DEFAULT_PROJECT_START, 1218 "project_end": _DEFAULT_PROJECT_END, 1219 }, 1220 9: { 1221 "cadence": GABCadence.QUARTER.name, 1222 "recon": GABCadence.WEEK.name, 1223 "week_start": GABStartOfWeek.MONDAY.value, 1224 "snap_flag": "Y", 1225 "join_select": """ 1226 select distinct quarter_start as cadence_start_date, 1227 case 1228 when weekend_mon > date( 1229 date_trunc('QUARTER',add_months(calendar_date, 3)) 1230 )-1 1231 then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1 1232 else weekend_mon 1233 end as cadence_end_date""", 1234 "project_start": _DEFAULT_PROJECT_START, 1235 "project_end": _DEFAULT_PROJECT_END, 1236 }, 1237 10: { 1238 "cadence": GABCadence.QUARTER.name, 1239 "recon": GABCadence.WEEK.name, 1240 "week_start": GABStartOfWeek.SUNDAY.value, 1241 "snap_flag": "Y", 1242 "join_select": """ 1243 select distinct quarter_start as cadence_start_date, 1244 case 1245 when weekend_sun > date( 1246 date_trunc('QUARTER',add_months(calendar_date, 3)) 1247 )-1 1248 then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1 1249 else weekend_sun 1250 end as cadence_end_date""", 1251 "project_start": _DEFAULT_PROJECT_START, 1252 "project_end": _DEFAULT_PROJECT_END, 1253 }, 1254 11: { 1255 "cadence": GABCadence.QUARTER.name, 1256 "recon": GABCadence.MONTH.name, 1257 "week_start": GABStartOfWeek.get_values(), 1258 "snap_flag": "Y", 1259 "join_select": """ 1260 select distinct quarter_start as cadence_start_date, 1261 month_end as cadence_end_date 1262 """, 1263 "project_start": _DEFAULT_PROJECT_START, 1264 "project_end": _DEFAULT_PROJECT_END, 1265 }, 1266 12: { 1267 "cadence": GABCadence.QUARTER.name, 1268 "recon": GABCadence.YEAR.name, 1269 "week_start": GABStartOfWeek.get_values(), 1270 "snap_flag": "N", 1271 "join_select": "", 1272 "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE, 1273 "project_end": """ 1274 date( 1275 date_trunc( 1276 '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 3) 1277 ) 1278 )-1 1279 """, 1280 }, 1281 13: { 1282 "cadence": GABCadence.QUARTER.name, 1283 "recon": GABCadence.get_cadences(), 1284 "week_start": GABStartOfWeek.get_values(), 1285 "snap_flag": "N", 1286 "join_select": "", 1287 "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE, 1288 "project_end": """ 1289 date( 1290 date_trunc( 1291 '${cad}',add_months( date(date_trunc('${cad}',${date_column})), 3) 1292 ) 1293 )-1 1294 """, 1295 }, 1296 14: { 1297 "cadence": GABCadence.YEAR.name, 1298 "recon": GABCadence.WEEK.name, 1299 "week_start": GABStartOfWeek.MONDAY.value, 1300 "snap_flag": "Y", 1301 "join_select": """ 1302 select distinct 
year_start as cadence_start_date, 1303 case 1304 when weekend_mon > date( 1305 date_trunc('YEAR',add_months(calendar_date, 12)) 1306 )-1 1307 then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1 1308 else weekend_mon 1309 end as cadence_end_date""", 1310 "project_start": _DEFAULT_PROJECT_START, 1311 "project_end": _DEFAULT_PROJECT_END, 1312 }, 1313 15: { 1314 "cadence": GABCadence.YEAR.name, 1315 "recon": GABCadence.WEEK.name, 1316 "week_start": GABStartOfWeek.SUNDAY.value, 1317 "snap_flag": "Y", 1318 "join_select": """ 1319 select distinct year_start as cadence_start_date, 1320 case 1321 when weekend_sun > date( 1322 date_trunc('YEAR',add_months(calendar_date, 12)) 1323 )-1 1324 then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1 1325 else weekend_sun 1326 end as cadence_end_date""", 1327 "project_start": _DEFAULT_PROJECT_START, 1328 "project_end": _DEFAULT_PROJECT_END, 1329 }, 1330 16: { 1331 "cadence": GABCadence.YEAR.name, 1332 "recon": GABCadence.get_cadences(), 1333 "week_start": GABStartOfWeek.get_values(), 1334 "snap_flag": "N", 1335 "inverse_flag": "Y", 1336 "join_select": "", 1337 "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE, 1338 "project_end": """ 1339 date( 1340 date_trunc( 1341 '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 12) 1342 ) 1343 )-1 1344 """, 1345 }, 1346 17: { 1347 "cadence": GABCadence.YEAR.name, 1348 "recon": { 1349 GABCadence.DAY.name, 1350 GABCadence.MONTH.name, 1351 GABCadence.QUARTER.name, 1352 }, 1353 "week_start": GABStartOfWeek.get_values(), 1354 "snap_flag": "Y", 1355 "join_select": """ 1356 select distinct year_start as cadence_start_date, 1357 case 1358 when '${rec_cadence}' = 'DAY' then calendar_date 1359 when '${rec_cadence}' = 'MONTH' then month_end 1360 when '${rec_cadence}' = 'QUARTER' then quarter_end 1361 end as cadence_end_date 1362 """, 1363 "project_start": _DEFAULT_PROJECT_START, 1364 "project_end": _DEFAULT_PROJECT_END, 1365 }, 1366 18: { 1367 "cadence": GABCadence.get_cadences(), 1368 "recon": GABCadence.get_cadences(), 1369 "week_start": GABStartOfWeek.get_values(), 1370 "snap_flag": {"Y", "N"}, 1371 "join_select": """ 1372 select distinct 1373 case 1374 when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday' 1375 then weekstart_mon 1376 when '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday' 1377 then weekstart_sun 1378 else 1379 date(date_trunc('${cad}',calendar_date)) 1380 end as cadence_start_date, 1381 case 1382 when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday' 1383 then weekend_mon 1384 when '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday' 1385 then weekend_sun 1386 when '${cad}' = 'DAY' 1387 then date(date_trunc('${cad}',calendar_date)) 1388 when '${cad}' = 'MONTH' 1389 then date( 1390 date_trunc( 1391 'MONTH', 1392 add_months(date(date_trunc('${cad}',calendar_date)), 1) 1393 ) 1394 )-1 1395 when '${cad}' = 'QUARTER' 1396 then date( 1397 date_trunc( 1398 'QUARTER', 1399 add_months(date(date_trunc('${cad}',calendar_date)) , 3) 1400 ) 1401 )-1 1402 when '${cad}' = 'YEAR' 1403 then date( 1404 date_trunc( 1405 'YEAR', 1406 add_months(date(date_trunc('${cad}',calendar_date)), 12) 1407 ) 1408 )-1 1409 end as cadence_end_date 1410 """, 1411 "project_start": _DEFAULT_PROJECT_START, 1412 "project_end": _DEFAULT_PROJECT_END, 1413 }, 1414 }
class CollectEngineUsage(Enum):
    """Options for collecting engine usage stats.

    - enabled, enables the collection and storage of Lakehouse Engine
        usage statistics for any environment.
    - prod_only, enables the collection and storage of Lakehouse Engine
        usage statistics for production environment only.
    - disabled, disables the collection and storage of Lakehouse Engine
        usage statistics, for all environments.
    """

    ENABLED = "enabled"
    PROD_ONLY = "prod_only"
    DISABLED = "disabled"
Options for collecting engine usage stats.
- enabled: enables the collection and storage of Lakehouse Engine usage statistics for any environment.
- prod_only: enables the collection and storage of Lakehouse Engine usage statistics for the production environment only.
- disabled: disables the collection and storage of Lakehouse Engine usage statistics for all environments.
@dataclass
class EngineConfig(object):
    """Definitions that can come from the Engine Config file.

    - dq_bucket: S3 prod bucket used to store data quality related artifacts.
    - dq_dev_bucket: S3 dev bucket used to store data quality related artifacts.
    - notif_disallowed_email_servers: email servers not allowed to be used
        for sending notifications.
    - engine_usage_path: path where the engine prod usage stats are stored.
    - engine_dev_usage_path: path where the engine dev usage stats are stored.
    - collect_engine_usage: whether to enable the collection of lakehouse
        engine usage stats or not.
    - dq_functions_column_list: list of columns to be added to the meta argument
        of GX when using PRISMA.
    """

    dq_bucket: Optional[str] = None
    dq_dev_bucket: Optional[str] = None
    notif_disallowed_email_servers: Optional[list] = None
    engine_usage_path: Optional[str] = None
    engine_dev_usage_path: Optional[str] = None
    collect_engine_usage: str = CollectEngineUsage.ENABLED.value
    dq_functions_column_list: Optional[list] = None
Definitions that can come from the Engine Config file.
- dq_bucket: S3 prod bucket used to store data quality related artifacts.
- dq_dev_bucket: S3 dev bucket used to store data quality related artifacts.
- notif_disallowed_email_servers: email servers not allowed to be used for sending notifications.
- engine_usage_path: path where the engine prod usage stats are stored.
- engine_dev_usage_path: path where the engine dev usage stats are stored.
- collect_engine_usage: whether to enable the collection of lakehouse engine usage stats or not.
- dq_functions_column_list: list of columns to be added to the meta argument of GX when using PRISMA.
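As an illustration only, a minimal sketch of how such a configuration could be instantiated in code; all bucket names, paths and column names below are made-up placeholders, not values shipped with the engine:

from lakehouse_engine.core.definitions import CollectEngineUsage, EngineConfig

# Purely illustrative values: buckets, paths and columns are placeholders.
config = EngineConfig(
    dq_bucket="s3://my-prod-dq-bucket",
    dq_dev_bucket="s3://my-dev-dq-bucket",
    notif_disallowed_email_servers=["smtp.example.com"],
    engine_usage_path="s3://my-prod-bucket/lakehouse_engine/usage_stats",
    engine_dev_usage_path="s3://my-dev-bucket/lakehouse_engine/usage_stats",
    collect_engine_usage=CollectEngineUsage.PROD_ONLY.value,
    dq_functions_column_list=["dq_rule_id", "execution_point"],
)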
class EngineStats(Enum):
    """Definitions for collection of Lakehouse Engine Stats.

    .. note::
        Note: whenever the value comes from a key inside a Spark Config
        that returns an array, it can be specified with a '#' so that it
        is adequately processed.
    """

    CLUSTER_USAGE_TAGS = "spark.databricks.clusterUsageTags"
    DEF_SPARK_CONFS = {
        "dp_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#accountName",
        "environment": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#environment",
        "workspace_id": f"{CLUSTER_USAGE_TAGS}.orgId",
        "job_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#JobId",
        "job_name": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#RunName",
        "run_id": f"{CLUSTER_USAGE_TAGS}.clusterAllTags#ClusterName",
    }
Definitions for collection of Lakehouse Engine Stats.
Note: whenever the value comes from a key inside a Spark Config that returns an array, it can be specified with a '#' so that it is adequately processed.
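To illustrate the '#' convention, the sketch below resolves DEF_SPARK_CONFS-style keys against a dict of Spark configs, assuming (as Databricks exposes clusterAllTags) that the array-valued conf is a JSON list of {"key", "value"} objects. The resolve_stat_key helper is hypothetical and only meant to show the idea, not the engine's actual implementation:

import json
from typing import Optional


def resolve_stat_key(spark_confs: dict, conf_key: str) -> Optional[str]:
    """Resolve a stats key, honouring the '#' convention for array-valued confs."""
    if "#" in conf_key:
        conf_name, tag_name = conf_key.split("#", 1)
        # The conf value is assumed to be a JSON array of {"key": ..., "value": ...} tags.
        tags = json.loads(spark_confs.get(conf_name, "[]"))
        return next((tag["value"] for tag in tags if tag["key"] == tag_name), None)
    return spark_confs.get(conf_key)


# Illustrative usage against a fake snapshot of the Spark configuration.
confs = {
    "spark.databricks.clusterUsageTags.clusterAllTags": json.dumps(
        [{"key": "accountName", "value": "my_data_product"}]
    )
}
resolve_stat_key(
    confs, "spark.databricks.clusterUsageTags.clusterAllTags#accountName"
)  # -> "my_data_product"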
class InputFormat(Enum):
    """Formats of algorithm input."""

    JDBC = "jdbc"
    AVRO = "avro"
    JSON = "json"
    CSV = "csv"
    PARQUET = "parquet"
    DELTAFILES = "delta"
    CLOUDFILES = "cloudfiles"
    KAFKA = "kafka"
    SQL = "sql"
    SAP_BW = "sap_bw"
    SAP_B4 = "sap_b4"
    DATAFRAME = "dataframe"
    SFTP = "sftp"

    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Return:
            A list with all enum values.
        """
        return (c.value for c in cls)

    @classmethod
    def exists(cls, input_format: str) -> bool:
        """Checks if the input format exists in the enum values.

        Args:
            input_format: format to check if exists.

        Return:
            If the input format exists in our enum.
        """
        return input_format in cls.values()
Formats of algorithm input.
    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Return:
            A list with all enum values.
        """
        return (c.value for c in cls)
Generates a list containing all enum values.
Return:
A list with all enum values.
    @classmethod
    def exists(cls, input_format: str) -> bool:
        """Checks if the input format exists in the enum values.

        Args:
            input_format: format to check if exists.

        Return:
            If the input format exists in our enum.
        """
        return input_format in cls.values()
Checks if the input format exists in the enum values.
Arguments:
- input_format: format to check if exists.
Return:
If the input format exists in our enum.
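A short usage sketch of the two helpers above; note that values() yields a generator, so it can be materialised with list() or sorted() when an actual list is needed:

from lakehouse_engine.core.definitions import InputFormat

InputFormat.exists("csv")     # True: "csv" is a supported input format
InputFormat.exists("excel")   # False: not part of the enum
sorted(InputFormat.values())  # materialise the generator into a sorted list of format names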
class OutputFormat(Enum):
    """Formats of algorithm output."""

    JDBC = "jdbc"
    AVRO = "avro"
    JSON = "json"
    CSV = "csv"
    PARQUET = "parquet"
    DELTAFILES = "delta"
    KAFKA = "kafka"
    CONSOLE = "console"
    NOOP = "noop"
    DATAFRAME = "dataframe"
    REST_API = "rest_api"
    FILE = "file"  # Internal use only
    TABLE = "table"  # Internal use only

    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Return:
            A list with all enum values.
        """
        return (c.value for c in cls)

    @classmethod
    def exists(cls, output_format: str) -> bool:
        """Checks if the output format exists in the enum values.

        Args:
            output_format: format to check if exists.

        Return:
            If the output format exists in our enum.
        """
        return output_format in cls.values()
Formats of algorithm output.
    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Return:
            A list with all enum values.
        """
        return (c.value for c in cls)
Generates a list containing all enum values.
Return:
A list with all enum values.
    @classmethod
    def exists(cls, output_format: str) -> bool:
        """Checks if the output format exists in the enum values.

        Args:
            output_format: format to check if exists.

        Return:
            If the output format exists in our enum.
        """
        return output_format in cls.values()
Checks if the output format exists in the enum values.
Arguments:
- output_format: format to check if exists.
Return:
If the output format exists in our enum.
class NotifierType(Enum):
    """Type of notifier available."""

    EMAIL = "email"

Type of notifier available.
class NotificationRuntimeParameters(Enum):
    """Parameters to be replaced in runtime."""

    DATABRICKS_JOB_NAME = "databricks_job_name"
    DATABRICKS_WORKSPACE_ID = "databricks_workspace_id"
Parameters to be replaced in runtime.
class ReadType(Enum):
    """Define the types of read operations.

    - BATCH - read the data in batch mode (e.g., Spark batch).
    - STREAMING - read the data in streaming mode (e.g., Spark streaming).
    """

    BATCH = "batch"
    STREAMING = "streaming"
Define the types of read operations.
- BATCH - read the data in batch mode (e.g., Spark batch).
- STREAMING - read the data in streaming mode (e.g., Spark streaming).
class ReadMode(Enum):
    """Different modes that control how we handle compliance to the provided schema.

    These read modes map to Spark's read modes at the moment.
    """

    PERMISSIVE = "PERMISSIVE"
    FAILFAST = "FAILFAST"
    DROPMALFORMED = "DROPMALFORMED"
Different modes that control how we handle compliance to the provided schema.
These read modes map to Spark's read modes at the moment.
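Since these values mirror Spark's own read modes, a spec's read mode can be passed straight through as the reader's mode option. A small illustrative sketch (the CSV path is a made-up placeholder):

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import ReadMode

spark = SparkSession.builder.getOrCreate()

# DROPMALFORMED silently discards rows that do not comply with the schema.
df = (
    spark.read.option("header", "true")
    .option("mode", ReadMode.DROPMALFORMED.value)
    .csv("/tmp/example_input.csv")  # hypothetical path
)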
class DQDefaults(Enum):
    """Defaults used on the data quality process."""

    FILE_SYSTEM_STORE = "file_system"
    FILE_SYSTEM_S3_STORE = "s3"
    DQ_BATCH_IDENTIFIERS = ["spec_id", "input_id", "timestamp"]
    DATASOURCE_CLASS_NAME = "Datasource"
    DATASOURCE_EXECUTION_ENGINE = "SparkDFExecutionEngine"
    DATA_CONNECTORS_CLASS_NAME = "RuntimeDataConnector"
    DATA_CONNECTORS_MODULE_NAME = "great_expectations.datasource.data_connector"
    DATA_CHECKPOINTS_CLASS_NAME = "SimpleCheckpoint"
    DATA_CHECKPOINTS_CONFIG_VERSION = 1.0
    STORE_BACKEND = "s3"
    EXPECTATIONS_STORE_PREFIX = "dq/expectations/"
    VALIDATIONS_STORE_PREFIX = "dq/validations/"
    DATA_DOCS_PREFIX = "dq/data_docs/site/"
    CHECKPOINT_STORE_PREFIX = "dq/checkpoints/"
    VALIDATION_COLUMN_IDENTIFIER = "validationresultidentifier"
    CUSTOM_EXPECTATION_LIST = [
        "expect_column_values_to_be_date_not_older_than",
        "expect_column_pair_a_to_be_smaller_or_equal_than_b",
        "expect_multicolumn_column_a_must_equal_b_or_c",
        "expect_queried_column_agg_value_to_be",
        "expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b",
        "expect_column_pair_a_to_be_not_equal_to_b",
    ]
    DQ_VALIDATIONS_SCHEMA = StructType(
        [
            StructField(
                "dq_validations",
                StructType(
                    [
                        StructField("run_name", StringType()),
                        StructField("run_success", BooleanType()),
                        StructField("raised_exceptions", BooleanType()),
                        StructField("run_row_success", BooleanType()),
                        StructField(
                            "dq_failure_details",
                            ArrayType(
                                StructType(
                                    [
                                        StructField("expectation_type", StringType()),
                                        StructField("kwargs", StringType()),
                                    ]
                                ),
                            ),
                        ),
                    ]
                ),
            )
        ]
    )
Defaults used on the data quality process.
class WriteType(Enum):
    """Types of write operations."""

    OVERWRITE = "overwrite"
    COMPLETE = "complete"
    APPEND = "append"
    UPDATE = "update"
    MERGE = "merge"
    ERROR_IF_EXISTS = "error"
    IGNORE_IF_EXISTS = "ignore"
Types of write operations.
@dataclass
class InputSpec(object):
    """Specification of an algorithm input.

    This is very aligned with the way the execution environment connects to the sources
    (e.g., spark sources).

    - spec_id: spec_id of the input specification.
    - read_type: ReadType type of read operation.
    - data_format: format of the input.
    - sftp_files_format: format of the files (csv, fwf, json, xml...) in a sftp
        directory.
    - df_name: dataframe name.
    - db_table: table name in the form of `<db>.<table>`.
    - location: uri that identifies from where to read data in the specified format.
    - enforce_schema_from_table: if we want to enforce the table schema or not, by
        providing a table name in the form of `<db>.<table>`.
    - query: sql query to execute and return the dataframe. Use it if you do not want to
        read from a file system nor from a table, but rather from a sql query instead.
    - schema: dict representation of a schema of the input (e.g., Spark struct type
        schema).
    - schema_path: path to a file with a representation of a schema of the input (e.g.,
        Spark struct type schema).
    - disable_dbfs_retry: optional flag to disable file storage dbfs.
    - with_filepath: if we want to include the path of the file that is being read. Only
        works with the file reader (batch and streaming modes are supported).
    - options: dict with other relevant options according to the execution
        environment (e.g., spark) possible sources.
    - calculate_upper_bound: when to calculate upper bound to extract from SAP BW
        or not.
    - calc_upper_bound_schema: specific schema for the calculated upper_bound.
    - generate_predicates: when to generate predicates to extract from SAP BW or not.
    - predicates_add_null: if we want to include is null on partition by predicates.
    - temp_view: optional name of a view to point to the input dataframe to be used
        to create or replace a temp view on top of the dataframe.
    """

    spec_id: str
    read_type: str
    data_format: Optional[str] = None
    sftp_files_format: Optional[str] = None
    df_name: Optional[DataFrame] = None
    db_table: Optional[str] = None
    location: Optional[str] = None
    query: Optional[str] = None
    enforce_schema_from_table: Optional[str] = None
    schema: Optional[dict] = None
    schema_path: Optional[str] = None
    disable_dbfs_retry: bool = False
    with_filepath: bool = False
    options: Optional[dict] = None
    jdbc_args: Optional[dict] = None
    calculate_upper_bound: bool = False
    calc_upper_bound_schema: Optional[str] = None
    generate_predicates: bool = False
    predicates_add_null: bool = True
    temp_view: Optional[str] = None
Specification of an algorithm input.
This is very aligned with the way the execution environment connects to the sources (e.g., spark sources).
- spec_id: spec_id of the input specification.
- read_type: ReadType type of read operation.
- data_format: format of the input.
- sftp_files_format: format of the files (csv, fwf, json, xml...) in a sftp directory.
- df_name: dataframe name.
- db_table: table name in the form of <db>.<table>.
- location: uri that identifies from where to read data in the specified format.
- enforce_schema_from_table: if we want to enforce the table schema or not, by providing a table name in the form of <db>.<table>.
- query: sql query to execute and return the dataframe. Use it if you do not want to read from a file system nor from a table, but rather from a sql query instead.
- schema: dict representation of a schema of the input (e.g., Spark struct type schema).
- schema_path: path to a file with a representation of a schema of the input (e.g., Spark struct type schema).
- disable_dbfs_retry: optional flag to disable file storage dbfs.
- with_filepath: if we want to include the path of the file that is being read. Only works with the file reader (batch and streaming modes are supported).
- options: dict with other relevant options according to the execution environment (e.g., spark) possible sources.
- calculate_upper_bound: when to calculate upper bound to extract from SAP BW or not.
- calc_upper_bound_schema: specific schema for the calculated upper_bound.
- generate_predicates: when to generate predicates to extract from SAP BW or not.
- predicates_add_null: if we want to include is null on partition by predicates.
- temp_view: optional name of a view to point to the input dataframe to be used to create or replace a temp view on top of the dataframe.
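As a concrete illustration, an input specification for reading a Delta table in batch mode could look like the sketch below. The spec id, table name and options are placeholders, and in practice this structure usually arrives as a dict inside an ACON rather than being built by hand:

from lakehouse_engine.core.definitions import InputFormat, InputSpec, ReadType

sales_input = InputSpec(
    spec_id="sales_source",                    # placeholder id
    read_type=ReadType.BATCH.value,            # "batch"
    data_format=InputFormat.DELTAFILES.value,  # "delta"
    db_table="my_database.sales",              # hypothetical <db>.<table>
    options={"versionAsOf": 0},                # extra Spark reader options
)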
@dataclass
class TransformerSpec(object):
    """Transformer Specification, i.e., a single transformation amongst many.

    - function: name of the function (or callable function) to be executed.
    - args: (not applicable if using a callable function) dict with the arguments
        to pass to the function `<k,v>` pairs with the name of the parameter of
        the function and the respective value.
    """

    function: str
    args: dict
Transformer Specification, i.e., a single transformation amongst many.
- function: name of the function (or callable function) to be executed.
- args: (not applicable if using a callable function) dict with the arguments to pass to the function, as <k,v> pairs with the name of the parameter of the function and the respective value.
@dataclass
class TransformSpec(object):
    """Transformation Specification.

    I.e., the specification that defines the many transformations to be done to the data
    that was read.

    - spec_id: id of the transform specification.
    - input_id: id of the corresponding input specification.
    - transformers: list of transformers to execute.
    - force_streaming_foreach_batch_processing: sometimes, when using streaming, we want
        to force the transform to be executed in the foreachBatch function to ensure
        non-supported streaming operations can be properly executed.
    """

    spec_id: str
    input_id: str
    transformers: List[TransformerSpec]
    force_streaming_foreach_batch_processing: bool = False
Transformation Specification.
I.e., the specification that defines the many transformations to be done to the data that was read.
- spec_id: id of the transform specification.
- input_id: id of the corresponding input specification.
- transformers: list of transformers to execute.
- force_streaming_foreach_batch_processing: sometimes, when using streaming, we want to force the transform to be executed in the foreachBatch function to ensure non-supported streaming operations can be properly executed.
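To make the relationship concrete, a transform step is simply a list of TransformerSpec entries tied to an input. A minimal sketch (the transformer name and its arguments are illustrative placeholders; any transformer function known to the engine can be referenced here):

from lakehouse_engine.core.definitions import TransformerSpec, TransformSpec

transform = TransformSpec(
    spec_id="sales_transformed",   # placeholder id
    input_id="sales_source",       # must match an InputSpec.spec_id
    transformers=[
        # Hypothetical transformer call: rename a column via <k,v> args.
        TransformerSpec(function="rename", args={"cols": {"old_name": "new_name"}}),
    ],
)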
class DQType(Enum):
    """Available data quality tasks."""

    VALIDATOR = "validator"
    PRISMA = "prisma"
Available data quality tasks.
class DQExecutionPoint(Enum):
    """Available data quality execution points."""

    IN_MOTION = "in_motion"
    AT_REST = "at_rest"
Available data quality execution points.
class DQTableBaseParameters(Enum):
    """Base parameters for importing DQ rules from a table."""

    PRISMA_BASE_PARAMETERS = ["arguments", "dq_tech_function"]
Base parameters for importing DQ rules from a table.
@dataclass
class DQFunctionSpec(object):
    """Defines a data quality function specification.

    - function - name of the data quality function (expectation) to execute.
        It follows the great_expectations api https://greatexpectations.io/expectations/.
    - args - args of the function (expectation). Follow the same api as above.
    """

    function: str
    args: Optional[dict] = None
Defines a data quality function specification.
- function - name of the data quality function (expectation) to execute. It follows the great_expectations api https://greatexpectations.io/expectations/.
- args - args of the function (expectation). Follow the same api as above.
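For example, a not-null expectation on a single column could be declared as follows (the column name is a placeholder; the expectation name and its arguments come from the Great Expectations gallery linked above):

from lakehouse_engine.core.definitions import DQFunctionSpec

not_null_check = DQFunctionSpec(
    function="expect_column_values_to_not_be_null",
    args={"column": "customer_id"},  # placeholder column name
)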
@dataclass
class DQSpec(object):
    """Data quality overall specification.

    - spec_id - id of the specification.
    - input_id - id of the input specification.
    - dq_type - type of DQ process to execute (e.g. validator).
    - dq_functions - list of function specifications to execute.
    - dq_db_table - name of table to derive the dq functions from.
    - dq_table_table_filter - name of the table which rules are to be applied in the
        validations (Only used when deriving dq functions).
    - dq_table_extra_filters - extra filters to be used when deriving dq functions.
        This is a sql expression to be applied to the dq_db_table.
    - execution_point - execution point of the dq functions. [at_rest, in_motion].
        This is set during the load_data or dq_validator functions.
    - unexpected_rows_pk - the list of columns composing the primary key of the
        source data to identify the rows failing the DQ validations. Note: only one
        of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It
        is mandatory to provide one of these arguments when using tag_source_data
        as True. When tag_source_data is False, this is not mandatory, but still
        recommended.
    - tbl_to_derive_pk - db.table to automatically derive the unexpected_rows_pk from.
        Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to
        be provided. It is mandatory to provide one of these arguments when using
        tag_source_data as True. When tag_source_data is False, this is not
        mandatory, but still recommended.
    - sort_processed_keys - when using the `prisma` `dq_type`, a column `processed_keys`
        is automatically added to give observability over the PK values that were
        processed during a run. This parameter (`sort_processed_keys`) controls whether
        the processed keys column value should be sorted or not. Default: False.
    - gx_result_format - great expectations result format. Default: "COMPLETE".
    - tag_source_data - when set to true, this will ensure that the DQ process ends by
        tagging the source data with an additional column with information about the
        DQ results. This column makes it possible to identify if the DQ run was
        succeeded in general and, if not, it unlocks the insights to know what
        specific rows have made the DQ validations fail and why. Default: False.
        Note: it only works if result_sink_explode is True, gx_result_format is
        COMPLETE, fail_on_error is False (which is done automatically when
        you specify tag_source_data as True) and tbl_to_derive_pk or
        unexpected_rows_pk is configured.
    - store_backend - which store_backend to use (e.g. s3 or file_system).
    - local_fs_root_dir - path of the root directory. Note: only applicable for
        store_backend file_system.
    - data_docs_local_fs - the path for data docs only for store_backend
        file_system.
    - bucket - the bucket name to consider for the store_backend (store DQ artefacts).
        Note: only applicable for store_backend s3.
    - data_docs_bucket - the bucket name for data docs only. When defined, it will
        supersede bucket parameter. Note: only applicable for store_backend s3.
    - expectations_store_prefix - prefix where to store expectations' data. Note: only
        applicable for store_backend s3.
    - validations_store_prefix - prefix where to store validations' data. Note: only
        applicable for store_backend s3.
    - data_docs_prefix - prefix where to store data_docs' data.
    - checkpoint_store_prefix - prefix where to store checkpoints' data. Note: only
        applicable for store_backend s3.
    - data_asset_name - name of the data asset to consider when configuring the great
        expectations' data source.
    - expectation_suite_name - name to consider for great expectations' suite.
    - result_sink_db_table - db.table_name indicating the database and table in which
        to save the results of the DQ process.
    - result_sink_location - file system location in which to save the results of the
        DQ process.
    - data_product_name - name of the data product.
    - result_sink_partitions - the list of partitions to consider.
    - result_sink_format - format of the result table (e.g. delta, parquet, kafka...).
    - result_sink_options - extra spark options for configuring the result sink.
        E.g: can be used to configure a Kafka sink if result_sink_format is kafka.
    - result_sink_explode - flag to determine if the output table/location should have
        the columns exploded (as True) or not (as False). Default: True.
    - result_sink_extra_columns - list of extra columns to be exploded (following
        the pattern "<name>.*") or columns to be selected. It is only used when
        result_sink_explode is set to True.
    - source - name of data source, to be easier to identify in analysis. If not
        specified, it is set as default <input_id>. This will be only used
        when result_sink_explode is set to True.
    - fail_on_error - whether to fail the algorithm if the validations of your data in
        the DQ process failed.
    - cache_df - whether to cache the dataframe before running the DQ process or not.
    - critical_functions - functions that should not fail. When this argument is
        defined, fail_on_error is nullified.
    - max_percentage_failure - percentage of failure that should be allowed.
        This argument has priority over both fail_on_error and critical_functions.
    """

    spec_id: str
    input_id: str
    dq_type: str
    dq_functions: Optional[List[DQFunctionSpec]] = None
    dq_db_table: Optional[str] = None
    dq_table_table_filter: Optional[str] = None
    dq_table_extra_filters: Optional[str] = None
    execution_point: Optional[str] = None
    unexpected_rows_pk: Optional[List[str]] = None
    tbl_to_derive_pk: Optional[str] = None
    sort_processed_keys: Optional[bool] = False
    gx_result_format: Optional[str] = "COMPLETE"
    tag_source_data: Optional[bool] = False
    store_backend: str = DQDefaults.STORE_BACKEND.value
    local_fs_root_dir: Optional[str] = None
    data_docs_local_fs: Optional[str] = None
    bucket: Optional[str] = None
    data_docs_bucket: Optional[str] = None
    expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value
    validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value
    data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value
    checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value
    data_asset_name: Optional[str] = None
    expectation_suite_name: Optional[str] = None
    result_sink_db_table: Optional[str] = None
    result_sink_location: Optional[str] = None
    data_product_name: Optional[str] = None
    result_sink_partitions: Optional[List[str]] = None
    result_sink_format: str = OutputFormat.DELTAFILES.value
    result_sink_options: Optional[dict] = None
    result_sink_explode: bool = True
    result_sink_extra_columns: Optional[List[str]] = None
    source: Optional[str] = None
    fail_on_error: bool = True
    cache_df: bool = False
    critical_functions: Optional[List[DQFunctionSpec]] = None
    max_percentage_failure: Optional[float] = None
Data quality overall specification.
- spec_id - id of the specification.
- input_id - id of the input specification.
- dq_type - type of DQ process to execute (e.g. validator).
- dq_functions - list of function specifications to execute.
- dq_db_table - name of table to derive the dq functions from.
- dq_table_table_filter - name of the table which rules are to be applied in the validations (Only used when deriving dq functions).
- dq_table_extra_filters - extra filters to be used when deriving dq functions. This is a sql expression to be applied to the dq_db_table.
- execution_point - execution point of the dq functions. [at_rest, in_motion]. This is set during the load_data or dq_validator functions.
- unexpected_rows_pk - the list of columns composing the primary key of the source data to identify the rows failing the DQ validations. Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It is mandatory to provide one of these arguments when using tag_source_data as True. When tag_source_data is False, this is not mandatory, but still recommended.
- tbl_to_derive_pk - db.table to automatically derive the unexpected_rows_pk from. Note: only one of tbl_to_derive_pk or unexpected_rows_pk arguments need to be provided. It is mandatory to provide one of these arguments when using tag_source_data as True. When tag_source_data is False, this is not mandatory, but still recommended.
- sort_processed_keys - when using the prisma dq_type, a column processed_keys is automatically added to give observability over the PK values that were processed during a run. This parameter (sort_processed_keys) controls whether the processed keys column value should be sorted or not. Default: False.
- gx_result_format - great expectations result format. Default: "COMPLETE".
- tag_source_data - when set to true, this will ensure that the DQ process ends by tagging the source data with an additional column with information about the DQ results. This column makes it possible to identify if the DQ run was succeeded in general and, if not, it unlocks the insights to know what specific rows have made the DQ validations fail and why. Default: False. Note: it only works if result_sink_explode is True, gx_result_format is COMPLETE, fail_on_error is False (which is done automatically when you specify tag_source_data as True) and tbl_to_derive_pk or unexpected_rows_pk is configured.
- store_backend - which store_backend to use (e.g. s3 or file_system).
- local_fs_root_dir - path of the root directory. Note: only applicable for store_backend file_system.
- data_docs_local_fs - the path for data docs only for store_backend file_system.
- bucket - the bucket name to consider for the store_backend (store DQ artefacts). Note: only applicable for store_backend s3.
- data_docs_bucket - the bucket name for data docs only. When defined, it will supersede bucket parameter. Note: only applicable for store_backend s3.
- expectations_store_prefix - prefix where to store expectations' data. Note: only applicable for store_backend s3.
- validations_store_prefix - prefix where to store validations' data. Note: only applicable for store_backend s3.
- data_docs_prefix - prefix where to store data_docs' data.
- checkpoint_store_prefix - prefix where to store checkpoints' data. Note: only applicable for store_backend s3.
- data_asset_name - name of the data asset to consider when configuring the great expectations' data source.
- expectation_suite_name - name to consider for great expectations' suite.
- result_sink_db_table - db.table_name indicating the database and table in which to save the results of the DQ process.
- result_sink_location - file system location in which to save the results of the DQ process.
- data_product_name - name of the data product.
- result_sink_partitions - the list of partitions to consider.
- result_sink_format - format of the result table (e.g. delta, parquet, kafka...).
- result_sink_options - extra spark options for configuring the result sink. E.g: can be used to configure a Kafka sink if result_sink_format is kafka.
- result_sink_explode - flag to determine if the output table/location should have the columns exploded (as True) or not (as False). Default: True.
- result_sink_extra_columns - list of extra columns to be exploded (following the pattern "<name>.*") or columns to be selected. It is only used when result_sink_explode is set to True.
- source - name of data source, to be easier to identify in analysis. If not specified, it is set as default <input_id>. This will be only used when result_sink_explode is set to True.
- fail_on_error - whether to fail the algorithm if the validations of your data in the DQ process failed.
- cache_df - whether to cache the dataframe before running the DQ process or not.
- critical_functions - functions that should not fail. When this argument is defined, fail_on_error is nullified.
- max_percentage_failure - percentage of failure that should be allowed. This argument has priority over both fail_on_error and critical_functions.
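Tying the two DQ structures together, a minimal validator specification over the earlier sales input could look like the sketch below. The ids, bucket, expectation and primary key are placeholders, and most sink-related options are left at their defaults:

from lakehouse_engine.core.definitions import DQFunctionSpec, DQSpec, DQType

dq_spec = DQSpec(
    spec_id="sales_dq",                      # placeholder id
    input_id="sales_source",                 # id of the InputSpec to validate
    dq_type=DQType.VALIDATOR.value,          # "validator"
    bucket="my-dq-artifacts-bucket",         # hypothetical s3 bucket for DQ artifacts
    dq_functions=[
        DQFunctionSpec(
            function="expect_column_values_to_not_be_null",
            args={"column": "customer_id"},  # placeholder column
        ),
    ],
    unexpected_rows_pk=["sales_id"],         # placeholder primary key
)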
@dataclass
class MergeOptions(object):
    """Options for a merge operation.

    - merge_predicate: predicate to apply to the merge operation so that we can
        check if a new record corresponds to a record already included in the
        historical data.
    - insert_only: indicates if the merge should only insert data (e.g., deduplicate
        scenarios).
    - delete_predicate: predicate to apply to the delete operation.
    - update_predicate: predicate to apply to the update operation.
    - insert_predicate: predicate to apply to the insert operation.
    - update_column_set: rules to apply to the update operation which allows to
        set the value for each column to be updated.
        (e.g. {"data": "new.data", "count": "current.count + 1"} )
    - insert_column_set: rules to apply to the insert operation which allows to
        set the value for each column to be inserted.
        (e.g. {"date": "updates.date", "count": "1"} )
    """

    merge_predicate: str
    insert_only: bool = False
    delete_predicate: Optional[str] = None
    update_predicate: Optional[str] = None
    insert_predicate: Optional[str] = None
    update_column_set: Optional[dict] = None
    insert_column_set: Optional[dict] = None
Options for a merge operation.
- merge_predicate: predicate to apply to the merge operation so that we can check if a new record corresponds to a record already included in the historical data.
- insert_only: indicates if the merge should only insert data (e.g., deduplicate scenarios).
- delete_predicate: predicate to apply to the delete operation.
- update_predicate: predicate to apply to the update operation.
- insert_predicate: predicate to apply to the insert operation.
- update_column_set: rules to apply to the update operation which allows to set the value for each column to be updated. (e.g. {"data": "new.data", "count": "current.count + 1"} )
- insert_column_set: rules to apply to the insert operation which allows to set the value for each column to be inserted. (e.g. {"date": "updates.date", "count": "1"} )
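A sketch of merge options for an upsert keyed on a single column. The column names are placeholders, and the current/new aliases follow the convention used in the update and insert column set examples above:

from lakehouse_engine.core.definitions import MergeOptions

merge_opts = MergeOptions(
    # "current" refers to the target (historical) data and "new" to the incoming data.
    merge_predicate="current.sales_id = new.sales_id",  # placeholder key column
    update_column_set={"amount": "new.amount", "updated_at": "new.updated_at"},
    insert_only=False,
)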
@dataclass
class OutputSpec(object):
    """Specification of an algorithm output.

    This is very aligned with the way the execution environment connects to the output
    systems (e.g., spark outputs).

    - spec_id: id of the output specification.
    - input_id: id of the corresponding input specification.
    - write_type: type of write operation.
    - data_format: format of the output. Defaults to DELTA.
    - db_table: table name in the form of `<db>.<table>`.
    - location: uri that identifies from where to write data in the specified format.
    - partitions: list of partition input_col names.
    - merge_opts: options to apply to the merge operation.
    - streaming_micro_batch_transformers: transformers to invoke for each streaming
        micro batch, before writing (i.e., in Spark's foreachBatch structured
        streaming function). Note: the lakehouse engine manages this for you, so
        you don't have to manually specify streaming transformations here, so we don't
        advise you to manually specify transformations through this parameter. Supply
        them as regular transformers in the transform_specs sections of an ACON.
    - streaming_once: if the streaming query is to be executed just once, or not,
        generating just one micro batch.
    - streaming_processing_time: if streaming query is to be kept alive, this indicates
        the processing time of each micro batch.
    - streaming_available_now: if set to True, set a trigger that processes all
        available data in multiple batches then terminates the query.
        When using streaming, this is the default trigger that the lakehouse-engine will
        use, unless you configure a different one.
    - streaming_continuous: set a trigger that runs a continuous query with a given
        checkpoint interval.
    - streaming_await_termination: whether to wait (True) for the termination of the
        streaming query (e.g. timeout or exception) or not (False). Default: True.
    - streaming_await_termination_timeout: a timeout to set to the
        streaming_await_termination. Default: None.
    - with_batch_id: whether to include the streaming batch id in the final data,
        or not. It only takes effect in streaming mode.
    - options: dict with other relevant options according to the execution environment
        (e.g., spark) possible outputs. E.g.,: JDBC options, checkpoint location for
        streaming, etc.
    - streaming_micro_batch_dq_processors: similar to streaming_micro_batch_transformers
        but for the DQ functions to be executed. Used internally by the lakehouse
        engine, so you don't have to supply DQ functions through this parameter. Use the
        dq_specs of the acon instead.
    """

    spec_id: str
    input_id: str
    write_type: str
    data_format: str = OutputFormat.DELTAFILES.value
    db_table: Optional[str] = None
    location: Optional[str] = None
    merge_opts: Optional[MergeOptions] = None
    partitions: Optional[List[str]] = None
    streaming_micro_batch_transformers: Optional[List[TransformerSpec]] = None
    streaming_once: Optional[bool] = None
    streaming_processing_time: Optional[str] = None
    streaming_available_now: bool = True
    streaming_continuous: Optional[str] = None
    streaming_await_termination: bool = True
    streaming_await_termination_timeout: Optional[int] = None
    with_batch_id: bool = False
    options: Optional[dict] = None
    streaming_micro_batch_dq_processors: Optional[List[DQSpec]] = None
Specification of an algorithm output.
This is very aligned with the way the execution environment connects to the output systems (e.g., spark outputs).
- spec_id: id of the output specification.
- input_id: id of the corresponding input specification.
- write_type: type of write operation.
- data_format: format of the output. Defaults to DELTA.
- db_table: table name in the form of <db>.<table>.
- location: uri that identifies where to write the data in the specified format.
- partitions: list of partition input_col names.
- merge_opts: options to apply to the merge operation.
- streaming_micro_batch_transformers: transformers to invoke for each streaming micro batch, before writing (i.e., in Spark's foreachBatch structured streaming function). Note: the lakehouse engine manages this for you, so we don't advise manually specifying transformations through this parameter. Supply them as regular transformers in the transform_specs sections of an ACON.
- streaming_once: if the streaming query is to be executed just once, or not, generating just one micro batch.
- streaming_processing_time: if streaming query is to be kept alive, this indicates the processing time of each micro batch.
- streaming_available_now: if set to True, set a trigger that processes all available data in multiple batches then terminates the query. When using streaming, this is the default trigger that the lakehouse-engine will use, unless you configure a different one.
- streaming_continuous: set a trigger that runs a continuous query with a given checkpoint interval.
- streaming_await_termination: whether to wait (True) for the termination of the streaming query (e.g. timeout or exception) or not (False). Default: True.
- streaming_await_termination_timeout: a timeout to set to the streaming_await_termination. Default: None.
- with_batch_id: whether to include the streaming batch id in the final data, or not. It only takes effect in streaming mode.
- options: dict with other relevant options according to the execution environment (e.g., spark) possible outputs. E.g.,: JDBC options, checkpoint location for streaming, etc.
- streaming_micro_batch_dq_processors: similar to streaming_micro_batch_transformers but for the DQ functions to be executed. Used internally by the lakehouse engine, so you don't have to supply DQ functions through this parameter. Use the dq_specs of the acon instead.
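Putting the output options together, a batch merge into a Delta table could be specified roughly as below (ids and the table name are placeholders, and merge_opts reuses the MergeOptions sketch above):

from lakehouse_engine.core.definitions import OutputFormat, OutputSpec, WriteType

output = OutputSpec(
    spec_id="sales_sink",                       # placeholder id
    input_id="sales_transformed",               # id of the spec feeding this output
    write_type=WriteType.MERGE.value,           # "merge"
    data_format=OutputFormat.DELTAFILES.value,  # "delta" (also the default)
    db_table="my_database.sales_curated",       # hypothetical <db>.<table>
    merge_opts=merge_opts,                      # MergeOptions from the sketch above
)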
@dataclass
class TerminatorSpec(object):
    """Terminator Specification.

    I.e., the specification that defines a terminator operation to be executed. Examples
    are compute statistics, vacuum, optimize, etc.

    - function: terminator function to execute.
    - args: arguments of the terminator function.
    - input_id: id of the corresponding output specification (Optional).
    """

    function: str
    args: Optional[dict] = None
    input_id: Optional[str] = None
Terminator Specification.
I.e., the specification that defines a terminator operation to be executed. Examples are compute statistics, vacuum, optimize, etc.
- function: terminator function to execute.
- args: arguments of the terminator function.
- input_id: id of the corresponding output specification (Optional).
@dataclass
class ReconciliatorSpec(object):
    """Reconciliator Specification.

    - metrics: list of metrics in the form of:
        [{
        metric: name of the column present in both truth and current datasets,
        aggregation: sum, avg, max, min, ...,
        type: percentage or absolute,
        yellow: value,
        red: value
        }].
    - recon_type: reconciliation type (percentage or absolute). Percentage calculates
        the difference between truth and current results as a percentage (x-y/x), and
        absolute calculates the raw difference (x - y).
    - truth_input_spec: input specification of the truth data.
    - current_input_spec: input specification of the current results data
    - truth_preprocess_query: additional query on top of the truth input data to
        preprocess the truth data before it gets fueled into the reconciliation process.
        Important note: you need to assume that the data out of
        the truth_input_spec is referencable by a table called 'truth'.
    - truth_preprocess_query_args: optional dict having the functions/transformations to
        apply on top of the truth_preprocess_query and respective arguments. Note: cache
        is being applied on the Dataframe, by default. For turning the default behavior
        off, pass `"truth_preprocess_query_args": []`.
    - current_preprocess_query: additional query on top of the current results input
        data to preprocess the current results data before it gets fueled into the
        reconciliation process. Important note: you need to assume that the data out of
        the current_results_input_spec is referencable by a table called 'current'.
    - current_preprocess_query_args: optional dict having the
        functions/transformations to apply on top of the current_preprocess_query
        and respective arguments. Note: cache is being applied on the Dataframe,
        by default. For turning the default behavior off, pass
        `"current_preprocess_query_args": []`.
    - ignore_empty_df: optional boolean, to ignore the recon process if source & target
        dataframes are empty, recon will exit success code (passed)
    """

    metrics: List[dict]
    truth_input_spec: InputSpec
    current_input_spec: InputSpec
    truth_preprocess_query: Optional[str] = None
    truth_preprocess_query_args: Optional[List[dict]] = None
    current_preprocess_query: Optional[str] = None
    current_preprocess_query_args: Optional[List[dict]] = None
    ignore_empty_df: Optional[bool] = False
Reconciliator Specification.
- metrics: list of metrics in the form of: [{ metric: name of the column present in both truth and current datasets, aggregation: sum, avg, max, min, ..., type: percentage or absolute, yellow: value, red: value }].
- recon_type: reconciliation type (percentage or absolute). Percentage calculates the difference between truth and current results as a percentage (x-y/x), and absolute calculates the raw difference (x - y).
- truth_input_spec: input specification of the truth data.
- current_input_spec: input specification of the current results data
- truth_preprocess_query: additional query on top of the truth input data to preprocess the truth data before it gets fueled into the reconciliation process. Important note: you need to assume that the data out of the truth_input_spec is referencable by a table called 'truth'.
- truth_preprocess_query_args: optional dict having the functions/transformations to apply on top of the truth_preprocess_query and respective arguments. Note: cache is being applied on the Dataframe, by default. For turning the default behavior off, pass "truth_preprocess_query_args": [].
- current_preprocess_query: additional query on top of the current results input data to preprocess the current results data before it gets fueled into the reconciliation process. Important note: you need to assume that the data out of the current_results_input_spec is referencable by a table called 'current'.
- current_preprocess_query_args: optional dict having the functions/transformations to apply on top of the current_preprocess_query and respective arguments. Note: cache is being applied on the Dataframe, by default. For turning the default behavior off, pass "current_preprocess_query_args": [].
- ignore_empty_df: optional boolean, to ignore the recon process if source & target dataframes are empty; the recon will exit with success (passed).
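As an illustration, a reconciliation of a single metric between a truth table and freshly loaded data might be specified as follows (table names, the metric column and the thresholds are all placeholders):

from lakehouse_engine.core.definitions import InputSpec, ReadType, ReconciliatorSpec

recon = ReconciliatorSpec(
    metrics=[
        {
            "metric": "amount",      # placeholder column present in both datasets
            "aggregation": "sum",
            "type": "percentage",
            "yellow": 0.01,          # placeholder warning threshold
            "red": 0.05,             # placeholder failure threshold
        }
    ],
    truth_input_spec=InputSpec(
        spec_id="truth",
        read_type=ReadType.BATCH.value,
        db_table="my_database.sales_truth",    # hypothetical table
    ),
    current_input_spec=InputSpec(
        spec_id="current",
        read_type=ReadType.BATCH.value,
        db_table="my_database.sales_curated",  # hypothetical table
    ),
)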
@dataclass
class DQValidatorSpec(object):
    """Data Quality Validator Specification.

    - input_spec: input specification of the data to be checked/validated.
    - dq_spec: data quality specification.
    - restore_prev_version: specify if, having
        delta table/files as input, they should be restored to the
        previous version if the data quality process fails. Note: this
        is only considered if fail_on_error is kept as True.
    """

    input_spec: InputSpec
    dq_spec: DQSpec
    restore_prev_version: Optional[bool] = False
Data Quality Validator Specification.
- input_spec: input specification of the data to be checked/validated.
- dq_spec: data quality specification.
- restore_prev_version: specify if, having delta table/files as input, they should be restored to the previous version if the data quality process fails. Note: this is only considered if fail_on_error is kept as True.
class SQLDefinitions(Enum):
    """SQL definitions statements."""

    compute_table_stats = "ANALYZE TABLE {} COMPUTE STATISTICS"
    drop_table_stmt = "DROP TABLE IF EXISTS"
    drop_view_stmt = "DROP VIEW IF EXISTS"
    truncate_stmt = "TRUNCATE TABLE"
    describe_stmt = "DESCRIBE TABLE"
    optimize_stmt = "OPTIMIZE"
    show_tbl_props_stmt = "SHOW TBLPROPERTIES"
    delete_where_stmt = "DELETE FROM {} WHERE {}"
SQL definitions statements.
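The templated statements carry {} placeholders that can be filled in with str.format before being handed to the SQL engine; a small sketch (table name and predicate are placeholders):

from lakehouse_engine.core.definitions import SQLDefinitions

stmt = SQLDefinitions.delete_where_stmt.value.format(
    "my_database.sales_curated",  # placeholder table
    "order_date < '2020-01-01'",  # placeholder predicate
)
# stmt == "DELETE FROM my_database.sales_curated WHERE order_date < '2020-01-01'"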
class FileManagerAPIKeys(Enum):
    """File Manager s3 api keys."""

    CONTENTS = "Contents"
    KEY = "Key"
    CONTINUATION = "NextContinuationToken"
    BUCKET = "Bucket"
    OBJECTS = "Objects"
File Manager s3 api keys.
@dataclass
class SensorSpec(object):
    """Sensor Specification.

    - sensor_id: sensor id.
    - assets: a list of assets that are considered as available to
        consume downstream after this sensor has status
        PROCESSED_NEW_DATA.
    - control_db_table_name: db.table to store sensor metadata.
    - input_spec: input specification of the source to be checked for new data.
    - preprocess_query: SQL query to transform/filter the result from the
        upstream. Consider that we should refer to 'new_data' whenever
        we are referring to the input of the sensor. E.g.:
        "SELECT dummy_col FROM new_data WHERE ..."
    - checkpoint_location: optional location to store checkpoints to resume
        from. These checkpoints use the same as Spark checkpoint strategy.
        For Spark readers that do not support checkpoints, use the
        preprocess_query parameter to form a SQL query to filter the result
        from the upstream accordingly.
    - fail_on_empty_result: if the sensor should throw an error if there is no new
        data in the upstream. Default: True.
    """

    sensor_id: str
    assets: List[str]
    control_db_table_name: str
    input_spec: InputSpec
    preprocess_query: Optional[str]
    checkpoint_location: Optional[str]
    fail_on_empty_result: bool = True

    @classmethod
    def create_from_acon(cls, acon: dict):  # type: ignore
        """Create SensorSpec from acon.

        Args:
            acon: sensor ACON.
        """
        checkpoint_location = acon.get("base_checkpoint_location")
        if checkpoint_location:
            checkpoint_location = (
                f"{checkpoint_location.rstrip('/')}/lakehouse_engine/"
                f"sensors/{acon['sensor_id']}"
            )

        return cls(
            sensor_id=acon["sensor_id"],
            assets=acon["assets"],
            control_db_table_name=acon["control_db_table_name"],
            input_spec=InputSpec(**acon["input_spec"]),
            preprocess_query=acon.get("preprocess_query"),
            checkpoint_location=checkpoint_location,
            fail_on_empty_result=acon.get("fail_on_empty_result", True),
        )
Sensor Specification.
- sensor_id: sensor id.
- assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.
- control_db_table_name: db.table to store sensor metadata.
- input_spec: input specification of the source to be checked for new data.
- preprocess_query: SQL query to transform/filter the result from the upstream. Consider that we should refer to 'new_data' whenever we are referring to the input of the sensor. E.g.: "SELECT dummy_col FROM new_data WHERE ..."
- checkpoint_location: optional location to store checkpoints to resume from. These checkpoints use the same strategy as Spark checkpoints. For Spark readers that do not support checkpoints, use the preprocess_query parameter to form a SQL query to filter the result from the upstream accordingly.
- fail_on_empty_result: if the sensor should throw an error if there is no new data in the upstream. Default: True.
Create SensorSpec from acon.
Arguments:
- acon: sensor ACON.
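To illustrate how the helper assembles a spec, a sensor ACON could look roughly like this (ids, table names and the location are placeholders; base_checkpoint_location is expanded into a per-sensor checkpoint path as shown in the source above):

from lakehouse_engine.core.definitions import SensorSpec

acon = {
    "sensor_id": "sensor_sales_source",                      # placeholder id
    "assets": ["sales"],                                     # placeholder asset list
    "control_db_table_name": "my_database.sensor_control",   # placeholder db.table
    "input_spec": {
        "spec_id": "sensor_upstream",
        "read_type": "streaming",
        "data_format": "delta",
        "db_table": "my_database.sales_raw",                 # placeholder upstream table
    },
    "preprocess_query": "SELECT * FROM new_data",
    "base_checkpoint_location": "s3://my-bucket/checkpoints",  # placeholder
}

sensor_spec = SensorSpec.create_from_acon(acon)
# checkpoint_location becomes
# "s3://my-bucket/checkpoints/lakehouse_engine/sensors/sensor_sales_source"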
class SensorStatus(Enum):
    """Status for a sensor."""

    ACQUIRED_NEW_DATA = "ACQUIRED_NEW_DATA"
    PROCESSED_NEW_DATA = "PROCESSED_NEW_DATA"
Status for a sensor.
class SAPLogchain(Enum):
    """Defaults used on consuming data from SAP Logchain."""

    DBTABLE = "SAPPHA.RSPCLOGCHAIN"
    GREEN_STATUS = "G"
    ENGINE_TABLE = "sensor_new_data"
Defaults used on consuming data from SAP Logchain.
class RestoreType(Enum):
    """Archive types."""

    BULK = "Bulk"
    STANDARD = "Standard"
    EXPEDITED = "Expedited"

    @classmethod
    def values(cls):  # type: ignore
        """Generates a list containing all enum values.

        Return:
            A list with all enum values.
        """
        return (c.value for c in cls)

    @classmethod
    def exists(cls, restore_type: str) -> bool:
        """Checks if the restore type exists in the enum values.

        Args:
            restore_type: restore type to check if exists.

        Return:
            If the restore type exists in our enum.
        """
        return restore_type in cls.values()
Archive types.
Generates a list containing all enum values.
Return:
A list with all enum values.
Checks if the restore type exists in the enum values.
Arguments:
- restore_type: restore type to check if exists.
Return:
If the restore type exists in our enum.
class RestoreStatus(Enum):
    """Statuses of an archive restore process."""

    NOT_STARTED = "not_started"
    ONGOING = "ongoing"
    RESTORED = "restored"
Statuses of an archive restore process.
class SQLParser(Enum):
    """Defaults to use for parsing."""

    DOUBLE_QUOTES = '"'
    SINGLE_QUOTES = "'"
    BACKSLASH = "\\"
    SINGLE_TRACE = "-"
    DOUBLE_TRACES = "--"
    SLASH = "/"
    OPENING_MULTIPLE_LINE_COMMENT = "/*"
    CLOSING_MULTIPLE_LINE_COMMENT = "*/"
    PARAGRAPH = "\n"
    STAR = "*"

    MULTIPLE_LINE_COMMENT = [
        OPENING_MULTIPLE_LINE_COMMENT,
        CLOSING_MULTIPLE_LINE_COMMENT,
    ]
Defaults to use for parsing.
class GABDefaults(Enum):
    """Defaults used on the GAB process."""

    DATE_FORMAT = "%Y-%m-%d"
    DIMENSIONS_DEFAULT_COLUMNS = ["from_date", "to_date"]
    DEFAULT_DIMENSION_CALENDAR_TABLE = "dim_calendar"
    DEFAULT_LOOKUP_QUERY_BUILDER_TABLE = "lkp_query_builder"
Defaults used on the GAB process.
class GABStartOfWeek(Enum):
    """Representation of start of week values on GAB."""

    SUNDAY = "S"
    MONDAY = "M"

    @classmethod
    def get_start_of_week(cls) -> dict:
        """Get the start of week enum as a dict.

        Returns:
            dict containing all enum entries as `{name:value}`.
        """
        return {
            start_of_week.name: start_of_week.value
            for start_of_week in list(GABStartOfWeek)
        }

    @classmethod
    def get_values(cls) -> set[str]:
        """Get the start of week enum values as set.

        Returns:
            set containing all possible values `{value}`.
        """
        return {start_of_week.value for start_of_week in list(GABStartOfWeek)}
Representation of start of week values on GAB.
Get the start of week enum as a dict.
Returns:
dict containing all enum entries as {name:value}.
Get the start of week enum values as set.
Returns:
set containing all possible values {value}.
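A quick sketch of what the two helpers return:

from lakehouse_engine.core.definitions import GABStartOfWeek

GABStartOfWeek.get_start_of_week()  # {"SUNDAY": "S", "MONDAY": "M"}
GABStartOfWeek.get_values()         # {"S", "M"}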
@dataclass
class GABSpec(object):
    """Gab Specification.

    query_label_filter: query use-case label to execute.
    queue_filter: queue to execute the job.
    cadence_filter: selected cadences to build the asset.
    target_database: target database to write.
    current_date: current date.
    start_date: period start date.
    end_date: period end date.
    rerun_flag: rerun flag.
    target_table: target table to write.
    source_database: source database.
    gab_base_path: base path to read the use cases.
    lookup_table: gab configuration table.
    calendar_table: gab calendar table.
    """

    query_label_filter: list[str]
    queue_filter: list[str]
    cadence_filter: list[str]
    target_database: str
    current_date: datetime
    start_date: datetime
    end_date: datetime
    rerun_flag: str
    target_table: str
    source_database: str
    gab_base_path: str
    lookup_table: str
    calendar_table: str

    @classmethod
    def create_from_acon(cls, acon: dict):  # type: ignore
        """Create GabSpec from acon.

        Args:
            acon: gab ACON.
        """
        lookup_table = f"{acon['source_database']}." + (
            acon.get(
                "lookup_table", GABDefaults.DEFAULT_LOOKUP_QUERY_BUILDER_TABLE.value
            )
        )

        calendar_table = f"{acon['source_database']}." + (
            acon.get(
                "calendar_table", GABDefaults.DEFAULT_DIMENSION_CALENDAR_TABLE.value
            )
        )

        def format_date(date_to_format: Union[datetime, str]) -> datetime:
            if isinstance(date_to_format, str):
                return datetime.strptime(date_to_format, GABDefaults.DATE_FORMAT.value)
            else:
                return date_to_format

        return cls(
            query_label_filter=acon["query_label_filter"],
            queue_filter=acon["queue_filter"],
            cadence_filter=acon["cadence_filter"],
            target_database=acon["target_database"],
            current_date=datetime.now(),
            start_date=format_date(acon["start_date"]),
            end_date=format_date(acon["end_date"]),
            rerun_flag=acon["rerun_flag"],
            target_table=acon["target_table"],
            source_database=acon["source_database"],
            gab_base_path=acon["gab_base_path"],
            lookup_table=lookup_table,
            calendar_table=calendar_table,
        )
Gab Specification.
query_label_filter: query use-case label to execute.
queue_filter: queue to execute the job.
cadence_filter: selected cadences to build the asset.
target_database: target database to write.
current_date: current date.
start_date: period start date.
end_date: period end date.
rerun_flag: rerun flag.
target_table: target table to write.
source_database: source database.
gab_base_path: base path to read the use cases.
lookup_table: gab configuration table.
calendar_table: gab calendar table.
Create GabSpec from acon.
Arguments:
- acon: gab ACON.
class GABCadence(Enum):
    """Representation of the supported cadences on GAB."""

    DAY = 1
    WEEK = 2
    MONTH = 3
    QUARTER = 4
    YEAR = 5

    @classmethod
    def get_ordered_cadences(cls) -> dict:
        """Get the cadences ordered by the value.

        Returns:
            dict containing ordered cadences as `{name:value}`.
        """
        cadences = list(GABCadence)
        return {
            cadence.name: cadence.value
            for cadence in sorted(cadences, key=lambda gab_cadence: gab_cadence.value)
        }

    @classmethod
    def get_cadences(cls) -> set[str]:
        """Get the cadences values as set.

        Returns:
            set containing all possible cadence values as `{value}`.
        """
        return {cadence.name for cadence in list(GABCadence)}

    @classmethod
    def order_cadences(cls, cadences_to_order: list[str]) -> list[str]:
        """Order a list of cadences by value.

        Returns:
            ordered set containing the received cadences.
        """
        return sorted(
            cadences_to_order,
            key=lambda item: cls.get_ordered_cadences().get(item),  # type: ignore
        )
Representation of the supported cadences on GAB.
Get the cadences ordered by the value.
Returns:
dict containing ordered cadences as {name:value}.
Get the cadences values as set.
Returns:
set containing all possible cadence values as {value}.
Order a list of cadences by value.
Returns:
ordered set containing the received cadences.
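For instance, order_cadences sorts cadence names by their rank in this enum:

from lakehouse_engine.core.definitions import GABCadence

GABCadence.order_cadences(["YEAR", "DAY", "MONTH"])  # ["DAY", "MONTH", "YEAR"]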
class GABKeys:
    """Constants used to update pre-configured gab dict key."""

    JOIN_SELECT = "join_select"
    PROJECT_START = "project_start"
    PROJECT_END = "project_end"
Constants used to update pre-configured gab dict key.
class GABReplaceableKeys:
    """Constants used to replace pre-configured gab dict values."""

    CADENCE = "${cad}"
    DATE_COLUMN = "${date_column}"
    CONFIG_WEEK_START = "${config_week_start}"
    RECONCILIATION_CADENCE = "${rec_cadence}"
Constants used to replace pre-configured gab dict values.
class GABCombinedConfiguration(Enum):
    """GAB combined configuration.

    Based on the use case configuration, return the values to override in the SQL file.
    This enum aims to exhaustively map each combination of `cadence`, `reconciliation`,
    `week_start` and `snap_flag` to the corresponding `join_select`, `project_start`
    and `project_end` values that replace the placeholders in the stages SQL file.

    Return the corresponding configuration (join_select, project_start, project_end)
    for each combination (cadence x recon x week_start x snap_flag).
    """

    _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE = (
        "date(date_trunc('${cad}',${date_column}))"
    )
    _DEFAULT_PROJECT_START = "df_cal.cadence_start_date"
    _DEFAULT_PROJECT_END = "df_cal.cadence_end_date"

    COMBINED_CONFIGURATION = {
        # Combination of:
        # - cadence: `DAY`
        # - reconciliation_window: `DAY`, `WEEK`, `MONTH`, `QUARTER`, `YEAR`
        # - week_start: `S`, `M`
        # - snapshot_flag: `Y`, `N`
        1: {
            "cadence": GABCadence.DAY.name,
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": {"Y", "N"},
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
        },
        # Combination of:
        # - cadence: `WEEK`
        # - reconciliation_window: `DAY`
        # - week_start: `S`, `M`
        # - snapshot_flag: `Y`
        2: {
            "cadence": GABCadence.WEEK.name,
            "recon": GABCadence.DAY.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
                select distinct case
                    when '${config_week_start}' = 'Monday' then weekstart_mon
                    when '${config_week_start}' = 'Sunday' then weekstart_sun
                end as cadence_start_date,
                calendar_date as cadence_end_date
                """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        # Combination of:
        # - cadence: `WEEK`
        # - reconciliation_window: `DAY`, `MONTH`, `QUARTER`, `YEAR`
        # - week_start: `M`
        # - snapshot_flag: `Y`, `N`
        3: {
            "cadence": GABCadence.WEEK.name,
            "recon": {
                GABCadence.DAY.name,
                GABCadence.MONTH.name,
                GABCadence.QUARTER.name,
                GABCadence.YEAR.name,
            },
            "week_start": "M",
            "snap_flag": {"Y", "N"},
            "join_select": """
                select distinct case
                    when '${config_week_start}' = 'Monday' then weekstart_mon
                    when '${config_week_start}' = 'Sunday' then weekstart_sun
                end as cadence_start_date,
                case
                    when '${config_week_start}' = 'Monday' then weekend_mon
                    when '${config_week_start}' = 'Sunday' then weekend_sun
                end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        4: {
            "cadence": GABCadence.MONTH.name,
            "recon": GABCadence.DAY.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
                select distinct month_start as cadence_start_date,
                calendar_date as cadence_end_date
                """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        5: {
            "cadence": GABCadence.MONTH.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.MONDAY.value,
            "snap_flag": "Y",
            "join_select": """
                select distinct month_start as cadence_start_date,
                case
                    when date(
                        date_trunc('MONTH',add_months(calendar_date, 1))
                    )-1 < weekend_mon
                    then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1
                    else weekend_mon
                end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        6: {
            "cadence": GABCadence.MONTH.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.SUNDAY.value,
            "snap_flag": "Y",
            "join_select": """
                select distinct month_start as cadence_start_date,
                case
                    when date(
                        date_trunc('MONTH',add_months(calendar_date, 1))
                    )-1 < weekend_sun
                    then date(date_trunc('MONTH',add_months(calendar_date, 1)))-1
                    else weekend_sun
                end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        7: {
            "cadence": GABCadence.MONTH.name,
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": {"Y", "N"},
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": "date(date_trunc('MONTH',add_months(${date_column}, 1)))-1",
        },
        8: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.DAY.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
                select distinct quarter_start as cadence_start_date,
                calendar_date as cadence_end_date
                """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        9: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.MONDAY.value,
            "snap_flag": "Y",
            "join_select": """
                select distinct quarter_start as cadence_start_date,
                case
                    when weekend_mon > date(
                        date_trunc('QUARTER',add_months(calendar_date, 3))
                    )-1
                    then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1
                    else weekend_mon
                end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        10: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.SUNDAY.value,
            "snap_flag": "Y",
            "join_select": """
                select distinct quarter_start as cadence_start_date,
                case
                    when weekend_sun > date(
                        date_trunc('QUARTER',add_months(calendar_date, 3))
                    )-1
                    then date(date_trunc('QUARTER',add_months(calendar_date, 3)))-1
                    else weekend_sun
                end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        11: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.MONTH.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
                select distinct quarter_start as cadence_start_date,
                month_end as cadence_end_date
                """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        12: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.YEAR.name,
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "N",
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": """
                date(
                    date_trunc(
                        '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 3)
                    )
                )-1
            """,
        },
        13: {
            "cadence": GABCadence.QUARTER.name,
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "N",
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": """
                date(
                    date_trunc(
                        '${cad}',add_months( date(date_trunc('${cad}',${date_column})), 3)
                    )
                )-1
            """,
        },
        14: {
            "cadence": GABCadence.YEAR.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.MONDAY.value,
            "snap_flag": "Y",
            "join_select": """
                select distinct year_start as cadence_start_date,
                case
                    when weekend_mon > date(
                        date_trunc('YEAR',add_months(calendar_date, 12))
                    )-1
                    then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1
                    else weekend_mon
                end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        15: {
            "cadence": GABCadence.YEAR.name,
            "recon": GABCadence.WEEK.name,
            "week_start": GABStartOfWeek.SUNDAY.value,
            "snap_flag": "Y",
            "join_select": """
                select distinct year_start as cadence_start_date,
                case
                    when weekend_sun > date(
                        date_trunc('YEAR',add_months(calendar_date, 12))
                    )-1
                    then date(date_trunc('YEAR',add_months(calendar_date, 12)))-1
                    else weekend_sun
                end as cadence_end_date""",
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        16: {
            "cadence": GABCadence.YEAR.name,
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "N",
            "inverse_flag": "Y",
            "join_select": "",
            "project_start": _PROJECT_DATE_COLUMN_TRUNCATED_BY_CADENCE,
            "project_end": """
                date(
                    date_trunc(
                        '${cad}',add_months(date(date_trunc('${cad}',${date_column})), 12)
                    )
                )-1
            """,
        },
        17: {
            "cadence": GABCadence.YEAR.name,
            "recon": {
                GABCadence.DAY.name,
                GABCadence.MONTH.name,
                GABCadence.QUARTER.name,
            },
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": "Y",
            "join_select": """
                select distinct year_start as cadence_start_date,
                case
                    when '${rec_cadence}' = 'DAY' then calendar_date
                    when '${rec_cadence}' = 'MONTH' then month_end
                    when '${rec_cadence}' = 'QUARTER' then quarter_end
                end as cadence_end_date
                """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
        18: {
            "cadence": GABCadence.get_cadences(),
            "recon": GABCadence.get_cadences(),
            "week_start": GABStartOfWeek.get_values(),
            "snap_flag": {"Y", "N"},
            "join_select": """
                select distinct
                    case
                        when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday'
                            then weekstart_mon
                        when '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday'
                            then weekstart_sun
                        else
                            date(date_trunc('${cad}',calendar_date))
                    end as cadence_start_date,
                    case
                        when '${cad}' = 'WEEK' and '${config_week_start}' = 'Monday'
                            then weekend_mon
                        when '${cad}' = 'WEEK' and '${config_week_start}' = 'Sunday'
                            then weekend_sun
                        when '${cad}' = 'DAY'
                            then date(date_trunc('${cad}',calendar_date))
                        when '${cad}' = 'MONTH'
                            then date(
                                date_trunc(
                                    'MONTH',
                                    add_months(date(date_trunc('${cad}',calendar_date)), 1)
                                )
                            )-1
                        when '${cad}' = 'QUARTER'
                            then date(
                                date_trunc(
                                    'QUARTER',
                                    add_months(date(date_trunc('${cad}',calendar_date)) , 3)
                                )
                            )-1
                        when '${cad}' = 'YEAR'
                            then date(
                                date_trunc(
                                    'YEAR',
                                    add_months(date(date_trunc('${cad}',calendar_date)), 12)
                                )
                            )-1
                    end as cadence_end_date
                """,
            "project_start": _DEFAULT_PROJECT_START,
            "project_end": _DEFAULT_PROJECT_END,
        },
    }
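The project_start, project_end and join_select expressions carry ${cad}, ${date_column}, ${config_week_start} and ${rec_cadence} placeholders that are substituted when the stages SQL is rendered for a concrete use case. As a minimal sketch only (the engine's own rendering code is not shown here, and the concrete values below are hypothetical), these $-style tokens are compatible with Python's string.Template:

from string import Template

# Hypothetical placeholder values; in practice they would come from the
# GAB use case configuration.
project_end_expr = Template(
    "date(date_trunc('${cad}',add_months(date(date_trunc('${cad}',${date_column})), 3)))-1"
)
rendered = project_end_expr.substitute(cad="QUARTER", date_column="order_date")
print(rendered)
# date(date_trunc('QUARTER',add_months(date(date_trunc('QUARTER',order_date)), 3)))-1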
GAB combined configuration.
Based on the use case configuration, return the values to override in the SQL file. This enum aims to exhaustively map each combination of cadence, reconciliation, week_start and snap_flag to the corresponding join_select, project_start and project_end values that replace the placeholders in the stages SQL file.
Return the corresponding configuration (join_select, project_start, project_end) for each combination (cadence x recon x week_start x snap_flag).
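For illustration, a lookup over COMBINED_CONFIGURATION could be sketched as below. resolve_gab_configuration and _matches are hypothetical helper names, and the first-match iteration order is an assumption for this sketch rather than the engine's documented resolution logic.

from typing import Any, Optional, Tuple

from lakehouse_engine.core.definitions import GABCombinedConfiguration


def _matches(expected: Any, actual: str) -> bool:
    """Accept entries that store either a single value or a collection of values."""
    if isinstance(expected, (set, frozenset, list, tuple)):
        return actual in expected
    return actual == expected


def resolve_gab_configuration(
    cadence: str, recon: str, week_start: str, snap_flag: str
) -> Optional[Tuple[int, dict]]:
    """Return the first entry matching (cadence, recon, week_start, snap_flag)."""
    combined = GABCombinedConfiguration.COMBINED_CONFIGURATION.value
    for key, config in combined.items():
        if (
            _matches(config["cadence"], cadence)
            and _matches(config["recon"], recon)
            and _matches(config["week_start"], week_start)
            and _matches(config["snap_flag"], snap_flag)
        ):
            return key, config
    return None

In this sketch, entry 18 accepts every combination, so a first-match scan falls back to it only when none of the more specific entries 1 to 17 apply.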
Inherited Members
- enum.Enum
- name
- value