lakehouse_engine.algorithms.gab
Module to define Gold Asset Builder algorithm behavior.
1"""Module to define Gold Asset Builder algorithm behavior.""" 2 3import copy 4from datetime import datetime, timedelta 5from typing import Union 6 7import pendulum 8from jinja2 import Template 9from pyspark import Row 10from pyspark.sql import DataFrame 11from pyspark.sql.functions import lit 12 13from lakehouse_engine.algorithms.algorithm import Algorithm 14from lakehouse_engine.core.definitions import ( 15 GABCadence, 16 GABCombinedConfiguration, 17 GABDefaults, 18 GABKeys, 19 GABReplaceableKeys, 20 GABSpec, 21 GABStartOfWeek, 22) 23from lakehouse_engine.core.exec_env import ExecEnv 24from lakehouse_engine.core.gab_manager import GABCadenceManager, GABViewManager 25from lakehouse_engine.core.gab_sql_generator import ( 26 GABDeleteGenerator, 27 GABInsertGenerator, 28) 29from lakehouse_engine.utils.gab_utils import GABPartitionUtils, GABUtils 30from lakehouse_engine.utils.logging_handler import LoggingHandler 31 32 33class GAB(Algorithm): 34 """Class representing the gold asset builder.""" 35 36 _LOGGER = LoggingHandler(__name__).get_logger() 37 _SPARK_DEFAULT_PARALLELISM_CONFIG = ( 38 "spark.sql.sources.parallelPartitionDiscovery.parallelism" 39 ) 40 _SPARK_DEFAULT_PARALLELISM_VALUE = "10000" 41 42 def __init__(self, acon: dict): 43 """Construct GAB instances. 44 45 Args: 46 acon: algorithm configuration. 47 """ 48 self.spec: GABSpec = GABSpec.create_from_acon(acon=acon) 49 50 def execute(self) -> None: 51 """Execute the Gold Asset Builder.""" 52 self._LOGGER.info(f"Reading {self.spec.lookup_table} as lkp_query_builder") 53 lookup_query_builder_df = ExecEnv.SESSION.read.table(self.spec.lookup_table) 54 ExecEnv.SESSION.read.table(self.spec.calendar_table).createOrReplaceTempView( 55 "df_cal" 56 ) 57 self._LOGGER.info(f"Generating calendar from {self.spec.calendar_table}") 58 59 query_label = self.spec.query_label_filter 60 queue = self.spec.queue_filter 61 cadence = self.spec.cadence_filter 62 63 self._LOGGER.info(f"Query Label Filter {query_label}") 64 self._LOGGER.info(f"Queue Filter {queue}") 65 self._LOGGER.info(f"Cadence Filter {cadence}") 66 67 gab_path = self.spec.gab_base_path 68 self._LOGGER.info(f"Gab Base Path {gab_path}") 69 70 lookup_query_builder_df = lookup_query_builder_df.filter( 71 ( 72 (lookup_query_builder_df.query_label.isin(query_label)) 73 & (lookup_query_builder_df.queue.isin(queue)) 74 & (lookup_query_builder_df.is_active != lit("N")) 75 ) 76 ) 77 78 lookup_query_builder_df.cache() 79 80 for use_case in lookup_query_builder_df.collect(): 81 self._process_use_case( 82 use_case=use_case, 83 lookup_query_builder=lookup_query_builder_df, 84 selected_cadences=cadence, 85 gab_path=gab_path, 86 ) 87 88 lookup_query_builder_df.unpersist() 89 90 def _process_use_case( 91 self, 92 use_case: Row, 93 lookup_query_builder: DataFrame, 94 selected_cadences: list[str], 95 gab_path: str, 96 ) -> None: 97 """Process each gab use case. 98 99 Args: 100 use_case: gab use case to process. 101 lookup_query_builder: gab configuration data. 102 selected_cadences: selected cadences to process. 103 gab_path: gab base path used to get the use case stages sql files. 
104 """ 105 self._LOGGER.info(f"Executing use case: {use_case['query_label']}") 106 107 reconciliation = GABUtils.get_json_column_as_dict( 108 lookup_query_builder=lookup_query_builder, 109 query_id=use_case["query_id"], 110 query_column="recon_window", 111 ) 112 self._LOGGER.info(f"reconcilation window - {reconciliation}") 113 configured_cadences = list(reconciliation.keys()) 114 115 stages = GABUtils.get_json_column_as_dict( 116 lookup_query_builder=lookup_query_builder, 117 query_id=use_case["query_id"], 118 query_column="intermediate_stages", 119 ) 120 self._LOGGER.info(f"intermediate stages - {stages}") 121 122 self._LOGGER.info(f"selected_cadences: {selected_cadences}") 123 self._LOGGER.info(f"configured_cadences: {configured_cadences}") 124 cadences = self._get_filtered_cadences(selected_cadences, configured_cadences) 125 self._LOGGER.info(f"filtered cadences - {cadences}") 126 127 latest_run_date, latest_config_date = self._get_latest_usecase_data( 128 use_case["query_id"] 129 ) 130 self._LOGGER.info(f"latest_config_date: {latest_config_date}") 131 self._LOGGER.info(f"latest_run_date: - {latest_run_date}") 132 self._set_use_case_stage_template_file(stages, gab_path, use_case) 133 processed_cadences = [] 134 135 for cadence in cadences: 136 is_cadence_processed = self._process_use_case_query_cadence( 137 cadence, 138 reconciliation, 139 use_case, 140 stages, 141 lookup_query_builder, 142 ) 143 if is_cadence_processed: 144 processed_cadences.append(is_cadence_processed) 145 146 if processed_cadences: 147 self._generate_ddl( 148 latest_config_date=latest_config_date, 149 latest_run_date=latest_run_date, 150 query_id=use_case["query_id"], 151 lookup_query_builder=lookup_query_builder, 152 ) 153 else: 154 self._LOGGER.info( 155 f"Skipping use case {use_case['query_label']}. No cadence processed " 156 "for the use case." 157 ) 158 159 @classmethod 160 def _set_use_case_stage_template_file( 161 cls, stages: dict, gab_path: str, use_case: Row 162 ) -> None: 163 """Set templated file for each stage. 164 165 Args: 166 stages: use case stages with their configuration. 167 gab_path: gab base path used to get the use case stages SQL files. 168 use_case: gab use case to process. 169 """ 170 cls._LOGGER.info("Reading templated file for each stage...") 171 172 for i in range(1, len(stages) + 1): 173 stage = stages[str(i)] 174 stage_file_path = stage["file_path"] 175 full_path = gab_path + stage_file_path 176 cls._LOGGER.info(f"Stage file path is: {full_path}") 177 file_read = open(full_path, "r").read() 178 templated_file = file_read.replace( 179 "replace_offset_value", str(use_case["timezone_offset"]) 180 ) 181 stage["templated_file"] = templated_file 182 stage["full_file_path"] = full_path 183 184 def _process_use_case_query_cadence( 185 self, 186 cadence: str, 187 reconciliation: dict, 188 use_case: Row, 189 stages: dict, 190 lookup_query_builder: DataFrame, 191 ) -> bool: 192 """Identify use case reconciliation window and cadence. 193 194 Args: 195 cadence: cadence to process. 196 reconciliation: configured use case reconciliation window. 197 use_case: gab use case to process. 198 stages: use case stages with their configuration. 199 lookup_query_builder: gab configuration data. 
200 """ 201 selected_reconciliation_window = {} 202 selected_cadence = reconciliation.get(cadence) 203 self._LOGGER.info(f"Processing cadence: {cadence}") 204 self._LOGGER.info(f"Reconciliation Window - {selected_cadence}") 205 206 if selected_cadence: 207 selected_reconciliation_window = selected_cadence.get("recon_window") 208 209 self._LOGGER.info(f"{cadence}: {self.spec.start_date} - {self.spec.end_date}") 210 211 start_of_week = use_case["start_of_the_week"] 212 213 self._set_week_configuration_by_uc_start_of_week(start_of_week) 214 215 cadence_configuration_at_end_date = ( 216 GABUtils.get_cadence_configuration_at_end_date(self.spec.end_date) 217 ) 218 219 reconciliation_cadences = GABUtils().get_reconciliation_cadences( 220 cadence=cadence, 221 selected_reconciliation_window=selected_reconciliation_window, 222 cadence_configuration_at_end_date=cadence_configuration_at_end_date, 223 rerun_flag=self.spec.rerun_flag, 224 ) 225 226 start_date_str = GABUtils.format_datetime_to_default(self.spec.start_date) 227 end_date_str = GABUtils.format_datetime_to_default(self.spec.end_date) 228 229 for reconciliation_cadence, snapshot_flag in reconciliation_cadences.items(): 230 self._process_reconciliation_cadence( 231 reconciliation_cadence=reconciliation_cadence, 232 snapshot_flag=snapshot_flag, 233 cadence=cadence, 234 start_date_str=start_date_str, 235 end_date_str=end_date_str, 236 use_case=use_case, 237 lookup_query_builder=lookup_query_builder, 238 stages=stages, 239 ) 240 241 return (cadence in reconciliation.keys()) or ( 242 reconciliation_cadences is not None 243 ) 244 245 def _process_reconciliation_cadence( 246 self, 247 reconciliation_cadence: str, 248 snapshot_flag: str, 249 cadence: str, 250 start_date_str: str, 251 end_date_str: str, 252 use_case: Row, 253 lookup_query_builder: DataFrame, 254 stages: dict, 255 ) -> None: 256 """Process use case reconciliation window. 257 258 Reconcile the pre-aggregated data to cover the late events. 259 260 Args: 261 reconciliation_cadence: reconciliation to process. 262 snapshot_flag: flag indicating if for this cadence the snapshot is enabled. 263 cadence: cadence to process. 264 start_date_str: start date of the period to process. 265 end_date_str: end date of the period to process. 266 use_case: gab use case to process. 267 lookup_query_builder: gab configuration data. 268 stages: use case stages with their configuration. 269 270 Example: 271 Cadence: week; 272 Reconciliation: monthly; 273 This means every weekend previous week aggregations will be calculated and 274 on month end we will reconcile the numbers calculated for last 4 weeks 275 to readjust the number for late events. 
276 """ 277 ( 278 window_start_date, 279 window_end_date, 280 filter_start_date, 281 filter_end_date, 282 ) = GABCadenceManager().extended_window_calculator( 283 cadence, 284 reconciliation_cadence, 285 self.spec.current_date, 286 start_date_str, 287 end_date_str, 288 use_case["query_type"], 289 self.spec.rerun_flag, 290 snapshot_flag, 291 ) 292 293 if use_case["timezone_offset"]: 294 filter_start_date = filter_start_date + timedelta( 295 hours=use_case["timezone_offset"] 296 ) 297 filter_end_date = filter_end_date + timedelta( 298 hours=use_case["timezone_offset"] 299 ) 300 301 filter_start_date_str = GABUtils.format_datetime_to_default(filter_start_date) 302 filter_end_date_str = GABUtils.format_datetime_to_default(filter_end_date) 303 304 partition_end = GABUtils.format_datetime_to_default( 305 (window_end_date - timedelta(days=1)) 306 ) 307 308 window_start_date_str = GABUtils.format_datetime_to_default(window_start_date) 309 window_end_date_str = GABUtils.format_datetime_to_default(window_end_date) 310 311 partition_filter = GABPartitionUtils.get_partition_condition( 312 filter_start_date_str, partition_end 313 ) 314 315 self._LOGGER.info( 316 "extended window for start and end dates are: " 317 f"{filter_start_date_str} - {filter_end_date_str}" 318 ) 319 320 unpersist_list = [] 321 322 for i in range(1, len(stages) + 1): 323 stage = stages[str(i)] 324 templated_file = stage["templated_file"] 325 stage_file_path = stage["full_file_path"] 326 327 templated = self._process_use_case_query_step( 328 stage=stages[str(i)], 329 templated_file=templated_file, 330 use_case=use_case, 331 reconciliation_cadence=reconciliation_cadence, 332 cadence=cadence, 333 snapshot_flag=snapshot_flag, 334 window_start_date=window_start_date_str, 335 partition_end=partition_end, 336 filter_start_date=filter_start_date_str, 337 filter_end_date=filter_end_date_str, 338 partition_filter=partition_filter, 339 ) 340 341 temp_stage_view_name = self._create_stage_view( 342 templated, 343 stages[str(i)], 344 window_start_date_str, 345 window_end_date_str, 346 use_case["query_id"], 347 use_case["query_label"], 348 cadence, 349 stage_file_path, 350 ) 351 unpersist_list.append(temp_stage_view_name) 352 353 insert_success = self._generate_view_statement( 354 query_id=use_case["query_id"], 355 cadence=cadence, 356 temp_stage_view_name=temp_stage_view_name, 357 lookup_query_builder=lookup_query_builder, 358 window_start_date=window_start_date_str, 359 window_end_date=window_end_date_str, 360 query_label=use_case["query_label"], 361 ) 362 self._LOGGER.info(f"Inserted data to generate the view: {insert_success}") 363 364 self._unpersist_cached_views(unpersist_list) 365 366 def _process_use_case_query_step( 367 self, 368 stage: dict, 369 templated_file: str, 370 use_case: Row, 371 reconciliation_cadence: str, 372 cadence: str, 373 snapshot_flag: str, 374 window_start_date: str, 375 partition_end: str, 376 filter_start_date: str, 377 filter_end_date: str, 378 partition_filter: str, 379 ) -> str: 380 """Process each use case step. 381 382 Process any intermediate view defined in the gab configuration table as step for 383 the use case. 384 385 Args: 386 stage: stage to process. 387 templated_file: sql file to process at this stage. 388 use_case: gab use case to process. 389 reconciliation_cadence: configured use case reconciliation window. 390 cadence: cadence to process. 391 snapshot_flag: flag indicating if for this cadence the snapshot is enabled. 392 window_start_date: start date for the configured stage. 
393 partition_end: end date for the configured stage. 394 filter_start_date: filter start date to replace in the stage query. 395 filter_end_date: filter end date to replace in the stage query. 396 partition_filter: partition condition. 397 """ 398 filter_col = stage["project_date_column"] 399 if stage["filter_date_column"]: 400 filter_col = stage["filter_date_column"] 401 402 # dummy value to avoid empty error if empty on the configuration 403 project_col = stage.get("project_date_column", "X") 404 405 gab_base_configuration_copy = copy.deepcopy( 406 GABCombinedConfiguration.COMBINED_CONFIGURATION.value 407 ) 408 409 for item in gab_base_configuration_copy.values(): 410 self._update_rendered_item_cadence( 411 reconciliation_cadence, cadence, project_col, item # type: ignore 412 ) 413 414 ( 415 rendered_date, 416 rendered_to_date, 417 join_condition, 418 ) = self._get_cadence_configuration( 419 gab_base_configuration_copy, 420 cadence, 421 reconciliation_cadence, 422 snapshot_flag, 423 use_case["start_of_the_week"], 424 project_col, 425 window_start_date, 426 partition_end, 427 ) 428 429 rendered_file = self._render_template_query( 430 templated=templated_file, 431 cadence=cadence, 432 start_of_the_week=use_case["start_of_the_week"], 433 query_id=use_case["query_id"], 434 rendered_date=rendered_date, 435 filter_start_date=filter_start_date, 436 filter_end_date=filter_end_date, 437 filter_col=filter_col, 438 timezone_offset=use_case["timezone_offset"], 439 join_condition=join_condition, 440 partition_filter=partition_filter, 441 rendered_to_date=rendered_to_date, 442 ) 443 444 return rendered_file 445 446 @classmethod 447 def _get_filtered_cadences( 448 cls, selected_cadences: list[str], configured_cadences: list[str] 449 ) -> list[str]: 450 """Get filtered cadences. 451 452 Get the intersection of user selected cadences and use case configured cadences. 453 454 Args: 455 selected_cadences: user selected cadences. 456 configured_cadences: use case configured cadences. 457 """ 458 return ( 459 configured_cadences 460 if "All" in selected_cadences 461 else GABCadence.order_cadences( 462 list(set(selected_cadences).intersection(configured_cadences)) 463 ) 464 ) 465 466 def _get_latest_usecase_data(self, query_id: str) -> tuple[datetime, datetime]: 467 """Get latest use case data. 468 469 Args: 470 query_id: use case query id. 471 """ 472 return ( 473 self._get_latest_run_date(query_id), 474 self._get_latest_use_case_date(query_id), 475 ) 476 477 def _get_latest_run_date(self, query_id: str) -> datetime: 478 """Get latest use case run date. 479 480 Args: 481 query_id: use case query id. 482 """ 483 last_success_run_sql = """ 484 SELECT run_start_time 485 FROM {database}.gab_log_events 486 WHERE query_id = {query_id} 487 AND stage_name = 'Final Insert' 488 AND status = 'Success' 489 ORDER BY 1 DESC 490 LIMIT 1 491 """.format( # nosec: B608 492 database=self.spec.target_database, query_id=query_id 493 ) 494 try: 495 latest_run_date: datetime = ExecEnv.SESSION.sql( 496 last_success_run_sql 497 ).collect()[0][0] 498 except Exception: 499 latest_run_date = datetime.strptime( 500 "2020-01-01", GABDefaults.DATE_FORMAT.value 501 ) 502 503 return latest_run_date 504 505 def _get_latest_use_case_date(self, query_id: str) -> datetime: 506 """Get latest use case configured date. 507 508 Args: 509 query_id: use case query id. 
510 """ 511 query_config_sql = """ 512 SELECT lh_created_on 513 FROM {lkp_query_builder} 514 WHERE query_id = {query_id} 515 """.format( # nosec: B608 516 lkp_query_builder=self.spec.lookup_table, 517 query_id=query_id, 518 ) 519 520 latest_config_date: datetime = ExecEnv.SESSION.sql(query_config_sql).collect()[ 521 0 522 ][0] 523 524 return latest_config_date 525 526 @classmethod 527 def _set_week_configuration_by_uc_start_of_week(cls, start_of_week: str) -> None: 528 """Set week configuration by use case start of week. 529 530 Args: 531 start_of_week: use case start of week (MONDAY or SUNDAY). 532 """ 533 if start_of_week.upper() == "MONDAY": 534 pendulum.week_starts_at(pendulum.MONDAY) 535 pendulum.week_ends_at(pendulum.SUNDAY) 536 elif start_of_week.upper() == "SUNDAY": 537 pendulum.week_starts_at(pendulum.SUNDAY) 538 pendulum.week_ends_at(pendulum.SATURDAY) 539 else: 540 raise NotImplementedError( 541 f"The requested {start_of_week} is not implemented." 542 "Supported `start_of_week` values: [MONDAY, SUNDAY]" 543 ) 544 545 @classmethod 546 def _update_rendered_item_cadence( 547 cls, reconciliation_cadence: str, cadence: str, project_col: str, item: dict 548 ) -> None: 549 """Override item properties based in the rendered item cadence. 550 551 Args: 552 reconciliation_cadence: configured use case reconciliation window. 553 cadence: cadence to process. 554 project_col: use case projection date column name. 555 item: predefined use case combination. 556 """ 557 rendered_item = cls._get_rendered_item_cadence( 558 reconciliation_cadence, cadence, project_col, item 559 ) 560 item["join_select"] = rendered_item["join_select"] 561 item["project_start"] = rendered_item["project_start"] 562 item["project_end"] = rendered_item["project_end"] 563 564 @classmethod 565 def _get_rendered_item_cadence( 566 cls, reconciliation_cadence: str, cadence: str, project_col: str, item: dict 567 ) -> dict: 568 """Update pre-configured gab parameters with use case data. 569 570 Args: 571 reconciliation_cadence: configured use case reconciliation window. 572 cadence: cadence to process. 573 project_col: use case projection date column name. 574 item: predefined use case combination. 575 """ 576 return { 577 GABKeys.JOIN_SELECT: ( 578 item[GABKeys.JOIN_SELECT] 579 .replace(GABReplaceableKeys.CONFIG_WEEK_START, "Monday") 580 .replace( 581 GABReplaceableKeys.RECONCILIATION_CADENCE, 582 reconciliation_cadence, 583 ) 584 .replace(GABReplaceableKeys.CADENCE, cadence) 585 ), 586 GABKeys.PROJECT_START: ( 587 item[GABKeys.PROJECT_START] 588 .replace(GABReplaceableKeys.CADENCE, cadence) 589 .replace(GABReplaceableKeys.DATE_COLUMN, project_col) 590 ), 591 GABKeys.PROJECT_END: ( 592 item[GABKeys.PROJECT_END] 593 .replace(GABReplaceableKeys.CADENCE, cadence) 594 .replace(GABReplaceableKeys.DATE_COLUMN, project_col) 595 ), 596 } 597 598 @classmethod 599 def _get_cadence_configuration( 600 cls, 601 use_case_configuration: dict, 602 cadence: str, 603 reconciliation_cadence: str, 604 snapshot_flag: str, 605 start_of_week: str, 606 project_col: str, 607 window_start_date: str, 608 partition_end: str, 609 ) -> tuple[str, str, str]: 610 """Get use case configuration fields to replace pre-configured parameters. 611 612 Args: 613 use_case_configuration: use case configuration. 614 cadence: cadence to process. 615 reconciliation_cadence: cadence to be reconciliated. 616 snapshot_flag: flag indicating if for this cadence the snapshot is enabled. 617 start_of_week: use case start of week (MONDAY or SUNDAY). 
618 project_col: use case projection date column name. 619 window_start_date: start date for the configured stage. 620 partition_end: end date for the configured stage. 621 622 Returns: 623 rendered_from_date: projection start date. 624 rendered_to_date: projection end date. 625 join_condition: string containing the join condition to replace in the 626 templated query by jinja substitution. 627 """ 628 cadence_dict = next( 629 ( 630 dict(configuration) 631 for configuration in use_case_configuration.values() 632 if ( 633 (cadence in configuration["cadence"]) 634 and (reconciliation_cadence in configuration["recon"]) 635 and (snapshot_flag in configuration["snap_flag"]) 636 and ( 637 GABStartOfWeek.get_start_of_week()[start_of_week.upper()] 638 in configuration["week_start"] 639 ) 640 ) 641 ), 642 None, 643 ) 644 rendered_from_date = None 645 rendered_to_date = None 646 join_condition = None 647 648 if cadence_dict: 649 rendered_from_date = ( 650 cadence_dict[GABKeys.PROJECT_START] 651 .replace(GABReplaceableKeys.CADENCE, cadence) 652 .replace(GABReplaceableKeys.DATE_COLUMN, project_col) 653 ) 654 rendered_to_date = ( 655 cadence_dict[GABKeys.PROJECT_END] 656 .replace(GABReplaceableKeys.CADENCE, cadence) 657 .replace(GABReplaceableKeys.DATE_COLUMN, project_col) 658 ) 659 660 if cadence_dict[GABKeys.JOIN_SELECT]: 661 join_condition = """ 662 inner join ( 663 {join_select} from df_cal 664 where calendar_date 665 between '{bucket_start}' and '{bucket_end}' 666 ) 667 df_cal on date({date_column}) 668 between df_cal.cadence_start_date and df_cal.cadence_end_date 669 """.format( 670 join_select=cadence_dict[GABKeys.JOIN_SELECT], 671 bucket_start=window_start_date, 672 bucket_end=partition_end, 673 date_column=project_col, 674 ) 675 676 return rendered_from_date, rendered_to_date, join_condition 677 678 def _render_template_query( 679 self, 680 templated: str, 681 cadence: str, 682 start_of_the_week: str, 683 query_id: str, 684 rendered_date: str, 685 filter_start_date: str, 686 filter_end_date: str, 687 filter_col: str, 688 timezone_offset: str, 689 join_condition: str, 690 partition_filter: str, 691 rendered_to_date: str, 692 ) -> str: 693 """Replace jinja templated parameters in the SQL with the actual data. 694 695 Args: 696 templated: templated sql file to process at this stage. 697 cadence: cadence to process. 698 start_of_the_week: use case start of week (MONDAY or SUNDAY). 699 query_id: gab configuration table use case identifier. 700 rendered_date: projection start date. 701 filter_start_date: filter start date to replace in the stage query. 702 filter_end_date: filter end date to replace in the stage query. 703 filter_col: use case projection date column name. 704 timezone_offset: timezone offset configured in the use case. 705 join_condition: string containing the join condition. 706 partition_filter: partition condition. 707 rendered_to_date: projection end date. 
708 """ 709 return Template(templated).render( 710 cadence="'{cadence}' as cadence".format(cadence=cadence), 711 cadence_run=cadence, 712 week_start=start_of_the_week, 713 query_id="'{query_id}' as query_id".format(query_id=query_id), 714 project_date_column=rendered_date, 715 target_table=self.spec.target_table, 716 database=self.spec.source_database, 717 start_date=filter_start_date, 718 end_date=filter_end_date, 719 filter_date_column=filter_col, 720 offset_value=timezone_offset, 721 joins=join_condition if join_condition else "", 722 partition_filter=partition_filter, 723 to_date=rendered_to_date, 724 ) 725 726 def _create_stage_view( 727 self, 728 rendered_template: str, 729 stage: dict, 730 window_start_date: str, 731 window_end_date: str, 732 query_id: str, 733 query_label: str, 734 cadence: str, 735 stage_file_path: str, 736 ) -> str: 737 """Create each use case stage view. 738 739 Each stage has a specific order and refer to a specific SQL to be executed. 740 741 Args: 742 rendered_template: rendered stage SQL file. 743 stage: stage to process. 744 window_start_date: start date for the configured stage. 745 window_end_date: end date for the configured stage. 746 query_id: gab configuration table use case identifier. 747 query_label: gab configuration table use case name. 748 cadence: cadence to process. 749 stage_file_path: full stage file path (gab path + stage path). 750 """ 751 run_start_time = datetime.now() 752 creation_status: str 753 error_message: Union[Exception, str] 754 755 try: 756 tmp = ExecEnv.SESSION.sql(rendered_template) 757 num_partitions = ExecEnv.SESSION.conf.get( 758 self._SPARK_DEFAULT_PARALLELISM_CONFIG, 759 self._SPARK_DEFAULT_PARALLELISM_VALUE, 760 ) 761 762 if stage["repartition"]: 763 if stage["repartition"].get("numPartitions"): 764 num_partitions = stage["repartition"]["numPartitions"] 765 766 if stage["repartition"].get("keys"): 767 tmp = tmp.repartition( 768 int(num_partitions), *stage["repartition"]["keys"] 769 ) 770 self._LOGGER.info("Repartitioned on given Key(s)") 771 else: 772 tmp = tmp.repartition(int(num_partitions)) 773 self._LOGGER.info("Repartitioned on given partition count") 774 775 temp_step_view_name: str = stage["table_alias"] 776 tmp.createOrReplaceTempView(temp_step_view_name) 777 778 if stage["storage_level"]: 779 ExecEnv.SESSION.sql( 780 "CACHE TABLE {tbl} " 781 "OPTIONS ('storageLevel' '{type}')".format( 782 tbl=temp_step_view_name, 783 type=stage["storage_level"], 784 ) 785 ) 786 ExecEnv.SESSION.sql( 787 "SELECT COUNT(*) FROM {tbl}".format( # nosec: B608 788 tbl=temp_step_view_name 789 ) 790 ) 791 self._LOGGER.info(f"Cached stage view - {temp_step_view_name} ") 792 793 creation_status = "Success" 794 error_message = "NA" 795 except Exception as err: 796 creation_status = "Failed" 797 error_message = err 798 raise err 799 finally: 800 run_end_time = datetime.now() 801 GABUtils().logger( 802 run_start_time, 803 run_end_time, 804 window_start_date, 805 window_end_date, 806 query_id, 807 query_label, 808 cadence, 809 stage_file_path, 810 rendered_template, 811 creation_status, 812 error_message, 813 self.spec.target_database, 814 ) 815 816 return temp_step_view_name 817 818 def _generate_view_statement( 819 self, 820 query_id: str, 821 cadence: str, 822 temp_stage_view_name: str, 823 lookup_query_builder: DataFrame, 824 window_start_date: str, 825 window_end_date: str, 826 query_label: str, 827 ) -> bool: 828 """Feed use case data to the insights table (default: unified use case table). 
829 830 Args: 831 query_id: gab configuration table use case identifier. 832 cadence: cadence to process. 833 temp_stage_view_name: name of the temp view generated by the stage. 834 lookup_query_builder: gab configuration data. 835 window_start_date: start date for the configured stage. 836 window_end_date: end date for the configured stage. 837 query_label: gab configuration table use case name. 838 """ 839 run_start_time = datetime.now() 840 creation_status: str 841 error_message: Union[Exception, str] 842 843 GABDeleteGenerator( 844 query_id=query_id, 845 cadence=cadence, 846 temp_stage_view_name=temp_stage_view_name, 847 lookup_query_builder=lookup_query_builder, 848 target_database=self.spec.target_database, 849 target_table=self.spec.target_table, 850 ).generate_sql() 851 852 gen_ins = GABInsertGenerator( 853 query_id=query_id, 854 cadence=cadence, 855 final_stage_table=temp_stage_view_name, 856 lookup_query_builder=lookup_query_builder, 857 target_database=self.spec.target_database, 858 target_table=self.spec.target_table, 859 ).generate_sql() 860 try: 861 ExecEnv.SESSION.sql(gen_ins) 862 863 creation_status = "Success" 864 error_message = "NA" 865 inserted = True 866 except Exception as err: 867 creation_status = "Failed" 868 error_message = err 869 raise 870 finally: 871 run_end_time = datetime.now() 872 GABUtils().logger( 873 run_start_time, 874 run_end_time, 875 window_start_date, 876 window_end_date, 877 query_id, 878 query_label, 879 cadence, 880 "Final Insert", 881 gen_ins, 882 creation_status, 883 error_message, 884 self.spec.target_database, 885 ) 886 887 return inserted 888 889 @classmethod 890 def _unpersist_cached_views(cls, unpersist_list: list[str]) -> None: 891 """Unpersist cached views. 892 893 Args: 894 unpersist_list: list containing the view names to unpersist. 895 """ 896 [ 897 ExecEnv.SESSION.sql("UNCACHE TABLE {tbl}".format(tbl=i)) 898 for i in unpersist_list 899 ] 900 901 def _generate_ddl( 902 self, 903 latest_config_date: datetime, 904 latest_run_date: datetime, 905 query_id: str, 906 lookup_query_builder: DataFrame, 907 ) -> None: 908 """Generate the actual gold asset. 909 910 It will create and return the view containing all specified dimensions, metrics 911 and computed metric for each cadence/reconciliation window. 912 913 Args: 914 latest_config_date: latest use case configuration date. 915 latest_run_date: latest use case run date. 916 query_id: gab configuration table use case identifier. 917 lookup_query_builder: gab configuration data. 918 """ 919 if str(latest_config_date) > str(latest_run_date): 920 GABViewManager( 921 query_id=query_id, 922 lookup_query_builder=lookup_query_builder, 923 target_database=self.spec.target_database, 924 target_table=self.spec.target_table, 925 ).generate_use_case_views() 926 else: 927 self._LOGGER.info( 928 "View is not being re-created as there are no changes in the " 929 "configuration after the latest run" 930 )
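For orientation, each stage file referenced in `intermediate_stages` is a Jinja template whose placeholders correspond to the keyword arguments passed to `Template(...).render(...)` in `_render_template_query`. The sketch below is hypothetical — the `sales` table and its columns are invented — and only the placeholder names are taken from this module:

```python
from jinja2 import Template

# Hypothetical stage SQL: the placeholder names come from
# GAB._render_template_query; the table/columns are invented for illustration.
stage_sql = """
SELECT {{ query_id }},
       {{ cadence }},
       {{ project_date_column }} AS from_date,
       {{ to_date }} AS to_date,
       SUM(amount) AS total_amount
FROM {{ database }}.sales
{{ joins }}
WHERE {{ filter_date_column }} BETWEEN '{{ start_date }}' AND '{{ end_date }}'
  AND {{ partition_filter }}
GROUP BY 1, 2, 3, 4
"""

print(
    Template(stage_sql).render(
        cadence="'WEEK' as cadence",  # the module wraps cadence as a SQL literal
        cadence_run="WEEK",
        week_start="MONDAY",
        query_id="'42' as query_id",  # likewise wrapped as a SQL literal
        project_date_column="date(order_date)",
        target_table="gold.gab_use_case_results",
        database="silver",
        start_date="2024-01-01",
        end_date="2024-01-08",
        filter_date_column="order_date",
        offset_value=0,
        joins="",  # filled in by _get_cadence_configuration when configured
        partition_filter="order_date between '2024-01-01' and '2024-01-07'",
        to_date="date(order_date)",
    )
)
```

Note that `cadence` and `query_id` are substituted as ready-made SQL literals (the `'X' as cadence` form above), mirroring what `_render_template_query` does before rendering.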
class GAB(Algorithm):
Class representing the gold asset builder.
GAB(acon: dict)
```python
def __init__(self, acon: dict):
    """Construct GAB instances.

    Args:
        acon: algorithm configuration.
    """
    self.spec: GABSpec = GABSpec.create_from_acon(acon=acon)
```
Construct GAB instances.
Arguments:
- acon: algorithm configuration.
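The spec fields read throughout this module suggest the shape of the acon, though the exact keys accepted by `GABSpec.create_from_acon` should be confirmed against `GABSpec` itself; treat the following as a hypothetical sketch:

```python
from datetime import datetime

# Hypothetical acon: keys mirror the GABSpec attributes this module reads
# (lookup_table, calendar_table, the *_filter fields, gab_base_path, dates and
# flags). Verify the real schema against GABSpec.create_from_acon before use.
acon = {
    "query_label_filter": ["order_kpis"],      # use cases to run
    "queue_filter": ["gold_queue"],            # configured queues to include
    "cadence_filter": ["All"],                 # or a subset, e.g. ["WEEK"]
    "lookup_table": "gold.lkp_query_builder",  # GAB configuration table
    "calendar_table": "gold.gab_calendar",
    "gab_base_path": "/mnt/gab/use_cases/",
    "source_database": "silver",
    "target_database": "gold",
    "target_table": "gab_use_case_results",
    "rerun_flag": "N",
    "start_date": datetime(2024, 1, 1),        # the spec dates are datetimes
    "end_date": datetime(2024, 1, 31),
    "current_date": datetime.now(),
}
```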
def execute(self) -> None:
```python
def execute(self) -> None:
    """Execute the Gold Asset Builder."""
    self._LOGGER.info(f"Reading {self.spec.lookup_table} as lkp_query_builder")
    lookup_query_builder_df = ExecEnv.SESSION.read.table(self.spec.lookup_table)
    ExecEnv.SESSION.read.table(self.spec.calendar_table).createOrReplaceTempView(
        "df_cal"
    )
    self._LOGGER.info(f"Generating calendar from {self.spec.calendar_table}")

    query_label = self.spec.query_label_filter
    queue = self.spec.queue_filter
    cadence = self.spec.cadence_filter

    self._LOGGER.info(f"Query Label Filter {query_label}")
    self._LOGGER.info(f"Queue Filter {queue}")
    self._LOGGER.info(f"Cadence Filter {cadence}")

    gab_path = self.spec.gab_base_path
    self._LOGGER.info(f"Gab Base Path {gab_path}")

    lookup_query_builder_df = lookup_query_builder_df.filter(
        (
            (lookup_query_builder_df.query_label.isin(query_label))
            & (lookup_query_builder_df.queue.isin(queue))
            & (lookup_query_builder_df.is_active != lit("N"))
        )
    )

    lookup_query_builder_df.cache()

    for use_case in lookup_query_builder_df.collect():
        self._process_use_case(
            use_case=use_case,
            lookup_query_builder=lookup_query_builder_df,
            selected_cadences=cadence,
            gab_path=gab_path,
        )

    lookup_query_builder_df.unpersist()
```
Execute the Gold Asset Builder.
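A minimal invocation sketch, assuming a Spark session has already been bound through `ExecEnv` (the `get_or_create` call below is an assumption — verify it against `lakehouse_engine.core.exec_env` for your version) and an `acon` shaped like the sketch in the constructor section:

```python
from lakehouse_engine.algorithms.gab import GAB
from lakehouse_engine.core.exec_env import ExecEnv

# Assumption: ExecEnv.get_or_create() creates/binds the Spark session that this
# module uses as ExecEnv.SESSION; check your engine version before relying on it.
ExecEnv.get_or_create(app_name="gab_run")

GAB(acon).execute()  # acon: algorithm configuration dict (see sketch above)
```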