# lakehouse_engine.utils.gab_utils — Module to define GAB Utility classes.
  1"""Module to define GAB Utility classes."""
  2
  3import ast
  4import calendar
  5import json
  6from datetime import datetime
  7from typing import Optional, Union
  8
  9import pendulum
 10from pyspark.sql import DataFrame
 11from pyspark.sql.functions import col, lit, struct, to_json
 12
 13from lakehouse_engine.core.definitions import GABCadence, GABDefaults
 14from lakehouse_engine.core.exec_env import ExecEnv
 15from lakehouse_engine.utils.logging_handler import LoggingHandler
 16
 17
 18class GABUtils(object):
 19    """Class containing utility functions for GAB."""
 20
 21    _LOGGER = LoggingHandler(__name__).get_logger()
 22
 23    def logger(
 24        self,
 25        run_start_time: datetime,
 26        run_end_time: datetime,
 27        start: str,
 28        end: str,
 29        query_id: str,
 30        query_label: str,
 31        cadence: str,
 32        stage_file_path: str,
 33        query: str,
 34        status: str,
 35        error_message: Union[Exception, str],
 36        target_database: str,
 37    ) -> None:
 38        """Store the execution of each stage in the log events table.
 39
 40        Args:
 41            run_start_time: execution start time.
 42            run_end_time: execution end time.
 43            start: use case start date.
 44            end: use case end date.
 45            query_id: gab configuration table use case identifier.
 46            query_label: gab configuration table use case name.
 47            cadence: cadence to process.
 48            stage_file_path: stage file path.
 49            query: query to execute.
 50            status: status of the query execution.
 51            error_message: error message if present.
 52            target_database: target database to write.
 53        """
 54        ins = """
 55        INSERT INTO {database}.gab_log_events
 56        VALUES (
 57            '{run_start_time}',
 58            '{run_end_time}',
 59            '{start}',
 60            '{end}',
 61            {query_id},
 62            '{query_label}',
 63            '{cadence}',
 64            '{stage_file_path}',
 65            '{query}',
 66            '{status}',
 67            '{error_message}'
 68        )""".format(  # nosec: B608
 69            database=target_database,
 70            run_start_time=run_start_time,
 71            run_end_time=run_end_time,
 72            start=start,
 73            end=end,
 74            query_id=query_id,
 75            query_label=query_label,
 76            cadence=cadence,
 77            stage_file_path=stage_file_path,
 78            query=self._escape_quote(query),
 79            status=status,
 80            error_message=(
 81                self._escape_quote(str(error_message))
 82                if status == "Failed"
 83                else error_message
 84            ),
 85        )
 86
 87        ExecEnv.SESSION.sql(ins)
 88
 89    @classmethod
 90    def _escape_quote(cls, to_escape: str) -> str:
 91        """Escape quote on string.
 92
 93        Args:
 94            to_escape: string to escape.
 95        """
 96        return to_escape.replace("'", r"\'").replace('"', r"\"")
 97
 98    @classmethod
 99    def get_json_column_as_dict(
100        cls, lookup_query_builder: DataFrame, query_id: str, query_column: str
101    ) -> dict:  # type: ignore
102        """Get JSON column as dictionary.
103
104        Args:
105            lookup_query_builder: gab configuration data.
106            query_id: gab configuration table use case identifier.
107            query_column: column to get as json.
108        """
109        column_df = lookup_query_builder.filter(
110            col("query_id") == lit(query_id)
111        ).select(col(query_column))
112
113        column_df_json = column_df.select(
114            to_json(struct([column_df[x] for x in column_df.columns]))
115        ).collect()[0][0]
116
117        json_column = json.loads(column_df_json)
118
119        for mapping in json_column.values():
120            column_as_json = ast.literal_eval(mapping)
121
122        return column_as_json  # type: ignore
123
124    @classmethod
125    def extract_columns_from_mapping(
126        cls,
127        columns: dict,
128        is_dimension: bool,
129        extract_column_without_alias: bool = False,
130        table_alias: Optional[str] = None,
131        is_extracted_value_as_name: bool = True,
132    ) -> Union[tuple[list[str], list[str]], list[str]]:
133        """Extract and transform columns to SQL select statement.
134
135        Args:
136            columns: data to extract the columns.
137            is_dimension: flag identifying if is a dimension or a metric.
138            extract_column_without_alias: flag to inform if it's to extract columns
139                without aliases.
140            table_alias: name or alias from the source table.
141            is_extracted_value_as_name: identify if the extracted value is the
142                column name.
143        """
144        column_with_alias = (
145            "".join([table_alias, ".", "{} as {}"]) if table_alias else "{} as {}"
146        )
147        column_without_alias = (
148            "".join([table_alias, ".", "{}"]) if table_alias else "{}"
149        )
150
151        extracted_columns_with_alias = []
152        extracted_columns_without_alias = []
153        for column_name, column_value in columns.items():
154            if extract_column_without_alias:
155                extracted_column_without_alias = column_without_alias.format(
156                    cls._get_column_format_without_alias(
157                        is_dimension,
158                        column_name,
159                        column_value,
160                        is_extracted_value_as_name,
161                    )
162                )
163                extracted_columns_without_alias.append(extracted_column_without_alias)
164
165            extracted_column_with_alias = column_with_alias.format(
166                *cls._extract_column_with_alias(
167                    is_dimension,
168                    column_name,
169                    column_value,
170                    is_extracted_value_as_name,
171                )
172            )
173            extracted_columns_with_alias.append(extracted_column_with_alias)
174
175        return (
176            (extracted_columns_with_alias, extracted_columns_without_alias)
177            if extract_column_without_alias
178            else extracted_columns_with_alias
179        )
180
181    @classmethod
182    def _extract_column_with_alias(
183        cls,
184        is_dimension: bool,
185        column_name: str,
186        column_value: Union[str, dict],
187        is_extracted_value_as_name: bool = True,
188    ) -> tuple[str, str]:
189        """Extract column name with alias.
190
191        Args:
192            is_dimension: flag indicating if the column is a dimension.
193            column_name: name of the column.
194            column_value: value of the column.
195            is_extracted_value_as_name: flag indicating if the name of the column is the
196                extracted value.
197        """
198        extracted_value = (
199            column_value
200            if is_dimension
201            else (column_value["metric_name"])  # type: ignore
202        )
203
204        return (
205            (extracted_value, column_name)  # type: ignore
206            if is_extracted_value_as_name
207            else (column_name, extracted_value)
208        )
209
210    @classmethod
211    def _get_column_format_without_alias(
212        cls,
213        is_dimension: bool,
214        column_name: str,
215        column_value: Union[str, dict],
216        is_extracted_value_as_name: bool = True,
217    ) -> str:
218        """Extract column name without alias.
219
220        Args:
221            is_dimension: flag indicating if the column is a dimension.
222            column_name: name of the column.
223            column_value: value of the column.
224            is_extracted_value_as_name: flag indicating if the name of the column is the
225                extracted value.
226        """
227        extracted_value: str = (
228            column_value
229            if is_dimension
230            else (column_value["metric_name"])  # type: ignore
231        )
232
233        return extracted_value if is_extracted_value_as_name else column_name
234
235    @classmethod
236    def get_cadence_configuration_at_end_date(cls, end_date: datetime) -> dict:
237        """A dictionary that corresponds to the conclusion of a cadence.
238
239        Any end date inputted by the user we check this end date is actually end of
240            a cadence (YEAR, QUARTER, MONTH, WEEK).
241        If the user input is 2024-03-31 this is a month end and a quarter end that
242            means any use cases configured as month or quarter need to be calculated.
243
244        Args:
245            end_date: base end date.
246        """
247        init_end_date_dict = {}
248
249        expected_end_cadence_date = pendulum.datetime(
250            int(end_date.strftime("%Y")),
251            int(end_date.strftime("%m")),
252            int(end_date.strftime("%d")),
253        ).replace(tzinfo=None)
254
255        # Validating YEAR cadence
256        if end_date == expected_end_cadence_date.last_of("year"):
257            init_end_date_dict["YEAR"] = "N"
258
259        # Validating QUARTER cadence
260        if end_date == expected_end_cadence_date.last_of("quarter"):
261            init_end_date_dict["QUARTER"] = "N"
262
263        # Validating MONTH cadence
264        if end_date == datetime(
265            int(end_date.strftime("%Y")),
266            int(end_date.strftime("%m")),
267            calendar.monthrange(
268                int(end_date.strftime("%Y")), int(end_date.strftime("%m"))
269            )[1],
270        ):
271            init_end_date_dict["MONTH"] = "N"
272
273        # Validating WEEK cadence
274        if end_date == expected_end_cadence_date.end_of("week").replace(
275            hour=0, minute=0, second=0, microsecond=0
276        ):
277            init_end_date_dict["WEEK"] = "N"
278
279        init_end_date_dict["DAY"] = "N"
280
281        return init_end_date_dict
282
283    def get_reconciliation_cadences(
284        self,
285        cadence: str,
286        selected_reconciliation_window: dict,
287        cadence_configuration_at_end_date: dict,
288        rerun_flag: str,
289    ) -> dict:
290        """Get reconciliation cadences based on the use case configuration.
291
292        Args:
293            cadence: cadence to process.
294            selected_reconciliation_window: configured use case reconciliation window.
295            cadence_configuration_at_end_date: cadences to execute at the end date.
296            rerun_flag: flag indicating if it's a rerun or a normal run.
297        """
298        configured_cadences = self._get_configured_cadences_by_snapshot(
299            cadence, selected_reconciliation_window, cadence_configuration_at_end_date
300        )
301
302        return self._get_cadences_to_execute(
303            configured_cadences, cadence, cadence_configuration_at_end_date, rerun_flag
304        )
305
306    @classmethod
307    def _get_cadences_to_execute(
308        cls,
309        configured_cadences: dict,
310        cadence: str,
311        cadence_configuration_at_end_date: dict,
312        rerun_flag: str,
313    ) -> dict:
314        """Get cadences to execute.
315
316        Args:
317            cadence: cadence to process.
318            configured_cadences: configured use case reconciliation window.
319            cadence_configuration_at_end_date: cadences to execute at the end date.
320            rerun_flag: flag indicating if it's a rerun or a normal run.
321        """
322        cadences_to_execute = {}
323        cad_order = GABCadence.get_ordered_cadences()
324
325        for snapshot_cadence, snapshot_flag in configured_cadences.items():
326            if (
327                (cad_order[cadence] > cad_order[snapshot_cadence])
328                and (rerun_flag == "Y")
329            ) or snapshot_cadence in cadence_configuration_at_end_date:
330                cadences_to_execute[snapshot_cadence] = snapshot_flag
331            elif snapshot_cadence not in cadence_configuration_at_end_date:
332                continue
333
334        return cls._sort_cadences_to_execute(cadences_to_execute, cad_order)
335
336    @classmethod
337    def _sort_cadences_to_execute(
338        cls, cadences_to_execute: dict, cad_order: dict
339    ) -> dict:
340        """Sort the cadences to execute.
341
342        Args:
343            cadences_to_execute: cadences to execute.
344            cad_order: all cadences with order.
345        """
346        # ordering it because when grouping cadences with snapshot and without snapshot
347        # can impact the cadence ordering.
348        sorted_cadences_to_execute: dict = dict(
349            sorted(
350                cadences_to_execute.items(),
351                key=lambda item: cad_order.get(item[0]),  # type: ignore
352            )
353        )
354        # ordering cadences to execute it from bigger (YEAR) to smaller (DAY)
355        cadences_to_execute_items = []
356
357        for cadence_name, cadence_value in sorted_cadences_to_execute.items():
358            cadences_to_execute_items.append((cadence_name, cadence_value))
359
360        cadences_sorted_by_bigger_cadence_to_execute: dict = dict(
361            reversed(cadences_to_execute_items)
362        )
363
364        return cadences_sorted_by_bigger_cadence_to_execute
365
366    @classmethod
367    def _get_configured_cadences_by_snapshot(
368        cls,
369        cadence: str,
370        selected_reconciliation_window: dict,
371        cadence_configuration_at_end_date: dict,
372    ) -> dict:
373        """Get configured cadences to execute.
374
375        Args:
376            cadence: selected cadence.
377            selected_reconciliation_window: configured use case reconciliation window.
378            cadence_configuration_at_end_date: cadences to execute at the end date.
379
380        Returns:
381            Each cadence with the corresponding information if it's to execute with
382                snapshot or not.
383        """
384        cadences_by_snapshot = {}
385
386        (
387            no_snapshot_cadences,
388            snapshot_cadences,
389        ) = cls._generate_reconciliation_by_snapshot(
390            cadence, selected_reconciliation_window
391        )
392
393        for snapshot_cadence, snapshot_flag in no_snapshot_cadences.items():
394            if snapshot_cadence in cadence_configuration_at_end_date:
395                cadences_by_snapshot[snapshot_cadence] = snapshot_flag
396
397                cls._LOGGER.info(f"{snapshot_cadence} is present in {cadence} cadence")
398                break
399
400        cadences_by_snapshot.update(snapshot_cadences)
401
402        if (not cadences_by_snapshot) and (
403            cadence in cadence_configuration_at_end_date
404        ):
405            cadences_by_snapshot[cadence] = "N"
406
407        return cadences_by_snapshot
408
409    @classmethod
410    def _generate_reconciliation_by_snapshot(
411        cls, cadence: str, selected_reconciliation_window: dict
412    ) -> tuple[dict, dict]:
413        """Generate reconciliation by snapshot.
414
415        Args:
416            cadence: cadence to process.
417            selected_reconciliation_window: configured use case reconciliation window.
418        """
419        cadence_snapshot_configuration = {cadence: "N"}
420        for cadence in GABCadence.get_cadences():
421            cls._add_cadence_snapshot_to_cadence_snapshot_config(
422                cadence, selected_reconciliation_window, cadence_snapshot_configuration
423            )
424        cadence_snapshot_configuration = dict(
425            sorted(
426                cadence_snapshot_configuration.items(),
427                key=(
428                    lambda item: GABCadence.get_ordered_cadences().get(  # type: ignore
429                        item[0]
430                    )
431                ),
432            )
433        )
434
435        cadence_snapshot_configuration = dict(
436            reversed(list(cadence_snapshot_configuration.items()))
437        )
438
439        cadences_without_snapshot = {
440            key: value
441            for key, value in cadence_snapshot_configuration.items()
442            if value == "N"
443        }
444
445        cadences_with_snapshot = {
446            key: value
447            for key, value in cadence_snapshot_configuration.items()
448            if value == "Y"
449        }
450
451        return cadences_with_snapshot, cadences_without_snapshot
452
453    @classmethod
454    def _add_cadence_snapshot_to_cadence_snapshot_config(
455        cls,
456        cadence: str,
457        selected_reconciliation_window: dict,
458        cadence_snapshot_configuration: dict,
459    ) -> None:
460        """Add the selected reconciliation to cadence snapshot configuration.
461
462        Args:
463            cadence: selected cadence.
464            selected_reconciliation_window:  configured use case reconciliation window.
465            cadence_snapshot_configuration: cadence snapshot configuration dictionary
466                who will be updated with the new value.
467        """
468        if cadence in selected_reconciliation_window:
469            cadence_snapshot_configuration[cadence] = selected_reconciliation_window[
470                cadence
471            ]["snapshot"]
472
473    @classmethod
474    def format_datetime_to_default(cls, date_to_format: datetime) -> str:
475        """Format datetime to GAB default format.
476
477        Args:
478            date_to_format: date to format.
479        """
480        return datetime.date(date_to_format).strftime(GABDefaults.DATE_FORMAT.value)
481
482
class GABPartitionUtils(object):
    """Class to extract a partition based in a date period."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def get_years(cls, start_date: str, end_date: str) -> list[str]:
        """Return a list of distinct years from the input parameters.

        Args:
            start_date: start of the period (ISO "YYYY-MM-DD..." string).
            end_date: end of the period (ISO "YYYY-MM-DD..." string).

        Raises:
            ValueError: if start_date is greater than end_date.
        """
        if start_date > end_date:
            raise ValueError(
                "Input Error: Invalid start_date and end_date. "
                "Start_date is greater than end_date"
            )

        return [str(year) for year in range(int(start_date[0:4]), int(end_date[0:4]) + 1)]

    @classmethod
    def get_partition_condition(cls, start_date: str, end_date: str) -> str:
        """Return year,month and day partition statement from the input parameters.

        Args:
            start_date: start of the period.
            end_date: end of the period.
        """
        years = cls.get_years(start_date, end_date)
        if len(years) > 1:
            return cls._get_multiple_years_partition(start_date, end_date, years)
        return cls._get_single_year_partition(start_date, end_date)

    @classmethod
    def _get_multiple_years_partition(
        cls, start_date: str, end_date: str, years: list[str]
    ) -> str:
        """Return partition when executing multiple years (>1).

        The condition is assembled in three pieces: the first year (from the
        start date to 31 Dec), the fully-covered middle years, and the last
        year (from 1 Jan to the end date).

        Args:
            start_date: start of the period.
            end_date: end of the period.
            years: list of years.
        """
        start_date_month = cls._extract_date_part_from_date("MONTH", start_date)
        start_date_day = cls._extract_date_part_from_date("DAY", start_date)

        end_date_month = cls._extract_date_part_from_date("MONTH", end_date)
        end_date_day = cls._extract_date_part_from_date("DAY", end_date)

        # First year: remainder of the start month plus any following months.
        if start_date_month != "12":
            first_year_filter = (
                f"(month = {start_date_month} and day between {start_date_day} and 31)"
                f" or (month between {int(start_date_month) + 1} and 12)"
            )
        else:
            first_year_filter = (
                f"month = {start_date_month} and day between {start_date_day} and 31"
            )
        start_date_partition = f"(year = {years[0]} and ({first_year_filter})"

        # Middle years (fully covered): single year or a range.
        if len(years) == 3:
            period_years_partition = f") or (year = {years[1]}"
        elif len(years) > 3:
            period_years_partition = f") or (year between {years[1]} and {years[-2]})"
        else:
            period_years_partition = ""

        # Last year: months before the end month plus the start of it.
        if end_date_month != "01":
            end_date_partition = (
                f") or (year = {years[-1]} and "
                f"((month between 01 and {int(end_date_month) - 1})"
                f" or (month = {end_date_month} and day between 1 and {end_date_day})))"
            )
        else:
            end_date_partition = (
                f") or (year = {years[-1]} and month = 1 "
                f"and day between 01 and {end_date_day})"
            )

        return start_date_partition + period_years_partition + end_date_partition

    @classmethod
    def _get_single_year_partition(cls, start_date: str, end_date: str) -> str:
        """Return partition when executing a single year.

        Args:
            start_date: start of the period.
            end_date: end of the period.
        """
        start_date_year = cls._extract_date_part_from_date("YEAR", start_date)
        start_date_month = cls._extract_date_part_from_date("MONTH", start_date)
        start_date_day = cls._extract_date_part_from_date("DAY", start_date)

        end_date_year = cls._extract_date_part_from_date("YEAR", end_date)
        end_date_month = cls._extract_date_part_from_date("MONTH", end_date)
        end_date_day = cls._extract_date_part_from_date("DAY", end_date)

        if start_date_month != end_date_month:
            months = list(range(int(start_date_month), int(end_date_month) + 1))

            start_date_partition = (
                f"year = {start_date_year} and ((month={months[0]} "
                f"and day between {start_date_day} and 31)"
            )

            # Middle months (fully covered) between the start and end months.
            if len(months) == 2:
                period_partition = start_date_partition
            elif len(months) == 3:
                period_partition = start_date_partition + f" or (month = {months[1]})"
            else:
                period_partition = (
                    start_date_partition
                    + f" or (month between {months[1]} and {months[-2]})"
                )

            partition_condition = (
                period_partition
                + f" or (month = {end_date_month} and day between 1 and {end_date_day}))"
            )
        else:
            partition_condition = (
                f"year = {end_date_year} and month = {end_date_month} "
                f"and day between {start_date_day} and {end_date_day}"
            )

        return partition_condition

    @classmethod
    def _extract_date_part_from_date(cls, part: str, date: str) -> str:
        """Extract date part from string date.

        Args:
            part: date part (possible values: DAY, MONTH, YEAR)
            date: string date in ISO "YYYY-MM-DD" layout.
        """
        part_upper = part.upper()
        if part_upper == "DAY":
            return date[8:10]
        if part_upper == "MONTH":
            return date[5:7]
        # Anything else falls back to the year component.
        return date[0:4]
class GABUtils:
 19class GABUtils(object):
 20    """Class containing utility functions for GAB."""
 21
 22    _LOGGER = LoggingHandler(__name__).get_logger()
 23
 24    def logger(
 25        self,
 26        run_start_time: datetime,
 27        run_end_time: datetime,
 28        start: str,
 29        end: str,
 30        query_id: str,
 31        query_label: str,
 32        cadence: str,
 33        stage_file_path: str,
 34        query: str,
 35        status: str,
 36        error_message: Union[Exception, str],
 37        target_database: str,
 38    ) -> None:
 39        """Store the execution of each stage in the log events table.
 40
 41        Args:
 42            run_start_time: execution start time.
 43            run_end_time: execution end time.
 44            start: use case start date.
 45            end: use case end date.
 46            query_id: gab configuration table use case identifier.
 47            query_label: gab configuration table use case name.
 48            cadence: cadence to process.
 49            stage_file_path: stage file path.
 50            query: query to execute.
 51            status: status of the query execution.
 52            error_message: error message if present.
 53            target_database: target database to write.
 54        """
 55        ins = """
 56        INSERT INTO {database}.gab_log_events
 57        VALUES (
 58            '{run_start_time}',
 59            '{run_end_time}',
 60            '{start}',
 61            '{end}',
 62            {query_id},
 63            '{query_label}',
 64            '{cadence}',
 65            '{stage_file_path}',
 66            '{query}',
 67            '{status}',
 68            '{error_message}'
 69        )""".format(  # nosec: B608
 70            database=target_database,
 71            run_start_time=run_start_time,
 72            run_end_time=run_end_time,
 73            start=start,
 74            end=end,
 75            query_id=query_id,
 76            query_label=query_label,
 77            cadence=cadence,
 78            stage_file_path=stage_file_path,
 79            query=self._escape_quote(query),
 80            status=status,
 81            error_message=(
 82                self._escape_quote(str(error_message))
 83                if status == "Failed"
 84                else error_message
 85            ),
 86        )
 87
 88        ExecEnv.SESSION.sql(ins)
 89
 90    @classmethod
 91    def _escape_quote(cls, to_escape: str) -> str:
 92        """Escape quote on string.
 93
 94        Args:
 95            to_escape: string to escape.
 96        """
 97        return to_escape.replace("'", r"\'").replace('"', r"\"")
 98
 99    @classmethod
100    def get_json_column_as_dict(
101        cls, lookup_query_builder: DataFrame, query_id: str, query_column: str
102    ) -> dict:  # type: ignore
103        """Get JSON column as dictionary.
104
105        Args:
106            lookup_query_builder: gab configuration data.
107            query_id: gab configuration table use case identifier.
108            query_column: column to get as json.
109        """
110        column_df = lookup_query_builder.filter(
111            col("query_id") == lit(query_id)
112        ).select(col(query_column))
113
114        column_df_json = column_df.select(
115            to_json(struct([column_df[x] for x in column_df.columns]))
116        ).collect()[0][0]
117
118        json_column = json.loads(column_df_json)
119
120        for mapping in json_column.values():
121            column_as_json = ast.literal_eval(mapping)
122
123        return column_as_json  # type: ignore
124
125    @classmethod
126    def extract_columns_from_mapping(
127        cls,
128        columns: dict,
129        is_dimension: bool,
130        extract_column_without_alias: bool = False,
131        table_alias: Optional[str] = None,
132        is_extracted_value_as_name: bool = True,
133    ) -> Union[tuple[list[str], list[str]], list[str]]:
134        """Extract and transform columns to SQL select statement.
135
136        Args:
137            columns: data to extract the columns.
138            is_dimension: flag identifying if is a dimension or a metric.
139            extract_column_without_alias: flag to inform if it's to extract columns
140                without aliases.
141            table_alias: name or alias from the source table.
142            is_extracted_value_as_name: identify if the extracted value is the
143                column name.
144        """
145        column_with_alias = (
146            "".join([table_alias, ".", "{} as {}"]) if table_alias else "{} as {}"
147        )
148        column_without_alias = (
149            "".join([table_alias, ".", "{}"]) if table_alias else "{}"
150        )
151
152        extracted_columns_with_alias = []
153        extracted_columns_without_alias = []
154        for column_name, column_value in columns.items():
155            if extract_column_without_alias:
156                extracted_column_without_alias = column_without_alias.format(
157                    cls._get_column_format_without_alias(
158                        is_dimension,
159                        column_name,
160                        column_value,
161                        is_extracted_value_as_name,
162                    )
163                )
164                extracted_columns_without_alias.append(extracted_column_without_alias)
165
166            extracted_column_with_alias = column_with_alias.format(
167                *cls._extract_column_with_alias(
168                    is_dimension,
169                    column_name,
170                    column_value,
171                    is_extracted_value_as_name,
172                )
173            )
174            extracted_columns_with_alias.append(extracted_column_with_alias)
175
176        return (
177            (extracted_columns_with_alias, extracted_columns_without_alias)
178            if extract_column_without_alias
179            else extracted_columns_with_alias
180        )
181
182    @classmethod
183    def _extract_column_with_alias(
184        cls,
185        is_dimension: bool,
186        column_name: str,
187        column_value: Union[str, dict],
188        is_extracted_value_as_name: bool = True,
189    ) -> tuple[str, str]:
190        """Extract column name with alias.
191
192        Args:
193            is_dimension: flag indicating if the column is a dimension.
194            column_name: name of the column.
195            column_value: value of the column.
196            is_extracted_value_as_name: flag indicating if the name of the column is the
197                extracted value.
198        """
199        extracted_value = (
200            column_value
201            if is_dimension
202            else (column_value["metric_name"])  # type: ignore
203        )
204
205        return (
206            (extracted_value, column_name)  # type: ignore
207            if is_extracted_value_as_name
208            else (column_name, extracted_value)
209        )
210
211    @classmethod
212    def _get_column_format_without_alias(
213        cls,
214        is_dimension: bool,
215        column_name: str,
216        column_value: Union[str, dict],
217        is_extracted_value_as_name: bool = True,
218    ) -> str:
219        """Extract column name without alias.
220
221        Args:
222            is_dimension: flag indicating if the column is a dimension.
223            column_name: name of the column.
224            column_value: value of the column.
225            is_extracted_value_as_name: flag indicating if the name of the column is the
226                extracted value.
227        """
228        extracted_value: str = (
229            column_value
230            if is_dimension
231            else (column_value["metric_name"])  # type: ignore
232        )
233
234        return extracted_value if is_extracted_value_as_name else column_name
235
236    @classmethod
237    def get_cadence_configuration_at_end_date(cls, end_date: datetime) -> dict:
238        """A dictionary that corresponds to the conclusion of a cadence.
239
240        Any end date inputted by the user we check this end date is actually end of
241            a cadence (YEAR, QUARTER, MONTH, WEEK).
242        If the user input is 2024-03-31 this is a month end and a quarter end that
243            means any use cases configured as month or quarter need to be calculated.
244
245        Args:
246            end_date: base end date.
247        """
248        init_end_date_dict = {}
249
250        expected_end_cadence_date = pendulum.datetime(
251            int(end_date.strftime("%Y")),
252            int(end_date.strftime("%m")),
253            int(end_date.strftime("%d")),
254        ).replace(tzinfo=None)
255
256        # Validating YEAR cadence
257        if end_date == expected_end_cadence_date.last_of("year"):
258            init_end_date_dict["YEAR"] = "N"
259
260        # Validating QUARTER cadence
261        if end_date == expected_end_cadence_date.last_of("quarter"):
262            init_end_date_dict["QUARTER"] = "N"
263
264        # Validating MONTH cadence
265        if end_date == datetime(
266            int(end_date.strftime("%Y")),
267            int(end_date.strftime("%m")),
268            calendar.monthrange(
269                int(end_date.strftime("%Y")), int(end_date.strftime("%m"))
270            )[1],
271        ):
272            init_end_date_dict["MONTH"] = "N"
273
274        # Validating WEEK cadence
275        if end_date == expected_end_cadence_date.end_of("week").replace(
276            hour=0, minute=0, second=0, microsecond=0
277        ):
278            init_end_date_dict["WEEK"] = "N"
279
280        init_end_date_dict["DAY"] = "N"
281
282        return init_end_date_dict
283
284    def get_reconciliation_cadences(
285        self,
286        cadence: str,
287        selected_reconciliation_window: dict,
288        cadence_configuration_at_end_date: dict,
289        rerun_flag: str,
290    ) -> dict:
291        """Get reconciliation cadences based on the use case configuration.
292
293        Args:
294            cadence: cadence to process.
295            selected_reconciliation_window: configured use case reconciliation window.
296            cadence_configuration_at_end_date: cadences to execute at the end date.
297            rerun_flag: flag indicating if it's a rerun or a normal run.
298        """
299        configured_cadences = self._get_configured_cadences_by_snapshot(
300            cadence, selected_reconciliation_window, cadence_configuration_at_end_date
301        )
302
303        return self._get_cadences_to_execute(
304            configured_cadences, cadence, cadence_configuration_at_end_date, rerun_flag
305        )
306
307    @classmethod
308    def _get_cadences_to_execute(
309        cls,
310        configured_cadences: dict,
311        cadence: str,
312        cadence_configuration_at_end_date: dict,
313        rerun_flag: str,
314    ) -> dict:
315        """Get cadences to execute.
316
317        Args:
318            cadence: cadence to process.
319            configured_cadences: configured use case reconciliation window.
320            cadence_configuration_at_end_date: cadences to execute at the end date.
321            rerun_flag: flag indicating if it's a rerun or a normal run.
322        """
323        cadences_to_execute = {}
324        cad_order = GABCadence.get_ordered_cadences()
325
326        for snapshot_cadence, snapshot_flag in configured_cadences.items():
327            if (
328                (cad_order[cadence] > cad_order[snapshot_cadence])
329                and (rerun_flag == "Y")
330            ) or snapshot_cadence in cadence_configuration_at_end_date:
331                cadences_to_execute[snapshot_cadence] = snapshot_flag
332            elif snapshot_cadence not in cadence_configuration_at_end_date:
333                continue
334
335        return cls._sort_cadences_to_execute(cadences_to_execute, cad_order)
336
337    @classmethod
338    def _sort_cadences_to_execute(
339        cls, cadences_to_execute: dict, cad_order: dict
340    ) -> dict:
341        """Sort the cadences to execute.
342
343        Args:
344            cadences_to_execute: cadences to execute.
345            cad_order: all cadences with order.
346        """
347        # ordering it because when grouping cadences with snapshot and without snapshot
348        # can impact the cadence ordering.
349        sorted_cadences_to_execute: dict = dict(
350            sorted(
351                cadences_to_execute.items(),
352                key=lambda item: cad_order.get(item[0]),  # type: ignore
353            )
354        )
355        # ordering cadences to execute it from bigger (YEAR) to smaller (DAY)
356        cadences_to_execute_items = []
357
358        for cadence_name, cadence_value in sorted_cadences_to_execute.items():
359            cadences_to_execute_items.append((cadence_name, cadence_value))
360
361        cadences_sorted_by_bigger_cadence_to_execute: dict = dict(
362            reversed(cadences_to_execute_items)
363        )
364
365        return cadences_sorted_by_bigger_cadence_to_execute
366
    @classmethod
    def _get_configured_cadences_by_snapshot(
        cls,
        cadence: str,
        selected_reconciliation_window: dict,
        cadence_configuration_at_end_date: dict,
    ) -> dict:
        """Get configured cadences to execute.

        Args:
            cadence: selected cadence.
            selected_reconciliation_window: configured use case reconciliation window.
            cadence_configuration_at_end_date: cadences to execute at the end date.

        Returns:
            Each cadence with the corresponding information if it's to execute with
                snapshot or not.
        """
        cadences_by_snapshot = {}

        # NOTE(review): `_generate_reconciliation_by_snapshot` returns the tuple as
        # (cadences_with_snapshot, cadences_without_snapshot), so the names below
        # look swapped relative to that return order — confirm which is intended.
        (
            no_snapshot_cadences,
            snapshot_cadences,
        ) = cls._generate_reconciliation_by_snapshot(
            cadence, selected_reconciliation_window
        )

        # Only the first cadence that concludes at the end date is kept (break).
        for snapshot_cadence, snapshot_flag in no_snapshot_cadences.items():
            if snapshot_cadence in cadence_configuration_at_end_date:
                cadences_by_snapshot[snapshot_cadence] = snapshot_flag

                cls._LOGGER.info(f"{snapshot_cadence} is present in {cadence} cadence")
                break

        cadences_by_snapshot.update(snapshot_cadences)

        # Fallback: when nothing was selected, execute the processed cadence
        # without snapshot, provided it concludes at the end date.
        if (not cadences_by_snapshot) and (
            cadence in cadence_configuration_at_end_date
        ):
            cadences_by_snapshot[cadence] = "N"

        return cadences_by_snapshot
409
410    @classmethod
411    def _generate_reconciliation_by_snapshot(
412        cls, cadence: str, selected_reconciliation_window: dict
413    ) -> tuple[dict, dict]:
414        """Generate reconciliation by snapshot.
415
416        Args:
417            cadence: cadence to process.
418            selected_reconciliation_window: configured use case reconciliation window.
419        """
420        cadence_snapshot_configuration = {cadence: "N"}
421        for cadence in GABCadence.get_cadences():
422            cls._add_cadence_snapshot_to_cadence_snapshot_config(
423                cadence, selected_reconciliation_window, cadence_snapshot_configuration
424            )
425        cadence_snapshot_configuration = dict(
426            sorted(
427                cadence_snapshot_configuration.items(),
428                key=(
429                    lambda item: GABCadence.get_ordered_cadences().get(  # type: ignore
430                        item[0]
431                    )
432                ),
433            )
434        )
435
436        cadence_snapshot_configuration = dict(
437            reversed(list(cadence_snapshot_configuration.items()))
438        )
439
440        cadences_without_snapshot = {
441            key: value
442            for key, value in cadence_snapshot_configuration.items()
443            if value == "N"
444        }
445
446        cadences_with_snapshot = {
447            key: value
448            for key, value in cadence_snapshot_configuration.items()
449            if value == "Y"
450        }
451
452        return cadences_with_snapshot, cadences_without_snapshot
453
454    @classmethod
455    def _add_cadence_snapshot_to_cadence_snapshot_config(
456        cls,
457        cadence: str,
458        selected_reconciliation_window: dict,
459        cadence_snapshot_configuration: dict,
460    ) -> None:
461        """Add the selected reconciliation to cadence snapshot configuration.
462
463        Args:
464            cadence: selected cadence.
465            selected_reconciliation_window:  configured use case reconciliation window.
466            cadence_snapshot_configuration: cadence snapshot configuration dictionary
467                who will be updated with the new value.
468        """
469        if cadence in selected_reconciliation_window:
470            cadence_snapshot_configuration[cadence] = selected_reconciliation_window[
471                cadence
472            ]["snapshot"]
473
474    @classmethod
475    def format_datetime_to_default(cls, date_to_format: datetime) -> str:
476        """Format datetime to GAB default format.
477
478        Args:
479            date_to_format: date to format.
480        """
481        return datetime.date(date_to_format).strftime(GABDefaults.DATE_FORMAT.value)

Class containing utility functions for GAB.

def logger( self, run_start_time: datetime.datetime, run_end_time: datetime.datetime, start: str, end: str, query_id: str, query_label: str, cadence: str, stage_file_path: str, query: str, status: str, error_message: Union[Exception, str], target_database: str) -> None:
24    def logger(
25        self,
26        run_start_time: datetime,
27        run_end_time: datetime,
28        start: str,
29        end: str,
30        query_id: str,
31        query_label: str,
32        cadence: str,
33        stage_file_path: str,
34        query: str,
35        status: str,
36        error_message: Union[Exception, str],
37        target_database: str,
38    ) -> None:
39        """Store the execution of each stage in the log events table.
40
41        Args:
42            run_start_time: execution start time.
43            run_end_time: execution end time.
44            start: use case start date.
45            end: use case end date.
46            query_id: gab configuration table use case identifier.
47            query_label: gab configuration table use case name.
48            cadence: cadence to process.
49            stage_file_path: stage file path.
50            query: query to execute.
51            status: status of the query execution.
52            error_message: error message if present.
53            target_database: target database to write.
54        """
55        ins = """
56        INSERT INTO {database}.gab_log_events
57        VALUES (
58            '{run_start_time}',
59            '{run_end_time}',
60            '{start}',
61            '{end}',
62            {query_id},
63            '{query_label}',
64            '{cadence}',
65            '{stage_file_path}',
66            '{query}',
67            '{status}',
68            '{error_message}'
69        )""".format(  # nosec: B608
70            database=target_database,
71            run_start_time=run_start_time,
72            run_end_time=run_end_time,
73            start=start,
74            end=end,
75            query_id=query_id,
76            query_label=query_label,
77            cadence=cadence,
78            stage_file_path=stage_file_path,
79            query=self._escape_quote(query),
80            status=status,
81            error_message=(
82                self._escape_quote(str(error_message))
83                if status == "Failed"
84                else error_message
85            ),
86        )
87
88        ExecEnv.SESSION.sql(ins)

Store the execution of each stage in the log events table.

Arguments:
  • run_start_time: execution start time.
  • run_end_time: execution end time.
  • start: use case start date.
  • end: use case end date.
  • query_id: gab configuration table use case identifier.
  • query_label: gab configuration table use case name.
  • cadence: cadence to process.
  • stage_file_path: stage file path.
  • query: query to execute.
  • status: status of the query execution.
  • error_message: error message if present.
  • target_database: target database to write.
@classmethod
def get_json_column_as_dict( cls, lookup_query_builder: pyspark.sql.dataframe.DataFrame, query_id: str, query_column: str) -> dict:
 99    @classmethod
100    def get_json_column_as_dict(
101        cls, lookup_query_builder: DataFrame, query_id: str, query_column: str
102    ) -> dict:  # type: ignore
103        """Get JSON column as dictionary.
104
105        Args:
106            lookup_query_builder: gab configuration data.
107            query_id: gab configuration table use case identifier.
108            query_column: column to get as json.
109        """
110        column_df = lookup_query_builder.filter(
111            col("query_id") == lit(query_id)
112        ).select(col(query_column))
113
114        column_df_json = column_df.select(
115            to_json(struct([column_df[x] for x in column_df.columns]))
116        ).collect()[0][0]
117
118        json_column = json.loads(column_df_json)
119
120        for mapping in json_column.values():
121            column_as_json = ast.literal_eval(mapping)
122
123        return column_as_json  # type: ignore

Get JSON column as dictionary.

Arguments:
  • lookup_query_builder: gab configuration data.
  • query_id: gab configuration table use case identifier.
  • query_column: column to get as json.
@classmethod
def extract_columns_from_mapping( cls, columns: dict, is_dimension: bool, extract_column_without_alias: bool = False, table_alias: Optional[str] = None, is_extracted_value_as_name: bool = True) -> Union[tuple[list[str], list[str]], list[str]]:
125    @classmethod
126    def extract_columns_from_mapping(
127        cls,
128        columns: dict,
129        is_dimension: bool,
130        extract_column_without_alias: bool = False,
131        table_alias: Optional[str] = None,
132        is_extracted_value_as_name: bool = True,
133    ) -> Union[tuple[list[str], list[str]], list[str]]:
134        """Extract and transform columns to SQL select statement.
135
136        Args:
137            columns: data to extract the columns.
138            is_dimension: flag identifying if is a dimension or a metric.
139            extract_column_without_alias: flag to inform if it's to extract columns
140                without aliases.
141            table_alias: name or alias from the source table.
142            is_extracted_value_as_name: identify if the extracted value is the
143                column name.
144        """
145        column_with_alias = (
146            "".join([table_alias, ".", "{} as {}"]) if table_alias else "{} as {}"
147        )
148        column_without_alias = (
149            "".join([table_alias, ".", "{}"]) if table_alias else "{}"
150        )
151
152        extracted_columns_with_alias = []
153        extracted_columns_without_alias = []
154        for column_name, column_value in columns.items():
155            if extract_column_without_alias:
156                extracted_column_without_alias = column_without_alias.format(
157                    cls._get_column_format_without_alias(
158                        is_dimension,
159                        column_name,
160                        column_value,
161                        is_extracted_value_as_name,
162                    )
163                )
164                extracted_columns_without_alias.append(extracted_column_without_alias)
165
166            extracted_column_with_alias = column_with_alias.format(
167                *cls._extract_column_with_alias(
168                    is_dimension,
169                    column_name,
170                    column_value,
171                    is_extracted_value_as_name,
172                )
173            )
174            extracted_columns_with_alias.append(extracted_column_with_alias)
175
176        return (
177            (extracted_columns_with_alias, extracted_columns_without_alias)
178            if extract_column_without_alias
179            else extracted_columns_with_alias
180        )

Extract and transform columns to SQL select statement.

Arguments:
  • columns: data to extract the columns.
  • is_dimension: flag identifying if is a dimension or a metric.
  • extract_column_without_alias: flag to inform if it's to extract columns without aliases.
  • table_alias: name or alias from the source table.
  • is_extracted_value_as_name: identify if the extracted value is the column name.
@classmethod
def get_cadence_configuration_at_end_date(cls, end_date: datetime.datetime) -> dict:
236    @classmethod
237    def get_cadence_configuration_at_end_date(cls, end_date: datetime) -> dict:
238        """A dictionary that corresponds to the conclusion of a cadence.
239
240        Any end date inputted by the user we check this end date is actually end of
241            a cadence (YEAR, QUARTER, MONTH, WEEK).
242        If the user input is 2024-03-31 this is a month end and a quarter end that
243            means any use cases configured as month or quarter need to be calculated.
244
245        Args:
246            end_date: base end date.
247        """
248        init_end_date_dict = {}
249
250        expected_end_cadence_date = pendulum.datetime(
251            int(end_date.strftime("%Y")),
252            int(end_date.strftime("%m")),
253            int(end_date.strftime("%d")),
254        ).replace(tzinfo=None)
255
256        # Validating YEAR cadence
257        if end_date == expected_end_cadence_date.last_of("year"):
258            init_end_date_dict["YEAR"] = "N"
259
260        # Validating QUARTER cadence
261        if end_date == expected_end_cadence_date.last_of("quarter"):
262            init_end_date_dict["QUARTER"] = "N"
263
264        # Validating MONTH cadence
265        if end_date == datetime(
266            int(end_date.strftime("%Y")),
267            int(end_date.strftime("%m")),
268            calendar.monthrange(
269                int(end_date.strftime("%Y")), int(end_date.strftime("%m"))
270            )[1],
271        ):
272            init_end_date_dict["MONTH"] = "N"
273
274        # Validating WEEK cadence
275        if end_date == expected_end_cadence_date.end_of("week").replace(
276            hour=0, minute=0, second=0, microsecond=0
277        ):
278            init_end_date_dict["WEEK"] = "N"
279
280        init_end_date_dict["DAY"] = "N"
281
282        return init_end_date_dict

A dictionary that corresponds to the conclusion of a cadence.

For any end date inputted by the user, we check whether this end date is actually the end of a cadence (YEAR, QUARTER, MONTH, WEEK). If the user input is 2024-03-31, this is a month end and a quarter end, which means any use cases configured as month or quarter need to be calculated.

Arguments:
  • end_date: base end date.
def get_reconciliation_cadences( self, cadence: str, selected_reconciliation_window: dict, cadence_configuration_at_end_date: dict, rerun_flag: str) -> dict:
284    def get_reconciliation_cadences(
285        self,
286        cadence: str,
287        selected_reconciliation_window: dict,
288        cadence_configuration_at_end_date: dict,
289        rerun_flag: str,
290    ) -> dict:
291        """Get reconciliation cadences based on the use case configuration.
292
293        Args:
294            cadence: cadence to process.
295            selected_reconciliation_window: configured use case reconciliation window.
296            cadence_configuration_at_end_date: cadences to execute at the end date.
297            rerun_flag: flag indicating if it's a rerun or a normal run.
298        """
299        configured_cadences = self._get_configured_cadences_by_snapshot(
300            cadence, selected_reconciliation_window, cadence_configuration_at_end_date
301        )
302
303        return self._get_cadences_to_execute(
304            configured_cadences, cadence, cadence_configuration_at_end_date, rerun_flag
305        )

Get reconciliation cadences based on the use case configuration.

Arguments:
  • cadence: cadence to process.
  • selected_reconciliation_window: configured use case reconciliation window.
  • cadence_configuration_at_end_date: cadences to execute at the end date.
  • rerun_flag: flag indicating if it's a rerun or a normal run.
@classmethod
def format_datetime_to_default(cls, date_to_format: datetime.datetime) -> str:
474    @classmethod
475    def format_datetime_to_default(cls, date_to_format: datetime) -> str:
476        """Format datetime to GAB default format.
477
478        Args:
479            date_to_format: date to format.
480        """
481        return datetime.date(date_to_format).strftime(GABDefaults.DATE_FORMAT.value)

Format datetime to GAB default format.

Arguments:
  • date_to_format: date to format.
class GABPartitionUtils:
class GABPartitionUtils(object):
    """Class to extract a partition based on a date period."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def get_years(cls, start_date: str, end_date: str) -> list[str]:
        """Return a list of distinct years from the input parameters.

        Args:
            start_date: start of the period.
            end_date: end of the period.

        Raises:
            ValueError: when start_date is greater than end_date.
        """
        if start_date > end_date:
            raise ValueError(
                "Input Error: Invalid start_date and end_date. "
                "Start_date is greater than end_date"
            )

        # Dates are ISO-formatted strings, so the year is the first 4 characters.
        return [str(i) for i in range(int(start_date[0:4]), int(end_date[0:4]) + 1)]

    @classmethod
    def get_partition_condition(cls, start_date: str, end_date: str) -> str:
        """Return year, month and day partition statement from the input parameters.

        Args:
            start_date: start of the period.
            end_date: end of the period.
        """
        years = cls.get_years(start_date, end_date)
        if len(years) > 1:
            return cls._get_multiple_years_partition(start_date, end_date, years)
        return cls._get_single_year_partition(start_date, end_date)

    @classmethod
    def _get_multiple_years_partition(
        cls, start_date: str, end_date: str, years: list[str]
    ) -> str:
        """Return partition when executing multiple years (>1).

        Args:
            start_date: start of the period.
            end_date: end of the period.
            years: list of years.
        """
        start_date_month = cls._extract_date_part_from_date("MONTH", start_date)
        start_date_day = cls._extract_date_part_from_date("DAY", start_date)

        end_date_month = cls._extract_date_part_from_date("MONTH", end_date)
        end_date_day = cls._extract_date_part_from_date("DAY", end_date)

        # First year: from the start day/month until the end of that year.
        # NOTE(review): the parenthesis balancing across the three fragments
        # below is intricate — verify the generated predicate parses for every
        # span length (2, 3 and >3 years).
        year_statement = "(year = {0} and (".format(years[0]) + "{})"
        if start_date_month != "12":
            start_date_partition = year_statement.format(
                "(month = {0} and day between {1} and 31)".format(
                    start_date_month, start_date_day
                )
                + " or (month between {0} and 12)".format(int(start_date_month) + 1)
            )
        else:
            start_date_partition = year_statement.format(
                "month = {0} and day between {1} and 31".format(
                    start_date_month, start_date_day
                )
            )

        # Middle years (if any) are covered entirely.
        period_years_partition = ""
        if len(years) == 3:
            period_years_partition = ") or (year = {0}".format(years[1])
        elif len(years) > 3:
            period_years_partition = ") or (year between {0} and {1})".format(
                years[1], years[-2]
            )

        # Last year: from the start of that year until the end day/month.
        if end_date_month != "01":
            end_date_partition = (
                ") or (year = {0} and ((month between 01 and {1})".format(
                    years[-1], int(end_date_month) - 1
                )
                + " or (month = {0} and day between 1 and {1})))".format(
                    end_date_month, end_date_day
                )
            )
        else:
            end_date_partition = (
                ") or (year = {0} and month = 1 and day between 01 and {1})".format(
                    years[-1], end_date_day
                )
            )

        return start_date_partition + period_years_partition + end_date_partition

    @classmethod
    def _get_single_year_partition(cls, start_date: str, end_date: str) -> str:
        """Return partition when executing a single year.

        Args:
            start_date: start of the period.
            end_date: end of the period.
        """
        start_date_year = cls._extract_date_part_from_date("YEAR", start_date)
        start_date_month = cls._extract_date_part_from_date("MONTH", start_date)
        start_date_day = cls._extract_date_part_from_date("DAY", start_date)

        end_date_year = cls._extract_date_part_from_date("YEAR", end_date)
        end_date_month = cls._extract_date_part_from_date("MONTH", end_date)
        end_date_day = cls._extract_date_part_from_date("DAY", end_date)

        if start_date_month != end_date_month:
            # All months in the span, inclusive on both ends.
            months = list(range(int(start_date_month), int(end_date_month) + 1))

            start_date_partition = (
                "year = {0} and ((month={1} and day between {2} and 31)".format(
                    start_date_year, months[0], start_date_day
                )
            )
            period_years_partition = ""
            if len(months) == 2:
                period_years_partition = start_date_partition
            elif len(months) == 3:
                period_years_partition = (
                    start_date_partition + " or (month = {0})".format(months[1])
                )
            elif len(months) > 3:
                period_years_partition = (
                    start_date_partition
                    + " or (month between {0} and {1})".format(months[1], months[-2])
                )
            partition_condition = (
                period_years_partition
                + " or (month = {0} and day between 1 and {1}))".format(
                    end_date_month, end_date_day
                )
            )
        else:
            partition_condition = (
                "year = {0} and month = {1} and day between {2} and {3}".format(
                    end_date_year, end_date_month, start_date_day, end_date_day
                )
            )

        return partition_condition

    @classmethod
    def _extract_date_part_from_date(cls, part: str, date: str) -> str:
        """Extract date part from an ISO string date (YYYY-MM-DD).

        Args:
            part: date part (possible values: DAY, MONTH, YEAR)
            date: string date.
        """
        part = part.upper()
        if part == "DAY":
            return date[8:10]
        if part == "MONTH":
            return date[5:7]
        return date[0:4]

