lakehouse_engine.utils.gab_utils
Module to define GAB Utility classes.
1"""Module to define GAB Utility classes.""" 2 3import ast 4import calendar 5import json 6from datetime import datetime 7from typing import Optional, Union 8 9import pendulum 10from pyspark.sql import DataFrame 11from pyspark.sql.functions import col, lit, struct, to_json 12 13from lakehouse_engine.core.definitions import GABCadence, GABDefaults 14from lakehouse_engine.core.exec_env import ExecEnv 15from lakehouse_engine.utils.logging_handler import LoggingHandler 16 17 18class GABUtils(object): 19 """Class containing utility functions for GAB.""" 20 21 _LOGGER = LoggingHandler(__name__).get_logger() 22 23 def logger( 24 self, 25 run_start_time: datetime, 26 run_end_time: datetime, 27 start: str, 28 end: str, 29 query_id: str, 30 query_label: str, 31 cadence: str, 32 stage_file_path: str, 33 query: str, 34 status: str, 35 error_message: Union[Exception, str], 36 target_database: str, 37 ) -> None: 38 """Store the execution of each stage in the log events table. 39 40 Args: 41 run_start_time: execution start time. 42 run_end_time: execution end time. 43 start: use case start date. 44 end: use case end date. 45 query_id: gab configuration table use case identifier. 46 query_label: gab configuration table use case name. 47 cadence: cadence to process. 48 stage_file_path: stage file path. 49 query: query to execute. 50 status: status of the query execution. 51 error_message: error message if present. 52 target_database: target database to write. 
53 """ 54 ins = """ 55 INSERT INTO {database}.gab_log_events 56 VALUES ( 57 '{run_start_time}', 58 '{run_end_time}', 59 '{start}', 60 '{end}', 61 {query_id}, 62 '{query_label}', 63 '{cadence}', 64 '{stage_file_path}', 65 '{query}', 66 '{status}', 67 '{error_message}' 68 )""".format( # nosec: B608 69 database=target_database, 70 run_start_time=run_start_time, 71 run_end_time=run_end_time, 72 start=start, 73 end=end, 74 query_id=query_id, 75 query_label=query_label, 76 cadence=cadence, 77 stage_file_path=stage_file_path, 78 query=self._escape_quote(query), 79 status=status, 80 error_message=( 81 self._escape_quote(str(error_message)) 82 if status == "Failed" 83 else error_message 84 ), 85 ) 86 87 ExecEnv.SESSION.sql(ins) 88 89 @classmethod 90 def _escape_quote(cls, to_escape: str) -> str: 91 """Escape quote on string. 92 93 Args: 94 to_escape: string to escape. 95 """ 96 return to_escape.replace("'", r"\'").replace('"', r"\"") 97 98 @classmethod 99 def get_json_column_as_dict( 100 cls, lookup_query_builder: DataFrame, query_id: str, query_column: str 101 ) -> dict: # type: ignore 102 """Get JSON column as dictionary. 103 104 Args: 105 lookup_query_builder: gab configuration data. 106 query_id: gab configuration table use case identifier. 107 query_column: column to get as json. 
108 """ 109 column_df = lookup_query_builder.filter( 110 col("query_id") == lit(query_id) 111 ).select(col(query_column)) 112 113 column_df_json = column_df.select( 114 to_json(struct([column_df[x] for x in column_df.columns])) 115 ).collect()[0][0] 116 117 json_column = json.loads(column_df_json) 118 119 for mapping in json_column.values(): 120 column_as_json = ast.literal_eval(mapping) 121 122 return column_as_json # type: ignore 123 124 @classmethod 125 def extract_columns_from_mapping( 126 cls, 127 columns: dict, 128 is_dimension: bool, 129 extract_column_without_alias: bool = False, 130 table_alias: Optional[str] = None, 131 is_extracted_value_as_name: bool = True, 132 ) -> Union[tuple[list[str], list[str]], list[str]]: 133 """Extract and transform columns to SQL select statement. 134 135 Args: 136 columns: data to extract the columns. 137 is_dimension: flag identifying if is a dimension or a metric. 138 extract_column_without_alias: flag to inform if it's to extract columns 139 without aliases. 140 table_alias: name or alias from the source table. 141 is_extracted_value_as_name: identify if the extracted value is the 142 column name. 
143 """ 144 column_with_alias = ( 145 "".join([table_alias, ".", "{} as {}"]) if table_alias else "{} as {}" 146 ) 147 column_without_alias = ( 148 "".join([table_alias, ".", "{}"]) if table_alias else "{}" 149 ) 150 151 extracted_columns_with_alias = [] 152 extracted_columns_without_alias = [] 153 for column_name, column_value in columns.items(): 154 if extract_column_without_alias: 155 extracted_column_without_alias = column_without_alias.format( 156 cls._get_column_format_without_alias( 157 is_dimension, 158 column_name, 159 column_value, 160 is_extracted_value_as_name, 161 ) 162 ) 163 extracted_columns_without_alias.append(extracted_column_without_alias) 164 165 extracted_column_with_alias = column_with_alias.format( 166 *cls._extract_column_with_alias( 167 is_dimension, 168 column_name, 169 column_value, 170 is_extracted_value_as_name, 171 ) 172 ) 173 extracted_columns_with_alias.append(extracted_column_with_alias) 174 175 return ( 176 (extracted_columns_with_alias, extracted_columns_without_alias) 177 if extract_column_without_alias 178 else extracted_columns_with_alias 179 ) 180 181 @classmethod 182 def _extract_column_with_alias( 183 cls, 184 is_dimension: bool, 185 column_name: str, 186 column_value: Union[str, dict], 187 is_extracted_value_as_name: bool = True, 188 ) -> tuple[str, str]: 189 """Extract column name with alias. 190 191 Args: 192 is_dimension: flag indicating if the column is a dimension. 193 column_name: name of the column. 194 column_value: value of the column. 195 is_extracted_value_as_name: flag indicating if the name of the column is the 196 extracted value. 
197 """ 198 extracted_value = ( 199 column_value 200 if is_dimension 201 else (column_value["metric_name"]) # type: ignore 202 ) 203 204 return ( 205 (extracted_value, column_name) # type: ignore 206 if is_extracted_value_as_name 207 else (column_name, extracted_value) 208 ) 209 210 @classmethod 211 def _get_column_format_without_alias( 212 cls, 213 is_dimension: bool, 214 column_name: str, 215 column_value: Union[str, dict], 216 is_extracted_value_as_name: bool = True, 217 ) -> str: 218 """Extract column name without alias. 219 220 Args: 221 is_dimension: flag indicating if the column is a dimension. 222 column_name: name of the column. 223 column_value: value of the column. 224 is_extracted_value_as_name: flag indicating if the name of the column is the 225 extracted value. 226 """ 227 extracted_value: str = ( 228 column_value 229 if is_dimension 230 else (column_value["metric_name"]) # type: ignore 231 ) 232 233 return extracted_value if is_extracted_value_as_name else column_name 234 235 @classmethod 236 def get_cadence_configuration_at_end_date(cls, end_date: datetime) -> dict: 237 """A dictionary that corresponds to the conclusion of a cadence. 238 239 Any end date inputted by the user we check this end date is actually end of 240 a cadence (YEAR, QUARTER, MONTH, WEEK). 241 If the user input is 2024-03-31 this is a month end and a quarter end that 242 means any use cases configured as month or quarter need to be calculated. 243 244 Args: 245 end_date: base end date. 
246 """ 247 init_end_date_dict = {} 248 249 expected_end_cadence_date = pendulum.datetime( 250 int(end_date.strftime("%Y")), 251 int(end_date.strftime("%m")), 252 int(end_date.strftime("%d")), 253 ).replace(tzinfo=None) 254 255 # Validating YEAR cadence 256 if end_date == expected_end_cadence_date.last_of("year"): 257 init_end_date_dict["YEAR"] = "N" 258 259 # Validating QUARTER cadence 260 if end_date == expected_end_cadence_date.last_of("quarter"): 261 init_end_date_dict["QUARTER"] = "N" 262 263 # Validating MONTH cadence 264 if end_date == datetime( 265 int(end_date.strftime("%Y")), 266 int(end_date.strftime("%m")), 267 calendar.monthrange( 268 int(end_date.strftime("%Y")), int(end_date.strftime("%m")) 269 )[1], 270 ): 271 init_end_date_dict["MONTH"] = "N" 272 273 # Validating WEEK cadence 274 if end_date == expected_end_cadence_date.end_of("week").replace( 275 hour=0, minute=0, second=0, microsecond=0 276 ): 277 init_end_date_dict["WEEK"] = "N" 278 279 init_end_date_dict["DAY"] = "N" 280 281 return init_end_date_dict 282 283 def get_reconciliation_cadences( 284 self, 285 cadence: str, 286 selected_reconciliation_window: dict, 287 cadence_configuration_at_end_date: dict, 288 rerun_flag: str, 289 ) -> dict: 290 """Get reconciliation cadences based on the use case configuration. 291 292 Args: 293 cadence: cadence to process. 294 selected_reconciliation_window: configured use case reconciliation window. 295 cadence_configuration_at_end_date: cadences to execute at the end date. 296 rerun_flag: flag indicating if it's a rerun or a normal run. 
297 """ 298 configured_cadences = self._get_configured_cadences_by_snapshot( 299 cadence, selected_reconciliation_window, cadence_configuration_at_end_date 300 ) 301 302 return self._get_cadences_to_execute( 303 configured_cadences, cadence, cadence_configuration_at_end_date, rerun_flag 304 ) 305 306 @classmethod 307 def _get_cadences_to_execute( 308 cls, 309 configured_cadences: dict, 310 cadence: str, 311 cadence_configuration_at_end_date: dict, 312 rerun_flag: str, 313 ) -> dict: 314 """Get cadences to execute. 315 316 Args: 317 cadence: cadence to process. 318 configured_cadences: configured use case reconciliation window. 319 cadence_configuration_at_end_date: cadences to execute at the end date. 320 rerun_flag: flag indicating if it's a rerun or a normal run. 321 """ 322 cadences_to_execute = {} 323 cad_order = GABCadence.get_ordered_cadences() 324 325 for snapshot_cadence, snapshot_flag in configured_cadences.items(): 326 if ( 327 (cad_order[cadence] > cad_order[snapshot_cadence]) 328 and (rerun_flag == "Y") 329 ) or snapshot_cadence in cadence_configuration_at_end_date: 330 cadences_to_execute[snapshot_cadence] = snapshot_flag 331 elif snapshot_cadence not in cadence_configuration_at_end_date: 332 continue 333 334 return cls._sort_cadences_to_execute(cadences_to_execute, cad_order) 335 336 @classmethod 337 def _sort_cadences_to_execute( 338 cls, cadences_to_execute: dict, cad_order: dict 339 ) -> dict: 340 """Sort the cadences to execute. 341 342 Args: 343 cadences_to_execute: cadences to execute. 344 cad_order: all cadences with order. 345 """ 346 # ordering it because when grouping cadences with snapshot and without snapshot 347 # can impact the cadence ordering. 
348 sorted_cadences_to_execute: dict = dict( 349 sorted( 350 cadences_to_execute.items(), 351 key=lambda item: cad_order.get(item[0]), # type: ignore 352 ) 353 ) 354 # ordering cadences to execute it from bigger (YEAR) to smaller (DAY) 355 cadences_to_execute_items = [] 356 357 for cadence_name, cadence_value in sorted_cadences_to_execute.items(): 358 cadences_to_execute_items.append((cadence_name, cadence_value)) 359 360 cadences_sorted_by_bigger_cadence_to_execute: dict = dict( 361 reversed(cadences_to_execute_items) 362 ) 363 364 return cadences_sorted_by_bigger_cadence_to_execute 365 366 @classmethod 367 def _get_configured_cadences_by_snapshot( 368 cls, 369 cadence: str, 370 selected_reconciliation_window: dict, 371 cadence_configuration_at_end_date: dict, 372 ) -> dict: 373 """Get configured cadences to execute. 374 375 Args: 376 cadence: selected cadence. 377 selected_reconciliation_window: configured use case reconciliation window. 378 cadence_configuration_at_end_date: cadences to execute at the end date. 379 380 Returns: 381 Each cadence with the corresponding information if it's to execute with 382 snapshot or not. 
383 """ 384 cadences_by_snapshot = {} 385 386 ( 387 no_snapshot_cadences, 388 snapshot_cadences, 389 ) = cls._generate_reconciliation_by_snapshot( 390 cadence, selected_reconciliation_window 391 ) 392 393 for snapshot_cadence, snapshot_flag in no_snapshot_cadences.items(): 394 if snapshot_cadence in cadence_configuration_at_end_date: 395 cadences_by_snapshot[snapshot_cadence] = snapshot_flag 396 397 cls._LOGGER.info(f"{snapshot_cadence} is present in {cadence} cadence") 398 break 399 400 cadences_by_snapshot.update(snapshot_cadences) 401 402 if (not cadences_by_snapshot) and ( 403 cadence in cadence_configuration_at_end_date 404 ): 405 cadences_by_snapshot[cadence] = "N" 406 407 return cadences_by_snapshot 408 409 @classmethod 410 def _generate_reconciliation_by_snapshot( 411 cls, cadence: str, selected_reconciliation_window: dict 412 ) -> tuple[dict, dict]: 413 """Generate reconciliation by snapshot. 414 415 Args: 416 cadence: cadence to process. 417 selected_reconciliation_window: configured use case reconciliation window. 
418 """ 419 cadence_snapshot_configuration = {cadence: "N"} 420 for cadence in GABCadence.get_cadences(): 421 cls._add_cadence_snapshot_to_cadence_snapshot_config( 422 cadence, selected_reconciliation_window, cadence_snapshot_configuration 423 ) 424 cadence_snapshot_configuration = dict( 425 sorted( 426 cadence_snapshot_configuration.items(), 427 key=( 428 lambda item: GABCadence.get_ordered_cadences().get( # type: ignore 429 item[0] 430 ) 431 ), 432 ) 433 ) 434 435 cadence_snapshot_configuration = dict( 436 reversed(list(cadence_snapshot_configuration.items())) 437 ) 438 439 cadences_without_snapshot = { 440 key: value 441 for key, value in cadence_snapshot_configuration.items() 442 if value == "N" 443 } 444 445 cadences_with_snapshot = { 446 key: value 447 for key, value in cadence_snapshot_configuration.items() 448 if value == "Y" 449 } 450 451 return cadences_with_snapshot, cadences_without_snapshot 452 453 @classmethod 454 def _add_cadence_snapshot_to_cadence_snapshot_config( 455 cls, 456 cadence: str, 457 selected_reconciliation_window: dict, 458 cadence_snapshot_configuration: dict, 459 ) -> None: 460 """Add the selected reconciliation to cadence snapshot configuration. 461 462 Args: 463 cadence: selected cadence. 464 selected_reconciliation_window: configured use case reconciliation window. 465 cadence_snapshot_configuration: cadence snapshot configuration dictionary 466 who will be updated with the new value. 467 """ 468 if cadence in selected_reconciliation_window: 469 cadence_snapshot_configuration[cadence] = selected_reconciliation_window[ 470 cadence 471 ]["snapshot"] 472 473 @classmethod 474 def format_datetime_to_default(cls, date_to_format: datetime) -> str: 475 """Format datetime to GAB default format. 476 477 Args: 478 date_to_format: date to format. 
479 """ 480 return datetime.date(date_to_format).strftime(GABDefaults.DATE_FORMAT.value) 481 482 483class GABPartitionUtils(object): 484 """Class to extract a partition based in a date period.""" 485 486 _LOGGER = LoggingHandler(__name__).get_logger() 487 488 @classmethod 489 def get_years(cls, start_date: str, end_date: str) -> list[str]: 490 """Return a list of distinct years from the input parameters. 491 492 Args: 493 start_date: start of the period. 494 end_date: end of the period. 495 """ 496 year = [] 497 if start_date > end_date: 498 raise ValueError( 499 "Input Error: Invalid start_date and end_date. " 500 "Start_date is greater than end_date" 501 ) 502 503 for i in range(int(start_date[0:4]), int(end_date[0:4]) + 1): 504 year.append(str(i)) 505 506 return year 507 508 @classmethod 509 def get_partition_condition(cls, start_date: str, end_date: str) -> str: 510 """Return year,month and day partition statement from the input parameters. 511 512 Args: 513 start_date: start of the period. 514 end_date: end of the period. 515 """ 516 years = cls.get_years(start_date, end_date) 517 if len(years) > 1: 518 partition_condition = cls._get_multiple_years_partition( 519 start_date, end_date, years 520 ) 521 else: 522 partition_condition = cls._get_single_year_partition(start_date, end_date) 523 return partition_condition 524 525 @classmethod 526 def _get_multiple_years_partition( 527 cls, start_date: str, end_date: str, years: list[str] 528 ) -> str: 529 """Return partition when executing multiple years (>1). 530 531 Args: 532 start_date: start of the period. 533 end_date: end of the period. 534 years: list of years. 
535 """ 536 start_date_month = cls._extract_date_part_from_date("MONTH", start_date) 537 start_date_day = cls._extract_date_part_from_date("DAY", start_date) 538 539 end_date_month = cls._extract_date_part_from_date("MONTH", end_date) 540 end_date_day = cls._extract_date_part_from_date("DAY", end_date) 541 542 year_statement = "(year = {0} and (".format(years[0]) + "{})" 543 if start_date_month != "12": 544 start_date_partition = year_statement.format( 545 "(month = {0} and day between {1} and 31)".format( 546 start_date_month, start_date_day 547 ) 548 + " or (month between {0} and 12)".format(int(start_date_month) + 1) 549 ) 550 else: 551 start_date_partition = year_statement.format( 552 "month = {0} and day between {1} and 31".format( 553 start_date_month, start_date_day 554 ) 555 ) 556 557 period_years_partition = "" 558 559 if len(years) == 3: 560 period_years_partition = ") or (year = {0}".format(years[1]) 561 elif len(years) > 3: 562 period_years_partition = ") or (year between {0} and {1})".format( 563 years[1], years[-2] 564 ) 565 566 if end_date_month != "01": 567 end_date_partition = ( 568 ") or (year = {0} and ((month between 01 and {1})".format( 569 years[-1], int(end_date_month) - 1 570 ) 571 + " or (month = {0} and day between 1 and {1})))".format( 572 end_date_month, end_date_day 573 ) 574 ) 575 else: 576 end_date_partition = ( 577 ") or (year = {0} and month = 1 and day between 01 and {1})".format( 578 years[-1], end_date_day 579 ) 580 ) 581 partition_condition = ( 582 start_date_partition + period_years_partition + end_date_partition 583 ) 584 585 return partition_condition 586 587 @classmethod 588 def _get_single_year_partition(cls, start_date: str, end_date: str) -> str: 589 """Return partition when executing a single year. 590 591 Args: 592 start_date: start of the period. 593 end_date: end of the period. 
594 """ 595 start_date_year = cls._extract_date_part_from_date("YEAR", start_date) 596 start_date_month = cls._extract_date_part_from_date("MONTH", start_date) 597 start_date_day = cls._extract_date_part_from_date("DAY", start_date) 598 599 end_date_year = cls._extract_date_part_from_date("YEAR", end_date) 600 end_date_month = cls._extract_date_part_from_date("MONTH", end_date) 601 end_date_day = cls._extract_date_part_from_date("DAY", end_date) 602 603 if start_date_month != end_date_month: 604 months = [] 605 for i in range(int(start_date_month), int(end_date_month) + 1): 606 months.append(i) 607 608 start_date_partition = ( 609 "year = {0} and ((month={1} and day between {2} and 31)".format( 610 start_date_year, months[0], start_date_day 611 ) 612 ) 613 period_years_partition = "" 614 if len(months) == 2: 615 period_years_partition = start_date_partition 616 elif len(months) == 3: 617 period_years_partition = ( 618 start_date_partition + " or (month = {0})".format(months[1]) 619 ) 620 elif len(months) > 3: 621 period_years_partition = ( 622 start_date_partition 623 + " or (month between {0} and {1})".format(months[1], months[-2]) 624 ) 625 partition_condition = ( 626 period_years_partition 627 + " or (month = {0} and day between 1 and {1}))".format( 628 end_date_month, end_date_day 629 ) 630 ) 631 else: 632 partition_condition = ( 633 "year = {0} and month = {1} and day between {2} and {3}".format( 634 end_date_year, end_date_month, start_date_day, end_date_day 635 ) 636 ) 637 638 return partition_condition 639 640 @classmethod 641 def _extract_date_part_from_date(cls, part: str, date: str) -> str: 642 """Extract date part from string date. 643 644 Args: 645 part: date part (possible values: DAY, MONTH, YEAR) 646 date: string date. 647 """ 648 if "DAY" == part.upper(): 649 return date[8:10] 650 elif "MONTH" == part.upper(): 651 return date[5:7] 652 else: 653 return date[0:4]
class GABUtils(object):
    """Class containing utility functions for GAB."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    def logger(
        self,
        run_start_time: datetime,
        run_end_time: datetime,
        start: str,
        end: str,
        query_id: str,
        query_label: str,
        cadence: str,
        stage_file_path: str,
        query: str,
        status: str,
        error_message: Union[Exception, str],
        target_database: str,
    ) -> None:
        """Store the execution of each stage in the log events table.

        Args:
            run_start_time: execution start time.
            run_end_time: execution end time.
            start: use case start date.
            end: use case end date.
            query_id: gab configuration table use case identifier.
            query_label: gab configuration table use case name.
            cadence: cadence to process.
            stage_file_path: stage file path.
            query: query to execute.
            status: status of the query execution.
            error_message: error message if present.
            target_database: target database to write.
        """
        # NOTE(review): values are interpolated directly into the SQL text
        # (hence the nosec B608 marker). Quotes in `query` and, on failure,
        # `error_message` are escaped below; the remaining fields are
        # presumably trusted internal values — verify callers.
        ins = """
        INSERT INTO {database}.gab_log_events
        VALUES (
            '{run_start_time}',
            '{run_end_time}',
            '{start}',
            '{end}',
            {query_id},
            '{query_label}',
            '{cadence}',
            '{stage_file_path}',
            '{query}',
            '{status}',
            '{error_message}'
        )""".format(  # nosec: B608
            database=target_database,
            run_start_time=run_start_time,
            run_end_time=run_end_time,
            start=start,
            end=end,
            query_id=query_id,
            query_label=query_label,
            cadence=cadence,
            stage_file_path=stage_file_path,
            query=self._escape_quote(query),
            status=status,
            error_message=(
                self._escape_quote(str(error_message))
                if status == "Failed"
                else error_message
            ),
        )

        ExecEnv.SESSION.sql(ins)

    @classmethod
    def _escape_quote(cls, to_escape: str) -> str:
        """Escape quote on string.

        Args:
            to_escape: string to escape.

        Returns:
            The input with single and double quotes backslash-escaped.
        """
        return to_escape.replace("'", r"\'").replace('"', r"\"")

    @classmethod
    def get_json_column_as_dict(
        cls, lookup_query_builder: DataFrame, query_id: str, query_column: str
    ) -> dict:  # type: ignore
        """Get JSON column as dictionary.

        Args:
            lookup_query_builder: gab configuration data.
            query_id: gab configuration table use case identifier.
            query_column: column to get as json.

        Returns:
            The parsed column content as a dictionary.
        """
        # Filter the configuration to the requested use case and keep only
        # the column that holds the JSON payload.
        column_df = lookup_query_builder.filter(
            col("query_id") == lit(query_id)
        ).select(col(query_column))

        # Serialize the (single-row) result to a JSON string on the driver.
        column_df_json = column_df.select(
            to_json(struct([column_df[x] for x in column_df.columns]))
        ).collect()[0][0]

        json_column = json.loads(column_df_json)

        # NOTE(review): if `json_column` holds more than one value only the
        # last parsed mapping survives, and an empty mapping would raise
        # NameError on return — presumably exactly one entry is expected
        # here; confirm with the configuration schema.
        for mapping in json_column.values():
            column_as_json = ast.literal_eval(mapping)

        return column_as_json  # type: ignore

    @classmethod
    def extract_columns_from_mapping(
        cls,
        columns: dict,
        is_dimension: bool,
        extract_column_without_alias: bool = False,
        table_alias: Optional[str] = None,
        is_extracted_value_as_name: bool = True,
    ) -> Union[tuple[list[str], list[str]], list[str]]:
        """Extract and transform columns to SQL select statement.

        Args:
            columns: data to extract the columns.
            is_dimension: flag identifying if is a dimension or a metric.
            extract_column_without_alias: flag to inform if it's to extract columns
                without aliases.
            table_alias: name or alias from the source table.
            is_extracted_value_as_name: identify if the extracted value is the
                column name.

        Returns:
            Aliased columns list, or a tuple of (aliased, un-aliased) columns
            lists when `extract_column_without_alias` is set.
        """
        # Templates: optionally prefix with the table alias, e.g.
        # "t.{} as {}" / "t.{}" versus "{} as {}" / "{}".
        column_with_alias = (
            "".join([table_alias, ".", "{} as {}"]) if table_alias else "{} as {}"
        )
        column_without_alias = (
            "".join([table_alias, ".", "{}"]) if table_alias else "{}"
        )

        extracted_columns_with_alias = []
        extracted_columns_without_alias = []
        for column_name, column_value in columns.items():
            if extract_column_without_alias:
                extracted_column_without_alias = column_without_alias.format(
                    cls._get_column_format_without_alias(
                        is_dimension,
                        column_name,
                        column_value,
                        is_extracted_value_as_name,
                    )
                )
                extracted_columns_without_alias.append(extracted_column_without_alias)

            extracted_column_with_alias = column_with_alias.format(
                *cls._extract_column_with_alias(
                    is_dimension,
                    column_name,
                    column_value,
                    is_extracted_value_as_name,
                )
            )
            extracted_columns_with_alias.append(extracted_column_with_alias)

        return (
            (extracted_columns_with_alias, extracted_columns_without_alias)
            if extract_column_without_alias
            else extracted_columns_with_alias
        )

    @classmethod
    def _extract_column_with_alias(
        cls,
        is_dimension: bool,
        column_name: str,
        column_value: Union[str, dict],
        is_extracted_value_as_name: bool = True,
    ) -> tuple[str, str]:
        """Extract column name with alias.

        Args:
            is_dimension: flag indicating if the column is a dimension.
            column_name: name of the column.
            column_value: value of the column.
            is_extracted_value_as_name: flag indicating if the name of the column is the
                extracted value.

        Returns:
            A ``(select_expression, alias)`` pair, ordered per
            `is_extracted_value_as_name`.
        """
        # Metrics carry their expression under the "metric_name" key;
        # dimensions use the raw value directly.
        extracted_value = (
            column_value
            if is_dimension
            else (column_value["metric_name"])  # type: ignore
        )

        return (
            (extracted_value, column_name)  # type: ignore
            if is_extracted_value_as_name
            else (column_name, extracted_value)
        )

    @classmethod
    def _get_column_format_without_alias(
        cls,
        is_dimension: bool,
        column_name: str,
        column_value: Union[str, dict],
        is_extracted_value_as_name: bool = True,
    ) -> str:
        """Extract column name without alias.

        Args:
            is_dimension: flag indicating if the column is a dimension.
            column_name: name of the column.
            column_value: value of the column.
            is_extracted_value_as_name: flag indicating if the name of the column is the
                extracted value.
        """
        extracted_value: str = (
            column_value
            if is_dimension
            else (column_value["metric_name"])  # type: ignore
        )

        return extracted_value if is_extracted_value_as_name else column_name

    @classmethod
    def get_cadence_configuration_at_end_date(cls, end_date: datetime) -> dict:
        """A dictionary that corresponds to the conclusion of a cadence.

        Any end date inputted by the user we check this end date is actually end of
        a cadence (YEAR, QUARTER, MONTH, WEEK).
        If the user input is 2024-03-31 this is a month end and a quarter end that
        means any use cases configured as month or quarter need to be calculated.

        Args:
            end_date: base end date.

        Returns:
            Mapping of each cadence that concludes at `end_date` to "N"
            (DAY is always included).
        """
        init_end_date_dict = {}

        # Pendulum mirror of `end_date`, made tz-naive so it compares
        # cleanly against the naive datetime input.
        expected_end_cadence_date = pendulum.datetime(
            int(end_date.strftime("%Y")),
            int(end_date.strftime("%m")),
            int(end_date.strftime("%d")),
        ).replace(tzinfo=None)

        # Validating YEAR cadence
        if end_date == expected_end_cadence_date.last_of("year"):
            init_end_date_dict["YEAR"] = "N"

        # Validating QUARTER cadence
        if end_date == expected_end_cadence_date.last_of("quarter"):
            init_end_date_dict["QUARTER"] = "N"

        # Validating MONTH cadence
        if end_date == datetime(
            int(end_date.strftime("%Y")),
            int(end_date.strftime("%m")),
            calendar.monthrange(
                int(end_date.strftime("%Y")), int(end_date.strftime("%m"))
            )[1],
        ):
            init_end_date_dict["MONTH"] = "N"

        # Validating WEEK cadence: end_of("week") returns the last instant
        # of the day, so truncate the time part before comparing.
        if end_date == expected_end_cadence_date.end_of("week").replace(
            hour=0, minute=0, second=0, microsecond=0
        ):
            init_end_date_dict["WEEK"] = "N"

        init_end_date_dict["DAY"] = "N"

        return init_end_date_dict

    def get_reconciliation_cadences(
        self,
        cadence: str,
        selected_reconciliation_window: dict,
        cadence_configuration_at_end_date: dict,
        rerun_flag: str,
    ) -> dict:
        """Get reconciliation cadences based on the use case configuration.

        Args:
            cadence: cadence to process.
            selected_reconciliation_window: configured use case reconciliation window.
            cadence_configuration_at_end_date: cadences to execute at the end date.
            rerun_flag: flag indicating if it's a rerun or a normal run.
        """
        configured_cadences = self._get_configured_cadences_by_snapshot(
            cadence, selected_reconciliation_window, cadence_configuration_at_end_date
        )

        return self._get_cadences_to_execute(
            configured_cadences, cadence, cadence_configuration_at_end_date, rerun_flag
        )

    @classmethod
    def _get_cadences_to_execute(
        cls,
        configured_cadences: dict,
        cadence: str,
        cadence_configuration_at_end_date: dict,
        rerun_flag: str,
    ) -> dict:
        """Get cadences to execute.

        Args:
            cadence: cadence to process.
            configured_cadences: configured use case reconciliation window.
            cadence_configuration_at_end_date: cadences to execute at the end date.
            rerun_flag: flag indicating if it's a rerun or a normal run.
        """
        cadences_to_execute = {}
        cad_order = GABCadence.get_ordered_cadences()

        # Keep a cadence when it's smaller than the processed cadence on a
        # rerun, or when it concludes at the end date.
        for snapshot_cadence, snapshot_flag in configured_cadences.items():
            if (
                (cad_order[cadence] > cad_order[snapshot_cadence])
                and (rerun_flag == "Y")
            ) or snapshot_cadence in cadence_configuration_at_end_date:
                cadences_to_execute[snapshot_cadence] = snapshot_flag
            # NOTE(review): this elif is a no-op (`continue` at loop end) —
            # kept for readability of the exclusion case.
            elif snapshot_cadence not in cadence_configuration_at_end_date:
                continue

        return cls._sort_cadences_to_execute(cadences_to_execute, cad_order)

    @classmethod
    def _sort_cadences_to_execute(
        cls, cadences_to_execute: dict, cad_order: dict
    ) -> dict:
        """Sort the cadences to execute.

        Args:
            cadences_to_execute: cadences to execute.
            cad_order: all cadences with order.
        """
        # ordering it because when grouping cadences with snapshot and without snapshot
        # can impact the cadence ordering.
        sorted_cadences_to_execute: dict = dict(
            sorted(
                cadences_to_execute.items(),
                key=lambda item: cad_order.get(item[0]),  # type: ignore
            )
        )
        # ordering cadences to execute it from bigger (YEAR) to smaller (DAY)
        cadences_to_execute_items = []

        for cadence_name, cadence_value in sorted_cadences_to_execute.items():
            cadences_to_execute_items.append((cadence_name, cadence_value))

        cadences_sorted_by_bigger_cadence_to_execute: dict = dict(
            reversed(cadences_to_execute_items)
        )

        return cadences_sorted_by_bigger_cadence_to_execute

    @classmethod
    def _get_configured_cadences_by_snapshot(
        cls,
        cadence: str,
        selected_reconciliation_window: dict,
        cadence_configuration_at_end_date: dict,
    ) -> dict:
        """Get configured cadences to execute.

        Args:
            cadence: selected cadence.
            selected_reconciliation_window: configured use case reconciliation window.
            cadence_configuration_at_end_date: cadences to execute at the end date.

        Returns:
            Each cadence with the corresponding information if it's to execute with
            snapshot or not.
        """
        cadences_by_snapshot = {}

        # NOTE(review): `_generate_reconciliation_by_snapshot` returns
        # (with_snapshot, without_snapshot), which is the reverse of the
        # variable names used here — confirm the intended ordering upstream.
        (
            no_snapshot_cadences,
            snapshot_cadences,
        ) = cls._generate_reconciliation_by_snapshot(
            cadence, selected_reconciliation_window
        )

        # Only the first (biggest) matching cadence is kept — hence the break.
        for snapshot_cadence, snapshot_flag in no_snapshot_cadences.items():
            if snapshot_cadence in cadence_configuration_at_end_date:
                cadences_by_snapshot[snapshot_cadence] = snapshot_flag

                cls._LOGGER.info(f"{snapshot_cadence} is present in {cadence} cadence")
                break

        cadences_by_snapshot.update(snapshot_cadences)

        # Fallback: execute the selected cadence itself without snapshot.
        if (not cadences_by_snapshot) and (
            cadence in cadence_configuration_at_end_date
        ):
            cadences_by_snapshot[cadence] = "N"

        return cadences_by_snapshot

    @classmethod
    def _generate_reconciliation_by_snapshot(
        cls, cadence: str, selected_reconciliation_window: dict
    ) -> tuple[dict, dict]:
        """Generate reconciliation by snapshot.

        Args:
            cadence: cadence to process.
            selected_reconciliation_window: configured use case reconciliation window.

        Returns:
            Tuple of (cadences with snapshot, cadences without snapshot).
        """
        cadence_snapshot_configuration = {cadence: "N"}
        # NOTE(review): the loop variable shadows the `cadence` parameter;
        # the parameter is only used in the seed dict above, so behavior is
        # unaffected, but a rename would improve clarity.
        for cadence in GABCadence.get_cadences():
            cls._add_cadence_snapshot_to_cadence_snapshot_config(
                cadence, selected_reconciliation_window, cadence_snapshot_configuration
            )
        cadence_snapshot_configuration = dict(
            sorted(
                cadence_snapshot_configuration.items(),
                key=(
                    lambda item: GABCadence.get_ordered_cadences().get(  # type: ignore
                        item[0]
                    )
                ),
            )
        )

        # Biggest cadence (YEAR) first.
        cadence_snapshot_configuration = dict(
            reversed(list(cadence_snapshot_configuration.items()))
        )

        cadences_without_snapshot = {
            key: value
            for key, value in cadence_snapshot_configuration.items()
            if value == "N"
        }

        cadences_with_snapshot = {
            key: value
            for key, value in cadence_snapshot_configuration.items()
            if value == "Y"
        }

        return cadences_with_snapshot, cadences_without_snapshot

    @classmethod
    def _add_cadence_snapshot_to_cadence_snapshot_config(
        cls,
        cadence: str,
        selected_reconciliation_window: dict,
        cadence_snapshot_configuration: dict,
    ) -> None:
        """Add the selected reconciliation to cadence snapshot configuration.

        Args:
            cadence: selected cadence.
            selected_reconciliation_window: configured use case reconciliation window.
            cadence_snapshot_configuration: cadence snapshot configuration dictionary
                who will be updated with the new value.
        """
        if cadence in selected_reconciliation_window:
            cadence_snapshot_configuration[cadence] = selected_reconciliation_window[
                cadence
            ]["snapshot"]

    @classmethod
    def format_datetime_to_default(cls, date_to_format: datetime) -> str:
        """Format datetime to GAB default format.

        Args:
            date_to_format: date to format.
        """
        return datetime.date(date_to_format).strftime(GABDefaults.DATE_FORMAT.value)
Class containing utility functions for GAB.
24 def logger( 25 self, 26 run_start_time: datetime, 27 run_end_time: datetime, 28 start: str, 29 end: str, 30 query_id: str, 31 query_label: str, 32 cadence: str, 33 stage_file_path: str, 34 query: str, 35 status: str, 36 error_message: Union[Exception, str], 37 target_database: str, 38 ) -> None: 39 """Store the execution of each stage in the log events table. 40 41 Args: 42 run_start_time: execution start time. 43 run_end_time: execution end time. 44 start: use case start date. 45 end: use case end date. 46 query_id: gab configuration table use case identifier. 47 query_label: gab configuration table use case name. 48 cadence: cadence to process. 49 stage_file_path: stage file path. 50 query: query to execute. 51 status: status of the query execution. 52 error_message: error message if present. 53 target_database: target database to write. 54 """ 55 ins = """ 56 INSERT INTO {database}.gab_log_events 57 VALUES ( 58 '{run_start_time}', 59 '{run_end_time}', 60 '{start}', 61 '{end}', 62 {query_id}, 63 '{query_label}', 64 '{cadence}', 65 '{stage_file_path}', 66 '{query}', 67 '{status}', 68 '{error_message}' 69 )""".format( # nosec: B608 70 database=target_database, 71 run_start_time=run_start_time, 72 run_end_time=run_end_time, 73 start=start, 74 end=end, 75 query_id=query_id, 76 query_label=query_label, 77 cadence=cadence, 78 stage_file_path=stage_file_path, 79 query=self._escape_quote(query), 80 status=status, 81 error_message=( 82 self._escape_quote(str(error_message)) 83 if status == "Failed" 84 else error_message 85 ), 86 ) 87 88 ExecEnv.SESSION.sql(ins)
Store the execution of each stage in the log events table.
Arguments:
- run_start_time: execution start time.
- run_end_time: execution end time.
- start: use case start date.
- end: use case end date.
- query_id: gab configuration table use case identifier.
- query_label: gab configuration table use case name.
- cadence: cadence to process.
- stage_file_path: stage file path.
- query: query to execute.
- status: status of the query execution.
- error_message: error message if present.
- target_database: target database to write.
99 @classmethod 100 def get_json_column_as_dict( 101 cls, lookup_query_builder: DataFrame, query_id: str, query_column: str 102 ) -> dict: # type: ignore 103 """Get JSON column as dictionary. 104 105 Args: 106 lookup_query_builder: gab configuration data. 107 query_id: gab configuration table use case identifier. 108 query_column: column to get as json. 109 """ 110 column_df = lookup_query_builder.filter( 111 col("query_id") == lit(query_id) 112 ).select(col(query_column)) 113 114 column_df_json = column_df.select( 115 to_json(struct([column_df[x] for x in column_df.columns])) 116 ).collect()[0][0] 117 118 json_column = json.loads(column_df_json) 119 120 for mapping in json_column.values(): 121 column_as_json = ast.literal_eval(mapping) 122 123 return column_as_json # type: ignore
Get JSON column as dictionary.
Arguments:
- lookup_query_builder: gab configuration data.
- query_id: gab configuration table use case identifier.
- query_column: column to get as json.
125 @classmethod 126 def extract_columns_from_mapping( 127 cls, 128 columns: dict, 129 is_dimension: bool, 130 extract_column_without_alias: bool = False, 131 table_alias: Optional[str] = None, 132 is_extracted_value_as_name: bool = True, 133 ) -> Union[tuple[list[str], list[str]], list[str]]: 134 """Extract and transform columns to SQL select statement. 135 136 Args: 137 columns: data to extract the columns. 138 is_dimension: flag identifying if is a dimension or a metric. 139 extract_column_without_alias: flag to inform if it's to extract columns 140 without aliases. 141 table_alias: name or alias from the source table. 142 is_extracted_value_as_name: identify if the extracted value is the 143 column name. 144 """ 145 column_with_alias = ( 146 "".join([table_alias, ".", "{} as {}"]) if table_alias else "{} as {}" 147 ) 148 column_without_alias = ( 149 "".join([table_alias, ".", "{}"]) if table_alias else "{}" 150 ) 151 152 extracted_columns_with_alias = [] 153 extracted_columns_without_alias = [] 154 for column_name, column_value in columns.items(): 155 if extract_column_without_alias: 156 extracted_column_without_alias = column_without_alias.format( 157 cls._get_column_format_without_alias( 158 is_dimension, 159 column_name, 160 column_value, 161 is_extracted_value_as_name, 162 ) 163 ) 164 extracted_columns_without_alias.append(extracted_column_without_alias) 165 166 extracted_column_with_alias = column_with_alias.format( 167 *cls._extract_column_with_alias( 168 is_dimension, 169 column_name, 170 column_value, 171 is_extracted_value_as_name, 172 ) 173 ) 174 extracted_columns_with_alias.append(extracted_column_with_alias) 175 176 return ( 177 (extracted_columns_with_alias, extracted_columns_without_alias) 178 if extract_column_without_alias 179 else extracted_columns_with_alias 180 )
Extract and transform columns to SQL select statement.
Arguments:
- columns: data to extract the columns.
- is_dimension: flag identifying if is a dimension or a metric.
- extract_column_without_alias: flag indicating whether the columns should also be extracted without aliases.
- table_alias: name or alias from the source table.
- is_extracted_value_as_name: identify if the extracted value is the column name.
236 @classmethod 237 def get_cadence_configuration_at_end_date(cls, end_date: datetime) -> dict: 238 """A dictionary that corresponds to the conclusion of a cadence. 239 240 Any end date inputted by the user we check this end date is actually end of 241 a cadence (YEAR, QUARTER, MONTH, WEEK). 242 If the user input is 2024-03-31 this is a month end and a quarter end that 243 means any use cases configured as month or quarter need to be calculated. 244 245 Args: 246 end_date: base end date. 247 """ 248 init_end_date_dict = {} 249 250 expected_end_cadence_date = pendulum.datetime( 251 int(end_date.strftime("%Y")), 252 int(end_date.strftime("%m")), 253 int(end_date.strftime("%d")), 254 ).replace(tzinfo=None) 255 256 # Validating YEAR cadence 257 if end_date == expected_end_cadence_date.last_of("year"): 258 init_end_date_dict["YEAR"] = "N" 259 260 # Validating QUARTER cadence 261 if end_date == expected_end_cadence_date.last_of("quarter"): 262 init_end_date_dict["QUARTER"] = "N" 263 264 # Validating MONTH cadence 265 if end_date == datetime( 266 int(end_date.strftime("%Y")), 267 int(end_date.strftime("%m")), 268 calendar.monthrange( 269 int(end_date.strftime("%Y")), int(end_date.strftime("%m")) 270 )[1], 271 ): 272 init_end_date_dict["MONTH"] = "N" 273 274 # Validating WEEK cadence 275 if end_date == expected_end_cadence_date.end_of("week").replace( 276 hour=0, minute=0, second=0, microsecond=0 277 ): 278 init_end_date_dict["WEEK"] = "N" 279 280 init_end_date_dict["DAY"] = "N" 281 282 return init_end_date_dict
A dictionary that corresponds to the conclusion of a cadence.
For any end date inputted by the user, we check whether it is actually the end of a cadence (YEAR, QUARTER, MONTH, WEEK). If the user input is 2024-03-31, this is both a month end and a quarter end, which means any use cases configured as month or quarter need to be calculated.
Arguments:
- end_date: base end date.
284 def get_reconciliation_cadences( 285 self, 286 cadence: str, 287 selected_reconciliation_window: dict, 288 cadence_configuration_at_end_date: dict, 289 rerun_flag: str, 290 ) -> dict: 291 """Get reconciliation cadences based on the use case configuration. 292 293 Args: 294 cadence: cadence to process. 295 selected_reconciliation_window: configured use case reconciliation window. 296 cadence_configuration_at_end_date: cadences to execute at the end date. 297 rerun_flag: flag indicating if it's a rerun or a normal run. 298 """ 299 configured_cadences = self._get_configured_cadences_by_snapshot( 300 cadence, selected_reconciliation_window, cadence_configuration_at_end_date 301 ) 302 303 return self._get_cadences_to_execute( 304 configured_cadences, cadence, cadence_configuration_at_end_date, rerun_flag 305 )
Get reconciliation cadences based on the use case configuration.
Arguments:
- cadence: cadence to process.
- selected_reconciliation_window: configured use case reconciliation window.
- cadence_configuration_at_end_date: cadences to execute at the end date.
- rerun_flag: flag indicating if it's a rerun or a normal run.
474 @classmethod 475 def format_datetime_to_default(cls, date_to_format: datetime) -> str: 476 """Format datetime to GAB default format. 477 478 Args: 479 date_to_format: date to format. 480 """ 481 return datetime.date(date_to_format).strftime(GABDefaults.DATE_FORMAT.value)
Format datetime to GAB default format.
Arguments:
- date_to_format: date to format.
class GABPartitionUtils(object):
    """Class to extract a partition based in a date period."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def get_years(cls, start_date: str, end_date: str) -> list[str]:
        """Return a list of distinct years from the input parameters.

        Args:
            start_date: start of the period.
            end_date: end of the period.

        Raises:
            ValueError: when start_date is greater than end_date.
        """
        year = []
        # ISO-formatted dates ("YYYY-MM-DD") compare correctly as strings.
        if start_date > end_date:
            raise ValueError(
                "Input Error: Invalid start_date and end_date. "
                "Start_date is greater than end_date"
            )

        for i in range(int(start_date[0:4]), int(end_date[0:4]) + 1):
            year.append(str(i))

        return year

    @classmethod
    def get_partition_condition(cls, start_date: str, end_date: str) -> str:
        """Return year,month and day partition statement from the input parameters.

        Args:
            start_date: start of the period.
            end_date: end of the period.
        """
        years = cls.get_years(start_date, end_date)
        if len(years) > 1:
            partition_condition = cls._get_multiple_years_partition(
                start_date, end_date, years
            )
        else:
            partition_condition = cls._get_single_year_partition(start_date, end_date)
        return partition_condition

    @classmethod
    def _get_multiple_years_partition(
        cls, start_date: str, end_date: str, years: list[str]
    ) -> str:
        """Return partition when executing multiple years (>1).

        NOTE(review): the three fragments built below (start year, middle
        years, end year) rely on each other's opening/closing parentheses;
        the condition is only balanced once they are concatenated.

        Args:
            start_date: start of the period.
            end_date: end of the period.
            years: list of years.
        """
        start_date_month = cls._extract_date_part_from_date("MONTH", start_date)
        start_date_day = cls._extract_date_part_from_date("DAY", start_date)

        end_date_month = cls._extract_date_part_from_date("MONTH", end_date)
        end_date_day = cls._extract_date_part_from_date("DAY", end_date)

        # First year: from the start day to the end of that year.
        year_statement = "(year = {0} and (".format(years[0]) + "{})"
        if start_date_month != "12":
            start_date_partition = year_statement.format(
                "(month = {0} and day between {1} and 31)".format(
                    start_date_month, start_date_day
                )
                + " or (month between {0} and 12)".format(int(start_date_month) + 1)
            )
        else:
            start_date_partition = year_statement.format(
                "month = {0} and day between {1} and 31".format(
                    start_date_month, start_date_day
                )
            )

        period_years_partition = ""

        # Intermediate years (if any) are taken in full.
        if len(years) == 3:
            period_years_partition = ") or (year = {0}".format(years[1])
        elif len(years) > 3:
            period_years_partition = ") or (year between {0} and {1})".format(
                years[1], years[-2]
            )

        # Last year: from the start of that year up to the end day.
        if end_date_month != "01":
            end_date_partition = (
                ") or (year = {0} and ((month between 01 and {1})".format(
                    years[-1], int(end_date_month) - 1
                )
                + " or (month = {0} and day between 1 and {1})))".format(
                    end_date_month, end_date_day
                )
            )
        else:
            end_date_partition = (
                ") or (year = {0} and month = 1 and day between 01 and {1})".format(
                    years[-1], end_date_day
                )
            )
        partition_condition = (
            start_date_partition + period_years_partition + end_date_partition
        )

        return partition_condition

    @classmethod
    def _get_single_year_partition(cls, start_date: str, end_date: str) -> str:
        """Return partition when executing a single year.

        Args:
            start_date: start of the period.
            end_date: end of the period.
        """
        start_date_year = cls._extract_date_part_from_date("YEAR", start_date)
        start_date_month = cls._extract_date_part_from_date("MONTH", start_date)
        start_date_day = cls._extract_date_part_from_date("DAY", start_date)

        end_date_year = cls._extract_date_part_from_date("YEAR", end_date)
        end_date_month = cls._extract_date_part_from_date("MONTH", end_date)
        end_date_day = cls._extract_date_part_from_date("DAY", end_date)

        if start_date_month != end_date_month:
            # Period spans several months of the same year: first month from
            # the start day, intermediate months in full, last month up to
            # the end day.
            months = []
            for i in range(int(start_date_month), int(end_date_month) + 1):
                months.append(i)

            start_date_partition = (
                "year = {0} and ((month={1} and day between {2} and 31)".format(
                    start_date_year, months[0], start_date_day
                )
            )
            period_years_partition = ""
            if len(months) == 2:
                period_years_partition = start_date_partition
            elif len(months) == 3:
                period_years_partition = (
                    start_date_partition + " or (month = {0})".format(months[1])
                )
            elif len(months) > 3:
                period_years_partition = (
                    start_date_partition
                    + " or (month between {0} and {1})".format(months[1], months[-2])
                )
            partition_condition = (
                period_years_partition
                + " or (month = {0} and day between 1 and {1}))".format(
                    end_date_month, end_date_day
                )
            )
        else:
            # Single-month period: a simple day range.
            partition_condition = (
                "year = {0} and month = {1} and day between {2} and {3}".format(
                    end_date_year, end_date_month, start_date_day, end_date_day
                )
            )

        return partition_condition

    @classmethod
    def _extract_date_part_from_date(cls, part: str, date: str) -> str:
        """Extract date part from string date.

        Assumes the date is formatted as "YYYY-MM-DD" and slices it by
        position; the returned part keeps any leading zero.

        Args:
            part: date part (possible values: DAY, MONTH, YEAR)
            date: string date.
        """
        if "DAY" == part.upper():
            return date[8:10]
        elif "MONTH" == part.upper():
            return date[5:7]
        else:
            return date[0:4]
Class to extract a partition based in a date period.
489 @classmethod 490 def get_years(cls, start_date: str, end_date: str) -> list[str]: 491 """Return a list of distinct years from the input parameters. 492 493 Args: 494 start_date: start of the period. 495 end_date: end of the period. 496 """ 497 year = [] 498 if start_date > end_date: 499 raise ValueError( 500 "Input Error: Invalid start_date and end_date. " 501 "Start_date is greater than end_date" 502 ) 503 504 for i in range(int(start_date[0:4]), int(end_date[0:4]) + 1): 505 year.append(str(i)) 506 507 return year
Return a list of distinct years from the input parameters.
Arguments:
- start_date: start of the period.
- end_date: end of the period.
509 @classmethod 510 def get_partition_condition(cls, start_date: str, end_date: str) -> str: 511 """Return year,month and day partition statement from the input parameters. 512 513 Args: 514 start_date: start of the period. 515 end_date: end of the period. 516 """ 517 years = cls.get_years(start_date, end_date) 518 if len(years) > 1: 519 partition_condition = cls._get_multiple_years_partition( 520 start_date, end_date, years 521 ) 522 else: 523 partition_condition = cls._get_single_year_partition(start_date, end_date) 524 return partition_condition
Return year,month and day partition statement from the input parameters.
Arguments:
- start_date: start of the period.
- end_date: end of the period.