lakehouse_engine.core.gab_manager
Module to define GAB Manager classes.
1"""Module to define GAB Manager classes.""" 2 3import calendar 4from datetime import datetime, timedelta 5from typing import Tuple, Union, cast 6 7import pendulum 8from pendulum import DateTime 9from pyspark.sql import DataFrame 10 11from lakehouse_engine.core.definitions import GABCadence, GABDefaults 12from lakehouse_engine.core.gab_sql_generator import GABViewGenerator 13from lakehouse_engine.utils.gab_utils import GABUtils 14from lakehouse_engine.utils.logging_handler import LoggingHandler 15 16 17class GABCadenceManager(object): 18 """Class to control the GAB Cadence Window.""" 19 20 _LOGGER = LoggingHandler(__name__).get_logger() 21 22 def extended_window_calculator( 23 self, 24 cadence: str, 25 reconciliation_cadence: str, 26 current_date: datetime, 27 start_date_str: str, 28 end_date_str: str, 29 query_type: str, 30 rerun_flag: str, 31 snapshot_flag: str, 32 ) -> tuple[datetime, datetime, datetime, datetime]: 33 """extended_window_calculator function. 34 35 Calculates the extended window of any cadence despite the user providing 36 custom dates which are not the exact start and end dates of a cadence. 37 38 Args: 39 cadence: cadence to process 40 reconciliation_cadence: reconciliation to process. 41 current_date: current date. 42 start_date_str: start date of the period to process. 43 end_date_str: end date of the period to process. 44 query_type: use case query type. 45 rerun_flag: flag indicating if it's a rerun or a normal run. 46 snapshot_flag: flag indicating if for this cadence the snapshot is enabled. 
47 """ 48 cad_order = GABCadence.get_ordered_cadences() 49 50 derived_cadence = self._get_reconciliation_cadence( 51 cad_order, rerun_flag, cadence, reconciliation_cadence, snapshot_flag 52 ) 53 54 self._LOGGER.info(f"cadence passed to extended window: {derived_cadence}") 55 56 start_date = datetime.strptime(start_date_str, GABDefaults.DATE_FORMAT.value) 57 end_date = datetime.strptime(end_date_str, GABDefaults.DATE_FORMAT.value) 58 59 bucket_start_date, bucket_end_date = self.get_cadence_start_end_dates( 60 cadence, derived_cadence, start_date, end_date, query_type, current_date 61 ) 62 63 self._LOGGER.info(f"bucket dates: {bucket_start_date} - {bucket_end_date}") 64 65 filter_start_date, filter_end_date = self.get_cadence_start_end_dates( 66 cadence, 67 ( 68 reconciliation_cadence 69 if cad_order[cadence] < cad_order[reconciliation_cadence] 70 else cadence 71 ), 72 start_date, 73 end_date, 74 query_type, 75 current_date, 76 ) 77 78 self._LOGGER.info(f"filter dates: {filter_start_date} - {filter_end_date}") 79 80 return bucket_start_date, bucket_end_date, filter_start_date, filter_end_date 81 82 @classmethod 83 def _get_reconciliation_cadence( 84 cls, 85 cadence_order: dict, 86 rerun_flag: str, 87 cadence: str, 88 reconciliation_cadence: str, 89 snapshot_flag: str, 90 ) -> str: 91 """Get bigger cadence when rerun_flag or snapshot. 92 93 Args: 94 cadence_order: ordered cadences. 95 rerun_flag: flag indicating if it's a rerun or a normal run. 96 cadence: cadence to process. 97 reconciliation_cadence: reconciliation to process. 98 snapshot_flag: flag indicating if for this cadence the snapshot is enabled. 
99 """ 100 derived_cadence = reconciliation_cadence 101 102 if rerun_flag == "Y": 103 if cadence_order[cadence] > cadence_order[reconciliation_cadence]: 104 derived_cadence = cadence 105 elif cadence_order[cadence] < cadence_order[reconciliation_cadence]: 106 derived_cadence = reconciliation_cadence 107 else: 108 if ( 109 cadence_order[cadence] > cadence_order[reconciliation_cadence] 110 and snapshot_flag == "Y" 111 ) or (cadence_order[cadence] < cadence_order[reconciliation_cadence]): 112 derived_cadence = reconciliation_cadence 113 elif ( 114 cadence_order[cadence] > cadence_order[reconciliation_cadence] 115 and snapshot_flag == "N" 116 ): 117 derived_cadence = cadence 118 119 return derived_cadence 120 121 def get_cadence_start_end_dates( 122 self, 123 cadence: str, 124 derived_cadence: str, 125 start_date: datetime, 126 end_date: datetime, 127 query_type: str, 128 current_date: datetime, 129 ) -> tuple[datetime, datetime]: 130 """Generate the new set of extended start and end dates based on the cadence. 131 132 Running week cadence again to extend to correct week start and end date in case 133 of recon window for Week cadence is present. 134 For end_date 2012-12-31,in case of Quarter Recon window present for Week 135 cadence, start and end dates are recalculated to 2022-10-01 to 2022-12-31. 136 But these are not start and end dates of week. Hence, to correct this, new dates 137 are passed again to get the correct dates. 138 139 Args: 140 cadence: cadence to process. 141 derived_cadence: cadence reconciliation to process. 142 start_date: start date of the period to process. 143 end_date: end date of the period to process. 144 query_type: use case query type. 145 current_date: current date to be used in the end date, in case the end date 146 is greater than current date so the end date should be the current date. 
147 """ 148 new_start_date = self._get_cadence_calculated_date( 149 derived_cadence=derived_cadence, base_date=start_date, is_start=True 150 ) 151 new_end_date = self._get_cadence_calculated_date( 152 derived_cadence=derived_cadence, base_date=end_date, is_start=False 153 ) 154 155 if cadence.upper() == "WEEK": 156 new_start_date = ( 157 pendulum.datetime( 158 int(new_start_date.strftime("%Y")), 159 int(new_start_date.strftime("%m")), 160 int(new_start_date.strftime("%d")), 161 ) 162 .start_of("week") 163 .replace(tzinfo=None) 164 ) 165 new_end_date = ( 166 pendulum.datetime( 167 int(new_end_date.strftime("%Y")), 168 int(new_end_date.strftime("%m")), 169 int(new_end_date.strftime("%d")), 170 ) 171 .end_of("week") 172 .replace(hour=0, minute=0, second=0, microsecond=0) 173 .replace(tzinfo=None) 174 ) 175 176 new_end_date = new_end_date + timedelta(days=1) 177 178 if new_end_date >= current_date: 179 new_end_date = current_date 180 181 if query_type == "NAM": 182 new_end_date = new_end_date + timedelta(days=1) 183 184 return new_start_date, new_end_date 185 186 @classmethod 187 def _get_cadence_calculated_date( 188 cls, derived_cadence: str, base_date: datetime, is_start: bool 189 ) -> Union[datetime, DateTime]: # type: ignore 190 cadence_base_date = cls._get_cadence_base_date(derived_cadence, base_date) 191 cadence_date_calculated: Union[DateTime, datetime] 192 193 if derived_cadence.upper() == "WEEK": 194 cadence_date_calculated = cls._get_calculated_week_date( 195 cast(DateTime, cadence_base_date), is_start 196 ) 197 elif derived_cadence.upper() == "MONTH": 198 cadence_date_calculated = cls._get_calculated_month_date( 199 cast(datetime, cadence_base_date), is_start 200 ) 201 elif derived_cadence.upper() in ["QUARTER", "YEAR"]: 202 cadence_date_calculated = cls._get_calculated_quarter_or_year_date( 203 cast(DateTime, cadence_base_date), is_start, derived_cadence 204 ) 205 else: 206 cadence_date_calculated = cadence_base_date # type: ignore 207 208 return 
cadence_date_calculated # type: ignore 209 210 @classmethod 211 def _get_cadence_base_date( 212 cls, derived_cadence: str, base_date: datetime 213 ) -> Union[datetime, DateTime, str]: # type: ignore 214 """Get start date for the selected cadence. 215 216 Args: 217 derived_cadence: cadence reconciliation to process. 218 base_date: base date used to compute the start date of the cadence. 219 """ 220 if derived_cadence.upper() in ["DAY", "MONTH"]: 221 cadence_date_calculated = base_date 222 elif derived_cadence.upper() in ["WEEK", "QUARTER", "YEAR"]: 223 cadence_date_calculated = pendulum.datetime( 224 int(base_date.strftime("%Y")), 225 int(base_date.strftime("%m")), 226 int(base_date.strftime("%d")), 227 ) 228 else: 229 cadence_date_calculated = "0" # type: ignore 230 231 return cadence_date_calculated 232 233 @classmethod 234 def _get_calculated_week_date( 235 cls, cadence_date_calculated: DateTime, is_start: bool 236 ) -> DateTime: 237 """Get WEEK start/end date. 238 239 Args: 240 cadence_date_calculated: base date to compute the week date. 241 is_start: flag indicating if we should get the start or end for the cadence. 242 """ 243 if is_start: 244 cadence_date_calculated = cadence_date_calculated.start_of("week").replace( 245 tzinfo=None 246 ) 247 else: 248 cadence_date_calculated = ( 249 cadence_date_calculated.end_of("week") 250 .replace(hour=0, minute=0, second=0, microsecond=0) 251 .replace(tzinfo=None) 252 ) 253 254 return cadence_date_calculated 255 256 @classmethod 257 def _get_calculated_month_date( 258 cls, cadence_date_calculated: datetime, is_start: bool 259 ) -> datetime: 260 """Get MONTH start/end date. 261 262 Args: 263 cadence_date_calculated: base date to compute the month date. 264 is_start: flag indicating if we should get the start or end for the cadence. 
265 """ 266 if is_start: 267 cadence_date_calculated = cadence_date_calculated - timedelta( 268 days=(int(cadence_date_calculated.strftime("%d")) - 1) 269 ) 270 else: 271 cadence_date_calculated = datetime( 272 int(cadence_date_calculated.strftime("%Y")), 273 int(cadence_date_calculated.strftime("%m")), 274 calendar.monthrange( 275 int(cadence_date_calculated.strftime("%Y")), 276 int(cadence_date_calculated.strftime("%m")), 277 )[1], 278 ) 279 280 return cadence_date_calculated 281 282 @classmethod 283 def _get_calculated_quarter_or_year_date( 284 cls, cadence_date_calculated: DateTime, is_start: bool, cadence: str 285 ) -> DateTime: 286 """Get QUARTER/YEAR start/end date. 287 288 Args: 289 cadence_date_calculated: base date to compute the quarter/year date. 290 is_start: flag indicating if we should get the start or end for the cadence. 291 cadence: selected cadence (possible values: QUARTER or YEAR). 292 """ 293 if is_start: 294 cadence_date_calculated = cadence_date_calculated.first_of( 295 cadence.lower() 296 ).replace(tzinfo=None) 297 else: 298 cadence_date_calculated = cadence_date_calculated.last_of( 299 cadence.lower() 300 ).replace(tzinfo=None) 301 302 return cadence_date_calculated 303 304 305class GABViewManager(object): 306 """Class to control the GAB View creation.""" 307 308 _LOGGER = LoggingHandler(__name__).get_logger() 309 310 def __init__( 311 self, 312 query_id: str, 313 lookup_query_builder: DataFrame, 314 target_database: str, 315 target_table: str, 316 ): 317 """Construct GABViewManager instances. 318 319 Args: 320 query_id: gab configuration table use case identifier. 321 lookup_query_builder: gab configuration data. 322 target_database: target database to write. 323 target_table: target table to write. 
324 """ 325 self.query_id = query_id 326 self.lookup_query_builder = lookup_query_builder 327 self.target_database = target_database 328 self.target_table = target_table 329 330 def generate_use_case_views(self) -> None: 331 """Generate all the use case views. 332 333 Generates the DDLs for each of the views. This DDL is dynamically built based on 334 the mappings provided in the config table. 335 """ 336 reconciliation_window = GABUtils.get_json_column_as_dict( 337 self.lookup_query_builder, self.query_id, "recon_window" 338 ) 339 340 cadence_snapshot_status = self._get_cadence_snapshot_status( 341 reconciliation_window 342 ) 343 344 ( 345 cadences_with_snapshot, 346 cadences_without_snapshot, 347 ) = self._split_cadence_by_snapshot(cadence_snapshot_status) 348 349 mappings = GABUtils.get_json_column_as_dict( 350 self.lookup_query_builder, self.query_id, "mappings" 351 ) 352 353 for view_name in mappings.keys(): 354 self._generate_use_case_view( 355 mappings, 356 view_name, 357 cadence_snapshot_status, 358 cadences_with_snapshot, 359 cadences_without_snapshot, 360 self.target_database, 361 self.target_table, 362 self.query_id, 363 ) 364 365 @classmethod 366 def _generate_use_case_view( 367 cls, 368 mappings: dict, 369 view_name: str, 370 cadence_snapshot_status: dict, 371 cadences_with_snapshot: list[str], 372 cadences_without_snapshot: list[str], 373 target_database: str, 374 target_table: str, 375 query_id: str, 376 ) -> None: 377 """Generate the selected use case views. 378 379 Args: 380 mappings: use case mappings configuration. 381 view_name: name of the view to be generated. 382 cadence_snapshot_status: cadences to execute with the information if it has 383 snapshot. 384 cadences_with_snapshot: cadences to execute with snapshot. 385 cadences_without_snapshot: cadences to execute without snapshot. 386 target_database: target database to write. 387 target_table: target table to write. 388 query_id: gab configuration table use case identifier. 
389 """ 390 view_configuration = mappings[view_name] 391 392 view_dimensions = view_configuration["dimensions"] 393 view_metrics = view_configuration["metric"] 394 custom_filter = view_configuration["filter"] 395 396 view_filter = " " 397 if custom_filter: 398 view_filter = " AND " + custom_filter 399 400 ( 401 dimensions, 402 dimensions_and_metrics, 403 dimensions_and_metrics_with_alias, 404 ) = cls._get_dimensions_and_metrics_from_use_case_view( 405 view_dimensions, view_metrics 406 ) 407 408 ( 409 final_cols, 410 final_calculated_script, 411 final_calculated_script_snapshot, 412 ) = cls._get_calculated_and_derived_metrics_from_use_case_view( 413 view_metrics, view_dimensions, cadence_snapshot_status 414 ) 415 416 GABViewGenerator( 417 cadence_snapshot_status=cadence_snapshot_status, 418 target_database=target_database, 419 view_name=view_name, 420 final_cols=final_cols, 421 target_table=target_table, 422 dimensions_and_metrics_with_alias=dimensions_and_metrics_with_alias, 423 dimensions=dimensions, 424 dimensions_and_metrics=dimensions_and_metrics, 425 final_calculated_script=final_calculated_script, 426 query_id=query_id, 427 view_filter=view_filter, 428 final_calculated_script_snapshot=final_calculated_script_snapshot, 429 without_snapshot_cadences=cadences_without_snapshot, 430 with_snapshot_cadences=cadences_with_snapshot, 431 ).generate_sql() 432 433 @classmethod 434 def _get_dimensions_and_metrics_from_use_case_view( 435 cls, view_dimensions: dict, view_metrics: dict 436 ) -> Tuple[str, str, str]: 437 """Get dimensions and metrics from use case. 438 439 Args: 440 view_dimensions: use case configured dimensions. 441 view_metrics: use case configured metrics. 
442 """ 443 ( 444 extracted_dimensions_with_alias, 445 extracted_dimensions_without_alias, 446 ) = GABUtils.extract_columns_from_mapping( 447 columns=view_dimensions, 448 is_dimension=True, 449 extract_column_without_alias=True, 450 table_alias="a", 451 is_extracted_value_as_name=False, 452 ) 453 454 dimensions_without_default_columns = [ 455 extracted_dimension 456 for extracted_dimension in extracted_dimensions_without_alias 457 if extracted_dimension not in GABDefaults.DIMENSIONS_DEFAULT_COLUMNS.value 458 ] 459 460 dimensions = ",".join(dimensions_without_default_columns) 461 dimensions_with_alias = ",".join(extracted_dimensions_with_alias) 462 463 ( 464 extracted_metrics_with_alias, 465 extracted_metrics_without_alias, 466 ) = GABUtils.extract_columns_from_mapping( 467 columns=view_metrics, 468 is_dimension=False, 469 extract_column_without_alias=True, 470 table_alias="a", 471 is_extracted_value_as_name=False, 472 ) 473 metrics = ",".join(extracted_metrics_without_alias) 474 metrics_with_alias = ",".join(extracted_metrics_with_alias) 475 476 dimensions_and_metrics_with_alias = ( 477 dimensions_with_alias + "," + metrics_with_alias 478 ) 479 dimensions_and_metrics = dimensions + "," + metrics 480 481 return dimensions, dimensions_and_metrics, dimensions_and_metrics_with_alias 482 483 @classmethod 484 def _get_calculated_and_derived_metrics_from_use_case_view( 485 cls, view_metrics: dict, view_dimensions: dict, cadence_snapshot_status: dict 486 ) -> Tuple[str, str, str]: 487 """Get calculated and derived metrics from use case. 488 489 Args: 490 view_dimensions: use case configured dimensions. 491 view_metrics: use case configured metrics. 492 cadence_snapshot_status: cadences to execute with the information if it has 493 snapshot. 
494 """ 495 calculated_script = [] 496 calculated_script_snapshot = [] 497 derived_script = [] 498 for metric_key, metric_value in view_metrics.items(): 499 ( 500 calculated_metrics_script, 501 calculated_metrics_script_snapshot, 502 derived_metrics_script, 503 ) = cls._get_calculated_metrics( 504 metric_key, metric_value, view_dimensions, cadence_snapshot_status 505 ) 506 calculated_script += [*calculated_metrics_script] 507 calculated_script_snapshot += [*calculated_metrics_script_snapshot] 508 derived_script += [*derived_metrics_script] 509 510 joined_calculated_script = cls._join_list_to_string_when_present( 511 calculated_script 512 ) 513 joined_calculated_script_snapshot = cls._join_list_to_string_when_present( 514 calculated_script_snapshot 515 ) 516 517 joined_derived = cls._join_list_to_string_when_present( 518 to_join=derived_script, starting_value="*,", default_value="*" 519 ) 520 521 return ( 522 joined_derived, 523 joined_calculated_script, 524 joined_calculated_script_snapshot, 525 ) 526 527 @classmethod 528 def _join_list_to_string_when_present( 529 cls, 530 to_join: list[str], 531 separator: str = ",", 532 starting_value: str = ",", 533 default_value: str = "", 534 ) -> str: 535 """Join list to string when has values, otherwise return the default value. 536 537 Args: 538 to_join: values to join. 539 separator: separator to be used in the join. 540 starting_value: value to be started before the join. 541 default_value: value to be returned if the list is empty. 
542 """ 543 return starting_value + separator.join(to_join) if to_join else default_value 544 545 @classmethod 546 def _get_cadence_snapshot_status(cls, result: dict) -> dict: 547 cadence_snapshot_status = {} 548 for k, v in result.items(): 549 cadence_snapshot_status[k] = next( 550 ( 551 next( 552 ( 553 snap_list["snapshot"] 554 for snap_list in loop_outer_cad.values() 555 if snap_list["snapshot"] == "Y" 556 ), 557 "N", 558 ) 559 for loop_outer_cad in v.values() 560 if v 561 ), 562 "N", 563 ) 564 565 return cadence_snapshot_status 566 567 @classmethod 568 def _split_cadence_by_snapshot( 569 cls, cadence_snapshot_status: dict 570 ) -> tuple[list[str], list[str]]: 571 """Split cadences by the snapshot value. 572 573 Args: 574 cadence_snapshot_status: cadences to be split by snapshot status. 575 """ 576 with_snapshot_cadences = [] 577 without_snapshot_cadences = [] 578 579 for key_snap_status, value_snap_status in cadence_snapshot_status.items(): 580 if value_snap_status == "Y": 581 with_snapshot_cadences.append(key_snap_status) 582 else: 583 without_snapshot_cadences.append(key_snap_status) 584 585 return with_snapshot_cadences, without_snapshot_cadences 586 587 @classmethod 588 def _get_calculated_metrics( 589 cls, 590 metric_key: str, 591 metric_value: dict, 592 view_dimensions: dict, 593 cadence_snapshot_status: dict, 594 ) -> tuple[list[str], list[str], list[str]]: 595 """Get calculated metrics from use case. 596 597 Args: 598 metric_key: use case metric name. 599 metric_value: use case metric value. 600 view_dimensions: use case configured dimensions. 601 cadence_snapshot_status: cadences to execute with the information if it has 602 snapshot. 
603 """ 604 dim_partition = ",".join([str(i) for i in view_dimensions.keys()][2:]) 605 dim_partition = "cadence," + dim_partition 606 calculated_metrics = metric_value["calculated_metric"] 607 derived_metrics = metric_value["derived_metric"] 608 calculated_metrics_script: list[str] = [] 609 calculated_metrics_script_snapshot: list[str] = [] 610 derived_metrics_script: list[str] = [] 611 612 if calculated_metrics: 613 ( 614 calculated_metrics_script, 615 calculated_metrics_script_snapshot, 616 ) = cls._get_calculated_metric( 617 metric_key, calculated_metrics, dim_partition, cadence_snapshot_status 618 ) 619 620 if derived_metrics: 621 derived_metrics_script = cls._get_derived_metrics(derived_metrics) 622 623 return ( 624 calculated_metrics_script, 625 calculated_metrics_script_snapshot, 626 derived_metrics_script, 627 ) 628 629 @classmethod 630 def _get_derived_metrics(cls, derived_metric: dict) -> list[str]: 631 """Get derived metrics from use case. 632 633 Args: 634 derived_metric: use case derived metrics. 635 """ 636 derived_metric_script = [] 637 638 for i in range(0, len(derived_metric)): 639 derived_formula = str(derived_metric[i]["formula"]) 640 derived_label = derived_metric[i]["label"] 641 derived_metric_script.append(derived_formula + " AS " + derived_label) 642 643 return derived_metric_script 644 645 @classmethod 646 def _get_calculated_metric( 647 cls, 648 metric_key: str, 649 calculated_metric: dict, 650 dimension_partition: str, 651 cadence_snapshot_status: dict, 652 ) -> tuple[list[str], list[str]]: 653 """Get calculated metrics from use case. 654 655 Args: 656 metric_key: use case metric name. 657 calculated_metric: use case calculated metrics. 658 dimension_partition: dimension partition. 659 cadence_snapshot_status: cadences to execute with the information if it has 660 snapshot. 
661 """ 662 last_cadence_script: list[str] = [] 663 last_year_cadence_script: list[str] = [] 664 window_script: list[str] = [] 665 last_cadence_script_snapshot: list[str] = [] 666 last_year_cadence_script_snapshot: list[str] = [] 667 window_script_snapshot: list[str] = [] 668 669 if "last_cadence" in calculated_metric: 670 ( 671 last_cadence_script, 672 last_cadence_script_snapshot, 673 ) = cls._get_cadence_calculated_metric( 674 metric_key, 675 dimension_partition, 676 calculated_metric, 677 cadence_snapshot_status, 678 "last_cadence", 679 ) 680 if "last_year_cadence" in calculated_metric: 681 ( 682 last_year_cadence_script, 683 last_year_cadence_script_snapshot, 684 ) = cls._get_cadence_calculated_metric( 685 metric_key, 686 dimension_partition, 687 calculated_metric, 688 cadence_snapshot_status, 689 "last_year_cadence", 690 ) 691 if "window_function" in calculated_metric: 692 window_script, window_script_snapshot = cls._get_window_calculated_metric( 693 metric_key, 694 dimension_partition, 695 calculated_metric, 696 cadence_snapshot_status, 697 ) 698 699 calculated_script = [ 700 *last_cadence_script, 701 *last_year_cadence_script, 702 *window_script, 703 ] 704 calculated_script_snapshot = [ 705 *last_cadence_script_snapshot, 706 *last_year_cadence_script_snapshot, 707 *window_script_snapshot, 708 ] 709 710 return calculated_script, calculated_script_snapshot 711 712 @classmethod 713 def _get_window_calculated_metric( 714 cls, 715 metric_key: str, 716 dimension_partition: str, 717 calculated_metric: dict, 718 cadence_snapshot_status: dict, 719 ) -> tuple[list, list]: 720 """Get window calculated metrics from use case. 721 722 Args: 723 metric_key: use case metric name. 724 dimension_partition: dimension partition. 725 calculated_metric: use case calculated metrics. 726 cadence_snapshot_status: cadences to execute with the information if it has 727 snapshot. 
728 """ 729 calculated_script = [] 730 calculated_script_snapshot = [] 731 732 for i in range(0, len(calculated_metric["window_function"])): 733 window_function = calculated_metric["window_function"][i]["agg_func"] 734 window_function_start = calculated_metric["window_function"][i]["window"][0] 735 window_function_end = calculated_metric["window_function"][i]["window"][1] 736 window_label = calculated_metric["window_function"][i]["label"] 737 738 calculated_script.append( 739 f""" 740 NVL( 741 {window_function}({metric_key}) OVER 742 ( 743 PARTITION BY {dimension_partition} 744 order by from_date ROWS BETWEEN 745 {str(window_function_start)} PRECEDING 746 AND {str(window_function_end)} PRECEDING 747 ), 748 0 749 ) AS 750 {window_label} 751 """ 752 ) 753 754 if "Y" in cadence_snapshot_status.values(): 755 calculated_script_snapshot.append( 756 f""" 757 NVL( 758 {window_function}({metric_key}) OVER 759 ( 760 PARTITION BY {dimension_partition} ,rn 761 order by from_date ROWS BETWEEN 762 {str(window_function_start)} PRECEDING 763 AND {str(window_function_end)} PRECEDING 764 ), 765 0 766 ) AS 767 {window_label} 768 """ 769 ) 770 771 return calculated_script, calculated_script_snapshot 772 773 @classmethod 774 def _get_cadence_calculated_metric( 775 cls, 776 metric_key: str, 777 dimension_partition: str, 778 calculated_metric: dict, 779 cadence_snapshot_status: dict, 780 cadence: str, 781 ) -> tuple[list, list]: 782 """Get cadence calculated metrics from use case. 783 784 Args: 785 metric_key: use case metric name. 786 calculated_metric: use case calculated metrics. 787 dimension_partition: dimension partition. 788 cadence_snapshot_status: cadences to execute with the information if it has 789 snapshot. 790 cadence: cadence to process. 
791 """ 792 calculated_script = [] 793 calculated_script_snapshot = [] 794 795 for i in range(0, len(calculated_metric[cadence])): 796 cadence_lag = cls._get_cadence_item_lag(calculated_metric, cadence, i) 797 cadence_label = calculated_metric[cadence][i]["label"] 798 799 calculated_script.append( 800 cls._get_cadence_lag_statement( 801 metric_key, 802 cadence_lag, 803 dimension_partition, 804 cadence_label, 805 snapshot=False, 806 cadence=cadence, 807 ) 808 ) 809 810 if "Y" in cadence_snapshot_status.values(): 811 calculated_script_snapshot.append( 812 cls._get_cadence_lag_statement( 813 metric_key, 814 cadence_lag, 815 dimension_partition, 816 cadence_label, 817 snapshot=True, 818 cadence=cadence, 819 ) 820 ) 821 822 return calculated_script, calculated_script_snapshot 823 824 @classmethod 825 def _get_cadence_item_lag( 826 cls, calculated_metric: dict, cadence: str, item: int 827 ) -> str: 828 """Get calculated metric item lag. 829 830 Args: 831 calculated_metric: use case calculated metrics. 832 cadence: cadence to process. 833 item: metric item. 834 """ 835 return str(calculated_metric[cadence][item]["window"]) 836 837 @classmethod 838 def _get_cadence_lag_statement( 839 cls, 840 metric_key: str, 841 cadence_lag: str, 842 dimension_partition: str, 843 cadence_label: str, 844 snapshot: bool, 845 cadence: str, 846 ) -> str: 847 """Get cadence lag statement. 848 849 Args: 850 metric_key: use case metric name. 851 cadence_lag: cadence window lag. 852 dimension_partition: dimension partition. 853 cadence_label: cadence name. 854 snapshot: indicate if the snapshot is enabled. 855 cadence: cadence to process. 
856 """ 857 cadence_lag_statement = "" 858 if cadence == "last_cadence": 859 cadence_lag_statement = ( 860 "NVL(LAG(" 861 + metric_key 862 + "," 863 + cadence_lag 864 + ") OVER(PARTITION BY " 865 + dimension_partition 866 + (",rn" if snapshot else "") 867 + " order by from_date),0) AS " 868 + cadence_label 869 ) 870 elif cadence == "last_year_cadence": 871 cadence_lag_statement = ( 872 "NVL(LAG(" 873 + metric_key 874 + "," 875 + cadence_lag 876 + ") OVER(PARTITION BY " 877 + dimension_partition 878 + (",rn" if snapshot else "") 879 + """, 880 case 881 when cadence in ('DAY','MONTH','QUARTER') 882 then struct(month(from_date), day(from_date)) 883 when cadence in('WEEK') 884 then struct(weekofyear(from_date+1),1) 885 end order by from_date),0) AS """ 886 + cadence_label 887 ) 888 else: 889 cls._LOGGER.error(f"Cadence {cadence} not implemented yet") 890 891 return cadence_lag_statement
class GABCadenceManager(object):
    """Class to control the GAB Cadence Window."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    def extended_window_calculator(
        self,
        cadence: str,
        reconciliation_cadence: str,
        current_date: datetime,
        start_date_str: str,
        end_date_str: str,
        query_type: str,
        rerun_flag: str,
        snapshot_flag: str,
    ) -> tuple[datetime, datetime, datetime, datetime]:
        """Compute the extended window of a cadence.

        Expands user-provided custom dates, which may not coincide with the
        exact start and end dates of a cadence, to proper cadence boundaries.

        Args:
            cadence: cadence to process.
            reconciliation_cadence: reconciliation to process.
            current_date: current date.
            start_date_str: start date of the period to process.
            end_date_str: end date of the period to process.
            query_type: use case query type.
            rerun_flag: flag indicating if it's a rerun or a normal run.
            snapshot_flag: flag indicating if for this cadence the snapshot is
                enabled.

        Returns:
            Bucket start/end dates followed by filter start/end dates.
        """
        ordered_cadences = GABCadence.get_ordered_cadences()

        derived_cadence = self._get_reconciliation_cadence(
            ordered_cadences, rerun_flag, cadence, reconciliation_cadence, snapshot_flag
        )
        self._LOGGER.info(f"cadence passed to extended window: {derived_cadence}")

        date_format = GABDefaults.DATE_FORMAT.value
        window_start = datetime.strptime(start_date_str, date_format)
        window_end = datetime.strptime(end_date_str, date_format)

        bucket_start_date, bucket_end_date = self.get_cadence_start_end_dates(
            cadence, derived_cadence, window_start, window_end, query_type, current_date
        )
        self._LOGGER.info(f"bucket dates: {bucket_start_date} - {bucket_end_date}")

        # The filter window follows whichever of the two cadences is bigger.
        if ordered_cadences[cadence] < ordered_cadences[reconciliation_cadence]:
            filter_cadence = reconciliation_cadence
        else:
            filter_cadence = cadence

        filter_start_date, filter_end_date = self.get_cadence_start_end_dates(
            cadence,
            filter_cadence,
            window_start,
            window_end,
            query_type,
            current_date,
        )
        self._LOGGER.info(f"filter dates: {filter_start_date} - {filter_end_date}")

        return bucket_start_date, bucket_end_date, filter_start_date, filter_end_date

    @classmethod
    def _get_reconciliation_cadence(
        cls,
        cadence_order: dict,
        rerun_flag: str,
        cadence: str,
        reconciliation_cadence: str,
        snapshot_flag: str,
    ) -> str:
        """Choose the cadence driving the window on rerun or snapshot.

        Args:
            cadence_order: ordered cadences.
            rerun_flag: flag indicating if it's a rerun or a normal run.
            cadence: cadence to process.
            reconciliation_cadence: reconciliation to process.
            snapshot_flag: flag indicating if for this cadence the snapshot is
                enabled.

        Returns:
            The cadence that should be used to compute the extended window.
        """
        cadence_is_bigger = (
            cadence_order[cadence] > cadence_order[reconciliation_cadence]
        )

        if rerun_flag == "Y":
            # On a rerun the bigger of the two cadences always wins.
            return cadence if cadence_is_bigger else reconciliation_cadence

        # Normal run: the processed cadence only wins when it is bigger and
        # no snapshot is requested for it.
        if cadence_is_bigger and snapshot_flag == "N":
            return cadence

        return reconciliation_cadence

    def get_cadence_start_end_dates(
        self,
        cadence: str,
        derived_cadence: str,
        start_date: datetime,
        end_date: datetime,
        query_type: str,
        current_date: datetime,
    ) -> tuple[datetime, datetime]:
        """Extend the start and end dates to the cadence boundaries.

        WEEK cadences are normalized a second time: when a bigger recon window
        (e.g. QUARTER) recomputed the dates (e.g. 2022-10-01 to 2022-12-31),
        the result may no longer fall on week boundaries, so the week
        start/end is recalculated from the new dates.

        Args:
            cadence: cadence to process.
            derived_cadence: cadence reconciliation to process.
            start_date: start date of the period to process.
            end_date: end date of the period to process.
            query_type: use case query type.
            current_date: cap for the end date; when the computed end date is
                greater than the current date, the current date is used.

        Returns:
            The recalculated start and end dates.
        """
        new_start_date = self._get_cadence_calculated_date(
            derived_cadence=derived_cadence, base_date=start_date, is_start=True
        )
        new_end_date = self._get_cadence_calculated_date(
            derived_cadence=derived_cadence, base_date=end_date, is_start=False
        )

        if cadence.upper() == "WEEK":
            # Re-align both bounds on true week boundaries.
            new_start_date = self._get_calculated_week_date(
                pendulum.datetime(
                    new_start_date.year, new_start_date.month, new_start_date.day
                ),
                True,
            )
            new_end_date = self._get_calculated_week_date(
                pendulum.datetime(
                    new_end_date.year, new_end_date.month, new_end_date.day
                ),
                False,
            )

        new_end_date += timedelta(days=1)

        if new_end_date >= current_date:
            new_end_date = current_date

        if query_type == "NAM":
            new_end_date += timedelta(days=1)

        return new_start_date, new_end_date

    @classmethod
    def _get_cadence_calculated_date(
        cls, derived_cadence: str, base_date: datetime, is_start: bool
    ) -> Union[datetime, DateTime]:  # type: ignore
        """Align the base date to the start or end of the derived cadence.

        Args:
            derived_cadence: cadence to align the date to.
            base_date: date to be aligned.
            is_start: True to compute the cadence start, False for the end.
        """
        base = cls._get_cadence_base_date(derived_cadence, base_date)
        cadence_key = derived_cadence.upper()

        if cadence_key == "WEEK":
            return cls._get_calculated_week_date(cast(DateTime, base), is_start)
        if cadence_key == "MONTH":
            return cls._get_calculated_month_date(cast(datetime, base), is_start)
        if cadence_key in ["QUARTER", "YEAR"]:
            return cls._get_calculated_quarter_or_year_date(
                cast(DateTime, base), is_start, derived_cadence
            )

        # DAY (or unrecognized) cadences pass the base value through.
        return base  # type: ignore

    @classmethod
    def _get_cadence_base_date(
        cls, derived_cadence: str, base_date: datetime
    ) -> Union[datetime, DateTime, str]:  # type: ignore
        """Get the base value used to compute the cadence dates.

        Args:
            derived_cadence: cadence reconciliation to process.
            base_date: base date used to compute the start date of the cadence.
        """
        cadence_key = derived_cadence.upper()

        if cadence_key in ["DAY", "MONTH"]:
            return base_date
        if cadence_key in ["WEEK", "QUARTER", "YEAR"]:
            # pendulum DateTime exposes the start_of/end_of/first_of/last_of
            # helpers used for these cadences.
            return pendulum.datetime(base_date.year, base_date.month, base_date.day)
        return "0"

    @classmethod
    def _get_calculated_week_date(
        cls, cadence_date_calculated: DateTime, is_start: bool
    ) -> DateTime:
        """Get the WEEK start/end date.

        Args:
            cadence_date_calculated: base date to compute the week date.
            is_start: True for the week start, False for the week end.
        """
        if is_start:
            week_date = cadence_date_calculated.start_of("week")
        else:
            # end_of("week") lands on 23:59:59.999999; zero the time part.
            week_date = cadence_date_calculated.end_of("week").replace(
                hour=0, minute=0, second=0, microsecond=0
            )

        return week_date.replace(tzinfo=None)

    @classmethod
    def _get_calculated_month_date(
        cls, cadence_date_calculated: datetime, is_start: bool
    ) -> datetime:
        """Get the MONTH start/end date.

        Args:
            cadence_date_calculated: base date to compute the month date.
            is_start: True for the month start, False for the month end.
        """
        if is_start:
            # Step back (day-of-month - 1) days to land on the 1st.
            day_offset = int(cadence_date_calculated.strftime("%d")) - 1
            return cadence_date_calculated - timedelta(days=day_offset)

        year = int(cadence_date_calculated.strftime("%Y"))
        month = int(cadence_date_calculated.strftime("%m"))
        last_day = calendar.monthrange(year, month)[1]
        return datetime(year, month, last_day)

    @classmethod
    def _get_calculated_quarter_or_year_date(
        cls, cadence_date_calculated: DateTime, is_start: bool, cadence: str
    ) -> DateTime:
        """Get the QUARTER/YEAR start/end date.

        Args:
            cadence_date_calculated: base date to compute the quarter/year date.
            is_start: True for the period start, False for the period end.
            cadence: selected cadence (possible values: QUARTER or YEAR).
        """
        unit = cadence.lower()
        boundary = (
            cadence_date_calculated.first_of(unit)
            if is_start
            else cadence_date_calculated.last_of(unit)
        )
        return boundary.replace(tzinfo=None)