Class to extract a partition based on a date period.

@classmethod
def get_years(cls, start_date: str, end_date: str) -> list[str]:
489    @classmethod
490    def get_years(cls, start_date: str, end_date: str) -> list[str]:
491        """Return a list of distinct years from the input parameters.
492
493        Args:
494            start_date: start of the period.
495            end_date: end of the period.
496        """
497        year = []
498        if start_date > end_date:
499            raise ValueError(
500                "Input Error: Invalid start_date and end_date. "
501                "Start_date is greater than end_date"
502            )
503
504        for i in range(int(start_date[0:4]), int(end_date[0:4]) + 1):
505            year.append(str(i))
506
507        return year

Return a list of distinct years from the input parameters.

Arguments:
  • start_date: start of the period.
  • end_date: end of the period.
@classmethod
def get_partition_condition(cls, start_date: str, end_date: str) -> str:
509    @classmethod
510    def get_partition_condition(cls, start_date: str, end_date: str) -> str:
511        """Return year,month and day partition statement from the input parameters.
512
513        Args:
514            start_date: start of the period.
515            end_date: end of the period.
516        """
517        years = cls.get_years(start_date, end_date)
518        if len(years) > 1:
519            partition_condition = cls._get_multiple_years_partition(
520                start_date, end_date, years
521            )
522        else:
523            partition_condition = cls._get_single_year_partition(start_date, end_date)
524        return partition_condition

Return year, month and day partition statement from the input parameters.

Arguments:
  • start_date: start of the period.
  • end_date: end of the period.