Class to control the GAB Cadence Window.
23 def extended_window_calculator( 24 self, 25 cadence: str, 26 reconciliation_cadence: str, 27 current_date: datetime, 28 start_date_str: str, 29 end_date_str: str, 30 query_type: str, 31 rerun_flag: str, 32 snapshot_flag: str, 33 ) -> tuple[datetime, datetime, datetime, datetime]: 34 """extended_window_calculator function. 35 36 Calculates the extended window of any cadence despite the user providing 37 custom dates which are not the exact start and end dates of a cadence. 38 39 Args: 40 cadence: cadence to process 41 reconciliation_cadence: reconciliation to process. 42 current_date: current date. 43 start_date_str: start date of the period to process. 44 end_date_str: end date of the period to process. 45 query_type: use case query type. 46 rerun_flag: flag indicating if it's a rerun or a normal run. 47 snapshot_flag: flag indicating if for this cadence the snapshot is enabled. 48 """ 49 cad_order = GABCadence.get_ordered_cadences() 50 51 derived_cadence = self._get_reconciliation_cadence( 52 cad_order, rerun_flag, cadence, reconciliation_cadence, snapshot_flag 53 ) 54 55 self._LOGGER.info(f"cadence passed to extended window: {derived_cadence}") 56 57 start_date = datetime.strptime(start_date_str, GABDefaults.DATE_FORMAT.value) 58 end_date = datetime.strptime(end_date_str, GABDefaults.DATE_FORMAT.value) 59 60 bucket_start_date, bucket_end_date = self.get_cadence_start_end_dates( 61 cadence, derived_cadence, start_date, end_date, query_type, current_date 62 ) 63 64 self._LOGGER.info(f"bucket dates: {bucket_start_date} - {bucket_end_date}") 65 66 filter_start_date, filter_end_date = self.get_cadence_start_end_dates( 67 cadence, 68 ( 69 reconciliation_cadence 70 if cad_order[cadence] < cad_order[reconciliation_cadence] 71 else cadence 72 ), 73 start_date, 74 end_date, 75 query_type, 76 current_date, 77 ) 78 79 self._LOGGER.info(f"filter dates: {filter_start_date} - {filter_end_date}") 80 81 return bucket_start_date, bucket_end_date, filter_start_date, 
filter_end_date
extended_window_calculator function.
Calculates the extended window of any cadence despite the user providing custom dates which are not the exact start and end dates of a cadence.
Arguments:
- cadence: cadence to process.
- reconciliation_cadence: reconciliation to process.
- current_date: current date.
- start_date_str: start date of the period to process.
- end_date_str: end date of the period to process.
- query_type: use case query type.
- rerun_flag: flag indicating if it's a rerun or a normal run.
- snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
122 def get_cadence_start_end_dates( 123 self, 124 cadence: str, 125 derived_cadence: str, 126 start_date: datetime, 127 end_date: datetime, 128 query_type: str, 129 current_date: datetime, 130 ) -> tuple[datetime, datetime]: 131 """Generate the new set of extended start and end dates based on the cadence. 132 133 Running week cadence again to extend to correct week start and end date in case 134 of recon window for Week cadence is present. 135 For end_date 2012-12-31,in case of Quarter Recon window present for Week 136 cadence, start and end dates are recalculated to 2022-10-01 to 2022-12-31. 137 But these are not start and end dates of week. Hence, to correct this, new dates 138 are passed again to get the correct dates. 139 140 Args: 141 cadence: cadence to process. 142 derived_cadence: cadence reconciliation to process. 143 start_date: start date of the period to process. 144 end_date: end date of the period to process. 145 query_type: use case query type. 146 current_date: current date to be used in the end date, in case the end date 147 is greater than current date so the end date should be the current date. 
148 """ 149 new_start_date = self._get_cadence_calculated_date( 150 derived_cadence=derived_cadence, base_date=start_date, is_start=True 151 ) 152 new_end_date = self._get_cadence_calculated_date( 153 derived_cadence=derived_cadence, base_date=end_date, is_start=False 154 ) 155 156 if cadence.upper() == "WEEK": 157 new_start_date = ( 158 pendulum.datetime( 159 int(new_start_date.strftime("%Y")), 160 int(new_start_date.strftime("%m")), 161 int(new_start_date.strftime("%d")), 162 ) 163 .start_of("week") 164 .replace(tzinfo=None) 165 ) 166 new_end_date = ( 167 pendulum.datetime( 168 int(new_end_date.strftime("%Y")), 169 int(new_end_date.strftime("%m")), 170 int(new_end_date.strftime("%d")), 171 ) 172 .end_of("week") 173 .replace(hour=0, minute=0, second=0, microsecond=0) 174 .replace(tzinfo=None) 175 ) 176 177 new_end_date = new_end_date + timedelta(days=1) 178 179 if new_end_date >= current_date: 180 new_end_date = current_date 181 182 if query_type == "NAM": 183 new_end_date = new_end_date + timedelta(days=1) 184 185 return new_start_date, new_end_date
Generate the new set of extended start and end dates based on the cadence.
The week cadence is run again to extend to the correct week start and end dates when a recon window for the Week cadence is present. For end_date 2022-12-31, when a Quarter recon window is present for the Week cadence, the start and end dates are recalculated to 2022-10-01 and 2022-12-31. But these are not the start and end dates of a week. Hence, to correct this, the new dates are passed again to get the correct dates.
Arguments:
- cadence: cadence to process.
- derived_cadence: cadence reconciliation to process.
- start_date: start date of the period to process.
- end_date: end date of the period to process.
- query_type: use case query type.
- current_date: current date to be used in the end date, in case the end date is greater than current date so the end date should be the current date.
class GABViewManager(object):
    """Class to control the GAB View creation."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        query_id: str,
        lookup_query_builder: DataFrame,
        target_database: str,
        target_table: str,
    ):
        """Construct GABViewManager instances.

        Args:
            query_id: gab configuration table use case identifier.
            lookup_query_builder: gab configuration data.
            target_database: target database to write.
            target_table: target table to write.
        """
        self.query_id = query_id
        self.lookup_query_builder = lookup_query_builder
        self.target_database = target_database
        self.target_table = target_table

    def generate_use_case_views(self) -> None:
        """Generate all the use case views.

        Generates the DDLs for each of the views. This DDL is dynamically built based on
        the mappings provided in the config table.
        """
        reconciliation_window = GABUtils.get_json_column_as_dict(
            self.lookup_query_builder, self.query_id, "recon_window"
        )

        cadence_snapshot_status = self._get_cadence_snapshot_status(
            reconciliation_window
        )

        (
            cadences_with_snapshot,
            cadences_without_snapshot,
        ) = self._split_cadence_by_snapshot(cadence_snapshot_status)

        mappings = GABUtils.get_json_column_as_dict(
            self.lookup_query_builder, self.query_id, "mappings"
        )

        # One view is generated per entry in the "mappings" configuration.
        for view_name in mappings.keys():
            self._generate_use_case_view(
                mappings,
                view_name,
                cadence_snapshot_status,
                cadences_with_snapshot,
                cadences_without_snapshot,
                self.target_database,
                self.target_table,
                self.query_id,
            )

    @classmethod
    def _generate_use_case_view(
        cls,
        mappings: dict,
        view_name: str,
        cadence_snapshot_status: dict,
        cadences_with_snapshot: list[str],
        cadences_without_snapshot: list[str],
        target_database: str,
        target_table: str,
        query_id: str,
    ) -> None:
        """Generate the selected use case views.

        Args:
            mappings: use case mappings configuration.
            view_name: name of the view to be generated.
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
            cadences_with_snapshot: cadences to execute with snapshot.
            cadences_without_snapshot: cadences to execute without snapshot.
            target_database: target database to write.
            target_table: target table to write.
            query_id: gab configuration table use case identifier.
        """
        view_configuration = mappings[view_name]

        view_dimensions = view_configuration["dimensions"]
        view_metrics = view_configuration["metric"]
        custom_filter = view_configuration["filter"]

        # A configured custom filter is appended with AND; otherwise a single
        # space keeps the generated SQL well-formed.
        view_filter = " "
        if custom_filter:
            view_filter = " AND " + custom_filter

        (
            dimensions,
            dimensions_and_metrics,
            dimensions_and_metrics_with_alias,
        ) = cls._get_dimensions_and_metrics_from_use_case_view(
            view_dimensions, view_metrics
        )

        (
            final_cols,
            final_calculated_script,
            final_calculated_script_snapshot,
        ) = cls._get_calculated_and_derived_metrics_from_use_case_view(
            view_metrics, view_dimensions, cadence_snapshot_status
        )

        GABViewGenerator(
            cadence_snapshot_status=cadence_snapshot_status,
            target_database=target_database,
            view_name=view_name,
            final_cols=final_cols,
            target_table=target_table,
            dimensions_and_metrics_with_alias=dimensions_and_metrics_with_alias,
            dimensions=dimensions,
            dimensions_and_metrics=dimensions_and_metrics,
            final_calculated_script=final_calculated_script,
            query_id=query_id,
            view_filter=view_filter,
            final_calculated_script_snapshot=final_calculated_script_snapshot,
            without_snapshot_cadences=cadences_without_snapshot,
            with_snapshot_cadences=cadences_with_snapshot,
        ).generate_sql()

    @classmethod
    def _get_dimensions_and_metrics_from_use_case_view(
        cls, view_dimensions: dict, view_metrics: dict
    ) -> Tuple[str, str, str]:
        """Get dimensions and metrics from use case.

        Args:
            view_dimensions: use case configured dimensions.
            view_metrics: use case configured metrics.
        """
        (
            extracted_dimensions_with_alias,
            extracted_dimensions_without_alias,
        ) = GABUtils.extract_columns_from_mapping(
            columns=view_dimensions,
            is_dimension=True,
            extract_column_without_alias=True,
            table_alias="a",
            is_extracted_value_as_name=False,
        )

        # Default columns are excluded from the plain dimensions list (they
        # are still present in the aliased list).
        dimensions_without_default_columns = [
            extracted_dimension
            for extracted_dimension in extracted_dimensions_without_alias
            if extracted_dimension not in GABDefaults.DIMENSIONS_DEFAULT_COLUMNS.value
        ]

        dimensions = ",".join(dimensions_without_default_columns)
        dimensions_with_alias = ",".join(extracted_dimensions_with_alias)

        (
            extracted_metrics_with_alias,
            extracted_metrics_without_alias,
        ) = GABUtils.extract_columns_from_mapping(
            columns=view_metrics,
            is_dimension=False,
            extract_column_without_alias=True,
            table_alias="a",
            is_extracted_value_as_name=False,
        )
        metrics = ",".join(extracted_metrics_without_alias)
        metrics_with_alias = ",".join(extracted_metrics_with_alias)

        dimensions_and_metrics_with_alias = (
            dimensions_with_alias + "," + metrics_with_alias
        )
        dimensions_and_metrics = dimensions + "," + metrics

        return dimensions, dimensions_and_metrics, dimensions_and_metrics_with_alias

    @classmethod
    def _get_calculated_and_derived_metrics_from_use_case_view(
        cls, view_metrics: dict, view_dimensions: dict, cadence_snapshot_status: dict
    ) -> Tuple[str, str, str]:
        """Get calculated and derived metrics from use case.

        Args:
            view_dimensions: use case configured dimensions.
            view_metrics: use case configured metrics.
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
        """
        calculated_script = []
        calculated_script_snapshot = []
        derived_script = []
        for metric_key, metric_value in view_metrics.items():
            (
                calculated_metrics_script,
                calculated_metrics_script_snapshot,
                derived_metrics_script,
            ) = cls._get_calculated_metrics(
                metric_key, metric_value, view_dimensions, cadence_snapshot_status
            )
            calculated_script += [*calculated_metrics_script]
            calculated_script_snapshot += [*calculated_metrics_script_snapshot]
            derived_script += [*derived_metrics_script]

        joined_calculated_script = cls._join_list_to_string_when_present(
            calculated_script
        )
        joined_calculated_script_snapshot = cls._join_list_to_string_when_present(
            calculated_script_snapshot
        )

        # Derived metrics are prefixed with "*," so the generated SELECT keeps
        # every original column; a bare "*" is used when there are none.
        joined_derived = cls._join_list_to_string_when_present(
            to_join=derived_script, starting_value="*,", default_value="*"
        )

        return (
            joined_derived,
            joined_calculated_script,
            joined_calculated_script_snapshot,
        )

    @classmethod
    def _join_list_to_string_when_present(
        cls,
        to_join: list[str],
        separator: str = ",",
        starting_value: str = ",",
        default_value: str = "",
    ) -> str:
        """Join list to string when has values, otherwise return the default value.

        Args:
            to_join: values to join.
            separator: separator to be used in the join.
            starting_value: value to be started before the join.
            default_value: value to be returned if the list is empty.
        """
        # Precedence note: this parses as
        # (starting_value + separator.join(to_join)) if to_join else default_value.
        return starting_value + separator.join(to_join) if to_join else default_value

    @classmethod
    def _get_cadence_snapshot_status(cls, result: dict) -> dict:
        """Derive the snapshot flag ("Y"/"N") for each configured cadence.

        Args:
            result: reconciliation window configuration, keyed by cadence.
        """
        cadence_snapshot_status = {}
        for k, v in result.items():
            # Inner next: "Y" when any entry of the recon window has
            # snapshot == "Y", otherwise "N".
            # NOTE(review): the outer next only inspects the FIRST element of
            # v.values() — later entries never flip the status. Confirm this is
            # intended (it is harmless if v has at most one value).
            cadence_snapshot_status[k] = next(
                (
                    next(
                        (
                            snap_list["snapshot"]
                            for snap_list in loop_outer_cad.values()
                            if snap_list["snapshot"] == "Y"
                        ),
                        "N",
                    )
                    for loop_outer_cad in v.values()
                    if v
                ),
                "N",
            )

        return cadence_snapshot_status

    @classmethod
    def _split_cadence_by_snapshot(
        cls, cadence_snapshot_status: dict
    ) -> tuple[list[str], list[str]]:
        """Split cadences by the snapshot value.

        Args:
            cadence_snapshot_status: cadences to be split by snapshot status.
        """
        with_snapshot_cadences = []
        without_snapshot_cadences = []

        for key_snap_status, value_snap_status in cadence_snapshot_status.items():
            if value_snap_status == "Y":
                with_snapshot_cadences.append(key_snap_status)
            else:
                without_snapshot_cadences.append(key_snap_status)

        return with_snapshot_cadences, without_snapshot_cadences

    @classmethod
    def _get_calculated_metrics(
        cls,
        metric_key: str,
        metric_value: dict,
        view_dimensions: dict,
        cadence_snapshot_status: dict,
    ) -> tuple[list[str], list[str], list[str]]:
        """Get calculated metrics from use case.

        Args:
            metric_key: use case metric name.
            metric_value: use case metric value.
            view_dimensions: use case configured dimensions.
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
        """
        # Partition key: "cadence" plus every configured dimension except the
        # first two. NOTE(review): why the first two are dropped isn't visible
        # here — presumably they are default date/key columns; confirm.
        dim_partition = ",".join([str(i) for i in view_dimensions.keys()][2:])
        dim_partition = "cadence," + dim_partition
        calculated_metrics = metric_value["calculated_metric"]
        derived_metrics = metric_value["derived_metric"]
        calculated_metrics_script: list[str] = []
        calculated_metrics_script_snapshot: list[str] = []
        derived_metrics_script: list[str] = []

        if calculated_metrics:
            (
                calculated_metrics_script,
                calculated_metrics_script_snapshot,
            ) = cls._get_calculated_metric(
                metric_key, calculated_metrics, dim_partition, cadence_snapshot_status
            )

        if derived_metrics:
            derived_metrics_script = cls._get_derived_metrics(derived_metrics)

        return (
            calculated_metrics_script,
            calculated_metrics_script_snapshot,
            derived_metrics_script,
        )

    @classmethod
    def _get_derived_metrics(cls, derived_metric: dict) -> list[str]:
        """Get derived metrics from use case.

        Args:
            derived_metric: use case derived metrics.
        """
        derived_metric_script = []

        # NOTE(review): indexed 0..len-1, so this is effectively a sequence
        # despite the dict annotation.
        for i in range(0, len(derived_metric)):
            derived_formula = str(derived_metric[i]["formula"])
            derived_label = derived_metric[i]["label"]
            # Builds a "<formula> AS <label>" column expression.
            derived_metric_script.append(derived_formula + " AS " + derived_label)

        return derived_metric_script

    @classmethod
    def _get_calculated_metric(
        cls,
        metric_key: str,
        calculated_metric: dict,
        dimension_partition: str,
        cadence_snapshot_status: dict,
    ) -> tuple[list[str], list[str]]:
        """Get calculated metrics from use case.

        Args:
            metric_key: use case metric name.
            calculated_metric: use case calculated metrics.
            dimension_partition: dimension partition.
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
        """
        last_cadence_script: list[str] = []
        last_year_cadence_script: list[str] = []
        window_script: list[str] = []
        last_cadence_script_snapshot: list[str] = []
        last_year_cadence_script_snapshot: list[str] = []
        window_script_snapshot: list[str] = []

        # Each supported calculated-metric kind is optional in the config.
        if "last_cadence" in calculated_metric:
            (
                last_cadence_script,
                last_cadence_script_snapshot,
            ) = cls._get_cadence_calculated_metric(
                metric_key,
                dimension_partition,
                calculated_metric,
                cadence_snapshot_status,
                "last_cadence",
            )
        if "last_year_cadence" in calculated_metric:
            (
                last_year_cadence_script,
                last_year_cadence_script_snapshot,
            ) = cls._get_cadence_calculated_metric(
                metric_key,
                dimension_partition,
                calculated_metric,
                cadence_snapshot_status,
                "last_year_cadence",
            )
        if "window_function" in calculated_metric:
            window_script, window_script_snapshot = cls._get_window_calculated_metric(
                metric_key,
                dimension_partition,
                calculated_metric,
                cadence_snapshot_status,
            )

        calculated_script = [
            *last_cadence_script,
            *last_year_cadence_script,
            *window_script,
        ]
        calculated_script_snapshot = [
            *last_cadence_script_snapshot,
            *last_year_cadence_script_snapshot,
            *window_script_snapshot,
        ]

        return calculated_script, calculated_script_snapshot

    @classmethod
    def _get_window_calculated_metric(
        cls,
        metric_key: str,
        dimension_partition: str,
        calculated_metric: dict,
        cadence_snapshot_status: dict,
    ) -> tuple[list, list]:
        """Get window calculated metrics from use case.

        Args:
            metric_key: use case metric name.
            dimension_partition: dimension partition.
            calculated_metric: use case calculated metrics.
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
        """
        calculated_script = []
        calculated_script_snapshot = []

        for i in range(0, len(calculated_metric["window_function"])):
            window_function = calculated_metric["window_function"][i]["agg_func"]
            window_function_start = calculated_metric["window_function"][i]["window"][0]
            window_function_end = calculated_metric["window_function"][i]["window"][1]
            window_label = calculated_metric["window_function"][i]["label"]

            calculated_script.append(
                f"""
                NVL(
                    {window_function}({metric_key}) OVER
                    (
                        PARTITION BY {dimension_partition}
                        order by from_date ROWS BETWEEN
                        {str(window_function_start)} PRECEDING
                        AND {str(window_function_end)} PRECEDING
                    ),
                    0
                ) AS
                {window_label}
                """
            )

            # The snapshot variant additionally partitions by the "rn" column,
            # which only appears in snapshot-enabled SQL.
            if "Y" in cadence_snapshot_status.values():
                calculated_script_snapshot.append(
                    f"""
                    NVL(
                        {window_function}({metric_key}) OVER
                        (
                            PARTITION BY {dimension_partition} ,rn
                            order by from_date ROWS BETWEEN
                            {str(window_function_start)} PRECEDING
                            AND {str(window_function_end)} PRECEDING
                        ),
                        0
                    ) AS
                    {window_label}
                    """
                )

        return calculated_script, calculated_script_snapshot

    @classmethod
    def _get_cadence_calculated_metric(
        cls,
        metric_key: str,
        dimension_partition: str,
        calculated_metric: dict,
        cadence_snapshot_status: dict,
        cadence: str,
    ) -> tuple[list, list]:
        """Get cadence calculated metrics from use case.

        Args:
            metric_key: use case metric name.
            calculated_metric: use case calculated metrics.
            dimension_partition: dimension partition.
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
            cadence: cadence to process.
        """
        calculated_script = []
        calculated_script_snapshot = []

        for i in range(0, len(calculated_metric[cadence])):
            cadence_lag = cls._get_cadence_item_lag(calculated_metric, cadence, i)
            cadence_label = calculated_metric[cadence][i]["label"]

            calculated_script.append(
                cls._get_cadence_lag_statement(
                    metric_key,
                    cadence_lag,
                    dimension_partition,
                    cadence_label,
                    snapshot=False,
                    cadence=cadence,
                )
            )

            # Snapshot statements are only generated when at least one cadence
            # has snapshots enabled.
            if "Y" in cadence_snapshot_status.values():
                calculated_script_snapshot.append(
                    cls._get_cadence_lag_statement(
                        metric_key,
                        cadence_lag,
                        dimension_partition,
                        cadence_label,
                        snapshot=True,
                        cadence=cadence,
                    )
                )

        return calculated_script, calculated_script_snapshot

    @classmethod
    def _get_cadence_item_lag(
        cls, calculated_metric: dict, cadence: str, item: int
    ) -> str:
        """Get calculated metric item lag.

        Args:
            calculated_metric: use case calculated metrics.
            cadence: cadence to process.
            item: metric item.
        """
        return str(calculated_metric[cadence][item]["window"])

    @classmethod
    def _get_cadence_lag_statement(
        cls,
        metric_key: str,
        cadence_lag: str,
        dimension_partition: str,
        cadence_label: str,
        snapshot: bool,
        cadence: str,
    ) -> str:
        """Get cadence lag statement.

        Args:
            metric_key: use case metric name.
            cadence_lag: cadence window lag.
            dimension_partition: dimension partition.
            cadence_label: cadence name.
            snapshot: indicate if the snapshot is enabled.
            cadence: cadence to process.
        """
        # Returns "" (with an error log) for unrecognised cadences.
        cadence_lag_statement = ""
        if cadence == "last_cadence":
            # LAG over the dimension partition; snapshot variants also
            # partition by the "rn" column.
            cadence_lag_statement = (
                "NVL(LAG("
                + metric_key
                + ","
                + cadence_lag
                + ") OVER(PARTITION BY "
                + dimension_partition
                + (",rn" if snapshot else "")
                + " order by from_date),0) AS "
                + cadence_label
            )
        elif cadence == "last_year_cadence":
            # The extra struct(...) partition key groups rows by their calendar
            # position within the year — presumably so LAG reaches the same
            # period of the previous year; confirm against generated SQL.
            cadence_lag_statement = (
                "NVL(LAG("
                + metric_key
                + ","
                + cadence_lag
                + ") OVER(PARTITION BY "
                + dimension_partition
                + (",rn" if snapshot else "")
                + """,
                case
                    when cadence in ('DAY','MONTH','QUARTER')
                    then struct(month(from_date), day(from_date))
                    when cadence in('WEEK')
                    then struct(weekofyear(from_date+1),1)
                end order by from_date),0) AS """
                + cadence_label
            )
        else:
            cls._LOGGER.error(f"Cadence {cadence} not implemented yet")

        return cadence_lag_statement
Class to control the GAB View creation.
311 def __init__( 312 self, 313 query_id: str, 314 lookup_query_builder: DataFrame, 315 target_database: str, 316 target_table: str, 317 ): 318 """Construct GABViewManager instances. 319 320 Args: 321 query_id: gab configuration table use case identifier. 322 lookup_query_builder: gab configuration data. 323 target_database: target database to write. 324 target_table: target table to write. 325 """ 326 self.query_id = query_id 327 self.lookup_query_builder = lookup_query_builder 328 self.target_database = target_database 329 self.target_table = target_table
Construct GABViewManager instances.
Arguments:
- query_id: gab configuration table use case identifier.
- lookup_query_builder: gab configuration data.
- target_database: target database to write.
- target_table: target table to write.
331 def generate_use_case_views(self) -> None: 332 """Generate all the use case views. 333 334 Generates the DDLs for each of the views. This DDL is dynamically built based on 335 the mappings provided in the config table. 336 """ 337 reconciliation_window = GABUtils.get_json_column_as_dict( 338 self.lookup_query_builder, self.query_id, "recon_window" 339 ) 340 341 cadence_snapshot_status = self._get_cadence_snapshot_status( 342 reconciliation_window 343 ) 344 345 ( 346 cadences_with_snapshot, 347 cadences_without_snapshot, 348 ) = self._split_cadence_by_snapshot(cadence_snapshot_status) 349 350 mappings = GABUtils.get_json_column_as_dict( 351 self.lookup_query_builder, self.query_id, "mappings" 352 ) 353 354 for view_name in mappings.keys(): 355 self._generate_use_case_view( 356 mappings, 357 view_name, 358 cadence_snapshot_status, 359 cadences_with_snapshot, 360 cadences_without_snapshot, 361 self.target_database, 362 self.target_table, 363 self.query_id, 364 )
Generate all the use case views.
Generates the DDLs for each of the views. This DDL is dynamically built based on the mappings provided in the config table.