lakehouse_engine.core.gab_manager

Module to define GAB Manager classes.

  1"""Module to define GAB Manager classes."""
  2
  3import calendar
  4from datetime import datetime, timedelta
  5from typing import Tuple, Union, cast
  6
  7import pendulum
  8from pendulum import DateTime
  9from pyspark.sql import DataFrame
 10
 11from lakehouse_engine.core.definitions import GABCadence, GABDefaults
 12from lakehouse_engine.core.gab_sql_generator import GABViewGenerator
 13from lakehouse_engine.utils.gab_utils import GABUtils
 14from lakehouse_engine.utils.logging_handler import LoggingHandler
 15
 16
 17class GABCadenceManager(object):
 18    """Class to control the GAB Cadence Window."""
 19
 20    _LOGGER = LoggingHandler(__name__).get_logger()
 21
 22    def extended_window_calculator(
 23        self,
 24        cadence: str,
 25        reconciliation_cadence: str,
 26        current_date: datetime,
 27        start_date_str: str,
 28        end_date_str: str,
 29        query_type: str,
 30        rerun_flag: str,
 31        snapshot_flag: str,
 32    ) -> tuple[datetime, datetime, datetime, datetime]:
 33        """Calculate the extended window for the given cadence.
 34
 35        Calculates the extended window of any cadence even when the user provides
 36        custom dates that are not the exact start and end dates of a cadence.
 37
 38        Args:
 39            cadence: cadence to process.
 40            reconciliation_cadence: reconciliation cadence to process.
 41            current_date: current date.
 42            start_date_str: start date of the period to process.
 43            end_date_str: end date of the period to process.
 44            query_type: use case query type.
 45            rerun_flag: flag indicating if it's a rerun or a normal run.
 46            snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
 47        """
 48        cad_order = GABCadence.get_ordered_cadences()
 49
 50        derived_cadence = self._get_reconciliation_cadence(
 51            cad_order, rerun_flag, cadence, reconciliation_cadence, snapshot_flag
 52        )
 53
 54        self._LOGGER.info(f"cadence passed to extended window: {derived_cadence}")
 55
 56        start_date = datetime.strptime(start_date_str, GABDefaults.DATE_FORMAT.value)
 57        end_date = datetime.strptime(end_date_str, GABDefaults.DATE_FORMAT.value)
 58
 59        bucket_start_date, bucket_end_date = self.get_cadence_start_end_dates(
 60            cadence, derived_cadence, start_date, end_date, query_type, current_date
 61        )
 62
 63        self._LOGGER.info(f"bucket dates: {bucket_start_date} - {bucket_end_date}")
 64
 65        filter_start_date, filter_end_date = self.get_cadence_start_end_dates(
 66            cadence,
 67            (
 68                reconciliation_cadence
 69                if cad_order[cadence] < cad_order[reconciliation_cadence]
 70                else cadence
 71            ),
 72            start_date,
 73            end_date,
 74            query_type,
 75            current_date,
 76        )
 77
 78        self._LOGGER.info(f"filter dates: {filter_start_date} - {filter_end_date}")
 79
 80        return bucket_start_date, bucket_end_date, filter_start_date, filter_end_date
 81
 82    @classmethod
 83    def _get_reconciliation_cadence(
 84        cls,
 85        cadence_order: dict,
 86        rerun_flag: str,
 87        cadence: str,
 88        reconciliation_cadence: str,
 89        snapshot_flag: str,
 90    ) -> str:
 91        """Get the bigger cadence when it is a rerun or the snapshot is enabled.
 92
 93        Args:
 94            cadence_order: ordered cadences.
 95            rerun_flag: flag indicating if it's a rerun or a normal run.
 96            cadence: cadence to process.
 97            reconciliation_cadence: reconciliation to process.
 98            snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
 99        """
100        derived_cadence = reconciliation_cadence
101
102        if rerun_flag == "Y":
103            if cadence_order[cadence] > cadence_order[reconciliation_cadence]:
104                derived_cadence = cadence
105            elif cadence_order[cadence] < cadence_order[reconciliation_cadence]:
106                derived_cadence = reconciliation_cadence
107        else:
108            if (
109                cadence_order[cadence] > cadence_order[reconciliation_cadence]
110                and snapshot_flag == "Y"
111            ) or (cadence_order[cadence] < cadence_order[reconciliation_cadence]):
112                derived_cadence = reconciliation_cadence
113            elif (
114                cadence_order[cadence] > cadence_order[reconciliation_cadence]
115                and snapshot_flag == "N"
116            ):
117                derived_cadence = cadence
118
119        return derived_cadence
120
121    def get_cadence_start_end_dates(
122        self,
123        cadence: str,
124        derived_cadence: str,
125        start_date: datetime,
126        end_date: datetime,
127        query_type: str,
128        current_date: datetime,
129    ) -> tuple[datetime, datetime]:
130        """Generate the new set of extended start and end dates based on the cadence.
131
132        The week cadence is run again to extend to the correct week start and end
133            dates when a recon window is present for the WEEK cadence.
134        For end_date 2022-12-31, in case of a QUARTER recon window for the WEEK
135            cadence, start and end dates are recalculated to 2022-10-01 and 2022-12-31.
136        But these are not week start and end dates. Hence, the new dates are passed
137            again to get the correct week boundaries.
138
139        Args:
140            cadence: cadence to process.
141            derived_cadence: cadence reconciliation to process.
142            start_date: start date of the period to process.
143            end_date: end date of the period to process.
144            query_type: use case query type.
145            current_date: current date used to cap the end date; if the computed end
146                date is greater than the current date, the current date is used instead.
147        """
148        new_start_date = self._get_cadence_calculated_date(
149            derived_cadence=derived_cadence, base_date=start_date, is_start=True
150        )
151        new_end_date = self._get_cadence_calculated_date(
152            derived_cadence=derived_cadence, base_date=end_date, is_start=False
153        )
154
155        if cadence.upper() == "WEEK":
156            new_start_date = (
157                pendulum.datetime(
158                    int(new_start_date.strftime("%Y")),
159                    int(new_start_date.strftime("%m")),
160                    int(new_start_date.strftime("%d")),
161                )
162                .start_of("week")
163                .replace(tzinfo=None)
164            )
165            new_end_date = (
166                pendulum.datetime(
167                    int(new_end_date.strftime("%Y")),
168                    int(new_end_date.strftime("%m")),
169                    int(new_end_date.strftime("%d")),
170                )
171                .end_of("week")
172                .replace(hour=0, minute=0, second=0, microsecond=0)
173                .replace(tzinfo=None)
174            )
175
176        new_end_date = new_end_date + timedelta(days=1)
177
178        if new_end_date >= current_date:
179            new_end_date = current_date
180
181        if query_type == "NAM":
182            new_end_date = new_end_date + timedelta(days=1)
183
184        return new_start_date, new_end_date
185
186    @classmethod
187    def _get_cadence_calculated_date(
188        cls, derived_cadence: str, base_date: datetime, is_start: bool
189    ) -> Union[datetime, DateTime]:  # type: ignore
190        cadence_base_date = cls._get_cadence_base_date(derived_cadence, base_date)
191        cadence_date_calculated: Union[DateTime, datetime]
192
193        if derived_cadence.upper() == "WEEK":
194            cadence_date_calculated = cls._get_calculated_week_date(
195                cast(DateTime, cadence_base_date), is_start
196            )
197        elif derived_cadence.upper() == "MONTH":
198            cadence_date_calculated = cls._get_calculated_month_date(
199                cast(datetime, cadence_base_date), is_start
200            )
201        elif derived_cadence.upper() in ["QUARTER", "YEAR"]:
202            cadence_date_calculated = cls._get_calculated_quarter_or_year_date(
203                cast(DateTime, cadence_base_date), is_start, derived_cadence
204            )
205        else:
206            cadence_date_calculated = cadence_base_date  # type: ignore
207
208        return cadence_date_calculated  # type: ignore
209
210    @classmethod
211    def _get_cadence_base_date(
212        cls, derived_cadence: str, base_date: datetime
213    ) -> Union[datetime, DateTime, str]:  # type: ignore
214        """Get start date for the selected cadence.
215
216        Args:
217            derived_cadence: cadence reconciliation to process.
218            base_date: base date used to compute the start date of the cadence.
219        """
220        if derived_cadence.upper() in ["DAY", "MONTH"]:
221            cadence_date_calculated = base_date
222        elif derived_cadence.upper() in ["WEEK", "QUARTER", "YEAR"]:
223            cadence_date_calculated = pendulum.datetime(
224                int(base_date.strftime("%Y")),
225                int(base_date.strftime("%m")),
226                int(base_date.strftime("%d")),
227            )
228        else:
229            cadence_date_calculated = "0"  # type: ignore
230
231        return cadence_date_calculated
232
233    @classmethod
234    def _get_calculated_week_date(
235        cls, cadence_date_calculated: DateTime, is_start: bool
236    ) -> DateTime:
237        """Get WEEK start/end date.
238
239        Args:
240            cadence_date_calculated: base date to compute the week date.
241            is_start: flag indicating if we should get the start or end for the cadence.
242        """
243        if is_start:
244            cadence_date_calculated = cadence_date_calculated.start_of("week").replace(
245                tzinfo=None
246            )
247        else:
248            cadence_date_calculated = (
249                cadence_date_calculated.end_of("week")
250                .replace(hour=0, minute=0, second=0, microsecond=0)
251                .replace(tzinfo=None)
252            )
253
254        return cadence_date_calculated
255
256    @classmethod
257    def _get_calculated_month_date(
258        cls, cadence_date_calculated: datetime, is_start: bool
259    ) -> datetime:
260        """Get MONTH start/end date.
261
262        Args:
263            cadence_date_calculated: base date to compute the month date.
264            is_start: flag indicating if we should get the start or end for the cadence.
265        """
266        if is_start:
267            cadence_date_calculated = cadence_date_calculated - timedelta(
268                days=(int(cadence_date_calculated.strftime("%d")) - 1)
269            )
270        else:
271            cadence_date_calculated = datetime(
272                int(cadence_date_calculated.strftime("%Y")),
273                int(cadence_date_calculated.strftime("%m")),
274                calendar.monthrange(
275                    int(cadence_date_calculated.strftime("%Y")),
276                    int(cadence_date_calculated.strftime("%m")),
277                )[1],
278            )
279
280        return cadence_date_calculated
281
282    @classmethod
283    def _get_calculated_quarter_or_year_date(
284        cls, cadence_date_calculated: DateTime, is_start: bool, cadence: str
285    ) -> DateTime:
286        """Get QUARTER/YEAR start/end date.
287
288        Args:
289            cadence_date_calculated: base date to compute the quarter/year date.
290            is_start: flag indicating if we should get the start or end for the cadence.
291            cadence: selected cadence (possible values: QUARTER or YEAR).
292        """
293        if is_start:
294            cadence_date_calculated = cadence_date_calculated.first_of(
295                cadence.lower()
296            ).replace(tzinfo=None)
297        else:
298            cadence_date_calculated = cadence_date_calculated.last_of(
299                cadence.lower()
300            ).replace(tzinfo=None)
301
302        return cadence_date_calculated
303
304
305class GABViewManager(object):
306    """Class to control the GAB View creation."""
307
308    _LOGGER = LoggingHandler(__name__).get_logger()
309
310    def __init__(
311        self,
312        query_id: str,
313        lookup_query_builder: DataFrame,
314        target_database: str,
315        target_table: str,
316    ):
317        """Construct GABViewManager instances.
318
319        Args:
320            query_id: gab configuration table use case identifier.
321            lookup_query_builder: gab configuration data.
322            target_database: target database to write.
323            target_table: target table to write.
324        """
325        self.query_id = query_id
326        self.lookup_query_builder = lookup_query_builder
327        self.target_database = target_database
328        self.target_table = target_table
329
330    def generate_use_case_views(self) -> None:
331        """Generate all the use case views.
332
333        Generates the DDLs for each of the views. This DDL is dynamically built based on
334        the mappings provided in the config table.
335        """
336        reconciliation_window = GABUtils.get_json_column_as_dict(
337            self.lookup_query_builder, self.query_id, "recon_window"
338        )
339
340        cadence_snapshot_status = self._get_cadence_snapshot_status(
341            reconciliation_window
342        )
343
344        (
345            cadences_with_snapshot,
346            cadences_without_snapshot,
347        ) = self._split_cadence_by_snapshot(cadence_snapshot_status)
348
349        mappings = GABUtils.get_json_column_as_dict(
350            self.lookup_query_builder, self.query_id, "mappings"
351        )
352
353        for view_name in mappings.keys():
354            self._generate_use_case_view(
355                mappings,
356                view_name,
357                cadence_snapshot_status,
358                cadences_with_snapshot,
359                cadences_without_snapshot,
360                self.target_database,
361                self.target_table,
362                self.query_id,
363            )
364
365    @classmethod
366    def _generate_use_case_view(
367        cls,
368        mappings: dict,
369        view_name: str,
370        cadence_snapshot_status: dict,
371        cadences_with_snapshot: list[str],
372        cadences_without_snapshot: list[str],
373        target_database: str,
374        target_table: str,
375        query_id: str,
376    ) -> None:
377        """Generate the selected use case views.
378
379        Args:
380            mappings: use case mappings configuration.
381            view_name: name of the view to be generated.
382            cadence_snapshot_status: cadences to execute, with information on
383                whether each has snapshot enabled.
384            cadences_with_snapshot: cadences to execute with snapshot.
385            cadences_without_snapshot: cadences to execute without snapshot.
386            target_database: target database to write.
387            target_table: target table to write.
388            query_id: gab configuration table use case identifier.
389        """
390        view_configuration = mappings[view_name]
391
392        view_dimensions = view_configuration["dimensions"]
393        view_metrics = view_configuration["metric"]
394        custom_filter = view_configuration["filter"]
395
396        view_filter = " "
397        if custom_filter:
398            view_filter = " AND " + custom_filter
399
400        (
401            dimensions,
402            dimensions_and_metrics,
403            dimensions_and_metrics_with_alias,
404        ) = cls._get_dimensions_and_metrics_from_use_case_view(
405            view_dimensions, view_metrics
406        )
407
408        (
409            final_cols,
410            final_calculated_script,
411            final_calculated_script_snapshot,
412        ) = cls._get_calculated_and_derived_metrics_from_use_case_view(
413            view_metrics, view_dimensions, cadence_snapshot_status
414        )
415
416        GABViewGenerator(
417            cadence_snapshot_status=cadence_snapshot_status,
418            target_database=target_database,
419            view_name=view_name,
420            final_cols=final_cols,
421            target_table=target_table,
422            dimensions_and_metrics_with_alias=dimensions_and_metrics_with_alias,
423            dimensions=dimensions,
424            dimensions_and_metrics=dimensions_and_metrics,
425            final_calculated_script=final_calculated_script,
426            query_id=query_id,
427            view_filter=view_filter,
428            final_calculated_script_snapshot=final_calculated_script_snapshot,
429            without_snapshot_cadences=cadences_without_snapshot,
430            with_snapshot_cadences=cadences_with_snapshot,
431        ).generate_sql()
432
433    @classmethod
434    def _get_dimensions_and_metrics_from_use_case_view(
435        cls, view_dimensions: dict, view_metrics: dict
436    ) -> Tuple[str, str, str]:
437        """Get dimensions and metrics from use case.
438
439        Args:
440            view_dimensions: use case configured dimensions.
441            view_metrics: use case configured metrics.
442        """
443        (
444            extracted_dimensions_with_alias,
445            extracted_dimensions_without_alias,
446        ) = GABUtils.extract_columns_from_mapping(
447            columns=view_dimensions,
448            is_dimension=True,
449            extract_column_without_alias=True,
450            table_alias="a",
451            is_extracted_value_as_name=False,
452        )
453
454        dimensions_without_default_columns = [
455            extracted_dimension
456            for extracted_dimension in extracted_dimensions_without_alias
457            if extracted_dimension not in GABDefaults.DIMENSIONS_DEFAULT_COLUMNS.value
458        ]
459
460        dimensions = ",".join(dimensions_without_default_columns)
461        dimensions_with_alias = ",".join(extracted_dimensions_with_alias)
462
463        (
464            extracted_metrics_with_alias,
465            extracted_metrics_without_alias,
466        ) = GABUtils.extract_columns_from_mapping(
467            columns=view_metrics,
468            is_dimension=False,
469            extract_column_without_alias=True,
470            table_alias="a",
471            is_extracted_value_as_name=False,
472        )
473        metrics = ",".join(extracted_metrics_without_alias)
474        metrics_with_alias = ",".join(extracted_metrics_with_alias)
475
476        dimensions_and_metrics_with_alias = (
477            dimensions_with_alias + "," + metrics_with_alias
478        )
479        dimensions_and_metrics = dimensions + "," + metrics
480
481        return dimensions, dimensions_and_metrics, dimensions_and_metrics_with_alias
482
483    @classmethod
484    def _get_calculated_and_derived_metrics_from_use_case_view(
485        cls, view_metrics: dict, view_dimensions: dict, cadence_snapshot_status: dict
486    ) -> Tuple[str, str, str]:
487        """Get calculated and derived metrics from use case.
488
489        Args:
490            view_dimensions: use case configured dimensions.
491            view_metrics: use case configured metrics.
492            cadence_snapshot_status: cadences to execute, with information on
493                whether each has snapshot enabled.
494        """
495        calculated_script = []
496        calculated_script_snapshot = []
497        derived_script = []
498        for metric_key, metric_value in view_metrics.items():
499            (
500                calculated_metrics_script,
501                calculated_metrics_script_snapshot,
502                derived_metrics_script,
503            ) = cls._get_calculated_metrics(
504                metric_key, metric_value, view_dimensions, cadence_snapshot_status
505            )
506            calculated_script += [*calculated_metrics_script]
507            calculated_script_snapshot += [*calculated_metrics_script_snapshot]
508            derived_script += [*derived_metrics_script]
509
510        joined_calculated_script = cls._join_list_to_string_when_present(
511            calculated_script
512        )
513        joined_calculated_script_snapshot = cls._join_list_to_string_when_present(
514            calculated_script_snapshot
515        )
516
517        joined_derived = cls._join_list_to_string_when_present(
518            to_join=derived_script, starting_value="*,", default_value="*"
519        )
520
521        return (
522            joined_derived,
523            joined_calculated_script,
524            joined_calculated_script_snapshot,
525        )
526
527    @classmethod
528    def _join_list_to_string_when_present(
529        cls,
530        to_join: list[str],
531        separator: str = ",",
532        starting_value: str = ",",
533        default_value: str = "",
534    ) -> str:
535        """Join the list into a string when it has values, else return the default value.
536
537        Args:
538            to_join: values to join.
539            separator: separator to be used in the join.
540            starting_value: value to prepend to the joined string.
541            default_value: value to be returned if the list is empty.
542        """
543        return starting_value + separator.join(to_join) if to_join else default_value
544
545    @classmethod
546    def _get_cadence_snapshot_status(cls, result: dict) -> dict:
547        cadence_snapshot_status = {}
548        for k, v in result.items():
549            cadence_snapshot_status[k] = next(
550                (
551                    next(
552                        (
553                            snap_list["snapshot"]
554                            for snap_list in loop_outer_cad.values()
555                            if snap_list["snapshot"] == "Y"
556                        ),
557                        "N",
558                    )
559                    for loop_outer_cad in v.values()
560                    if v
561                ),
562                "N",
563            )
564
565        return cadence_snapshot_status
566
567    @classmethod
568    def _split_cadence_by_snapshot(
569        cls, cadence_snapshot_status: dict
570    ) -> tuple[list[str], list[str]]:
571        """Split cadences by the snapshot value.
572
573        Args:
574            cadence_snapshot_status: cadences to be split by snapshot status.
575        """
576        with_snapshot_cadences = []
577        without_snapshot_cadences = []
578
579        for key_snap_status, value_snap_status in cadence_snapshot_status.items():
580            if value_snap_status == "Y":
581                with_snapshot_cadences.append(key_snap_status)
582            else:
583                without_snapshot_cadences.append(key_snap_status)
584
585        return with_snapshot_cadences, without_snapshot_cadences
586
587    @classmethod
588    def _get_calculated_metrics(
589        cls,
590        metric_key: str,
591        metric_value: dict,
592        view_dimensions: dict,
593        cadence_snapshot_status: dict,
594    ) -> tuple[list[str], list[str], list[str]]:
595        """Get calculated metrics from use case.
596
597        Args:
598            metric_key: use case metric name.
599            metric_value: use case metric value.
600            view_dimensions: use case configured dimensions.
601            cadence_snapshot_status: cadences to execute, with information on
602                whether each has snapshot enabled.
603        """
604        dim_partition = ",".join([str(i) for i in view_dimensions.keys()][2:])
605        dim_partition = "cadence," + dim_partition
606        calculated_metrics = metric_value["calculated_metric"]
607        derived_metrics = metric_value["derived_metric"]
608        calculated_metrics_script: list[str] = []
609        calculated_metrics_script_snapshot: list[str] = []
610        derived_metrics_script: list[str] = []
611
612        if calculated_metrics:
613            (
614                calculated_metrics_script,
615                calculated_metrics_script_snapshot,
616            ) = cls._get_calculated_metric(
617                metric_key, calculated_metrics, dim_partition, cadence_snapshot_status
618            )
619
620        if derived_metrics:
621            derived_metrics_script = cls._get_derived_metrics(derived_metrics)
622
623        return (
624            calculated_metrics_script,
625            calculated_metrics_script_snapshot,
626            derived_metrics_script,
627        )
628
629    @classmethod
630    def _get_derived_metrics(cls, derived_metric: dict) -> list[str]:
631        """Get derived metrics from use case.
632
633        Args:
634            derived_metric: use case derived metrics.
635        """
636        derived_metric_script = []
637
638        for i in range(0, len(derived_metric)):
639            derived_formula = str(derived_metric[i]["formula"])
640            derived_label = derived_metric[i]["label"]
641            derived_metric_script.append(derived_formula + " AS " + derived_label)
642
643        return derived_metric_script
644
645    @classmethod
646    def _get_calculated_metric(
647        cls,
648        metric_key: str,
649        calculated_metric: dict,
650        dimension_partition: str,
651        cadence_snapshot_status: dict,
652    ) -> tuple[list[str], list[str]]:
653        """Get calculated metrics from use case.
654
655        Args:
656            metric_key: use case metric name.
657            calculated_metric: use case calculated metrics.
658            dimension_partition: dimension partition.
659            cadence_snapshot_status: cadences to execute, with information on
660                whether each has snapshot enabled.
661        """
662        last_cadence_script: list[str] = []
663        last_year_cadence_script: list[str] = []
664        window_script: list[str] = []
665        last_cadence_script_snapshot: list[str] = []
666        last_year_cadence_script_snapshot: list[str] = []
667        window_script_snapshot: list[str] = []
668
669        if "last_cadence" in calculated_metric:
670            (
671                last_cadence_script,
672                last_cadence_script_snapshot,
673            ) = cls._get_cadence_calculated_metric(
674                metric_key,
675                dimension_partition,
676                calculated_metric,
677                cadence_snapshot_status,
678                "last_cadence",
679            )
680        if "last_year_cadence" in calculated_metric:
681            (
682                last_year_cadence_script,
683                last_year_cadence_script_snapshot,
684            ) = cls._get_cadence_calculated_metric(
685                metric_key,
686                dimension_partition,
687                calculated_metric,
688                cadence_snapshot_status,
689                "last_year_cadence",
690            )
691        if "window_function" in calculated_metric:
692            window_script, window_script_snapshot = cls._get_window_calculated_metric(
693                metric_key,
694                dimension_partition,
695                calculated_metric,
696                cadence_snapshot_status,
697            )
698
699        calculated_script = [
700            *last_cadence_script,
701            *last_year_cadence_script,
702            *window_script,
703        ]
704        calculated_script_snapshot = [
705            *last_cadence_script_snapshot,
706            *last_year_cadence_script_snapshot,
707            *window_script_snapshot,
708        ]
709
710        return calculated_script, calculated_script_snapshot
711
712    @classmethod
713    def _get_window_calculated_metric(
714        cls,
715        metric_key: str,
716        dimension_partition: str,
717        calculated_metric: dict,
718        cadence_snapshot_status: dict,
719    ) -> tuple[list, list]:
720        """Get window calculated metrics from use case.
721
722        Args:
723            metric_key: use case metric name.
724            dimension_partition: dimension partition.
725            calculated_metric: use case calculated metrics.
726            cadence_snapshot_status: cadences to execute, with information on
727                whether each has snapshot enabled.
728        """
729        calculated_script = []
730        calculated_script_snapshot = []
731
732        for i in range(0, len(calculated_metric["window_function"])):
733            window_function = calculated_metric["window_function"][i]["agg_func"]
734            window_function_start = calculated_metric["window_function"][i]["window"][0]
735            window_function_end = calculated_metric["window_function"][i]["window"][1]
736            window_label = calculated_metric["window_function"][i]["label"]
737
738            calculated_script.append(
739                f"""
740                NVL(
741                    {window_function}({metric_key}) OVER
742                    (
743                        PARTITION BY {dimension_partition}
744                        order by from_date ROWS BETWEEN
745                            {str(window_function_start)} PRECEDING
746                            AND {str(window_function_end)} PRECEDING
747                    ),
748                    0
749                ) AS
750                {window_label}
751                """
752            )
753
754            if "Y" in cadence_snapshot_status.values():
755                calculated_script_snapshot.append(
756                    f"""
757                    NVL(
758                        {window_function}({metric_key}) OVER
759                        (
760                            PARTITION BY {dimension_partition} ,rn
761                            order by from_date ROWS BETWEEN
762                                {str(window_function_start)} PRECEDING
763                                AND {str(window_function_end)} PRECEDING
764                        ),
765                        0
766                    ) AS
767                    {window_label}
768                    """
769                )
770
771        return calculated_script, calculated_script_snapshot
772
773    @classmethod
774    def _get_cadence_calculated_metric(
775        cls,
776        metric_key: str,
777        dimension_partition: str,
778        calculated_metric: dict,
779        cadence_snapshot_status: dict,
780        cadence: str,
781    ) -> tuple[list, list]:
782        """Get cadence calculated metrics from use case.
783
784        Args:
785            metric_key: use case metric name.
786            calculated_metric: use case calculated metrics.
787            dimension_partition: dimension partition.
788            cadence_snapshot_status: cadences to execute, with information on
789                whether each has snapshot enabled.
790            cadence: cadence to process.
791        """
792        calculated_script = []
793        calculated_script_snapshot = []
794
795        for i in range(0, len(calculated_metric[cadence])):
796            cadence_lag = cls._get_cadence_item_lag(calculated_metric, cadence, i)
797            cadence_label = calculated_metric[cadence][i]["label"]
798
799            calculated_script.append(
800                cls._get_cadence_lag_statement(
801                    metric_key,
802                    cadence_lag,
803                    dimension_partition,
804                    cadence_label,
805                    snapshot=False,
806                    cadence=cadence,
807                )
808            )
809
810            if "Y" in cadence_snapshot_status.values():
811                calculated_script_snapshot.append(
812                    cls._get_cadence_lag_statement(
813                        metric_key,
814                        cadence_lag,
815                        dimension_partition,
816                        cadence_label,
817                        snapshot=True,
818                        cadence=cadence,
819                    )
820                )
821
822        return calculated_script, calculated_script_snapshot
823
824    @classmethod
825    def _get_cadence_item_lag(
826        cls, calculated_metric: dict, cadence: str, item: int
827    ) -> str:
828        """Get calculated metric item lag.
829
830        Args:
831            calculated_metric: use case calculated metrics.
832            cadence: cadence to process.
833            item: metric item.
834        """
835        return str(calculated_metric[cadence][item]["window"])
836
837    @classmethod
838    def _get_cadence_lag_statement(
839        cls,
840        metric_key: str,
841        cadence_lag: str,
842        dimension_partition: str,
843        cadence_label: str,
844        snapshot: bool,
845        cadence: str,
846    ) -> str:
847        """Get cadence lag statement.
848
849        Args:
850            metric_key: use case metric name.
851            cadence_lag: cadence window lag.
852            dimension_partition: dimension partition.
853            cadence_label: cadence name.
854            snapshot: indicate if the snapshot is enabled.
855            cadence: cadence to process.
856        """
857        cadence_lag_statement = ""
858        if cadence == "last_cadence":
859            cadence_lag_statement = (
860                "NVL(LAG("
861                + metric_key
862                + ","
863                + cadence_lag
864                + ") OVER(PARTITION BY "
865                + dimension_partition
866                + (",rn" if snapshot else "")
867                + " order by from_date),0) AS "
868                + cadence_label
869            )
870        elif cadence == "last_year_cadence":
871            cadence_lag_statement = (
872                "NVL(LAG("
873                + metric_key
874                + ","
875                + cadence_lag
876                + ") OVER(PARTITION BY "
877                + dimension_partition
878                + (",rn" if snapshot else "")
879                + """,
880                    case
881                        when cadence in ('DAY','MONTH','QUARTER')
882                            then struct(month(from_date), day(from_date))
883                        when cadence in('WEEK')
884                            then struct(weekofyear(from_date+1),1)
885                    end order by from_date),0) AS """
886                + cadence_label
887            )
888        else:
889            cls._LOGGER.error(f"Cadence {cadence} not implemented yet")
890
891        return cadence_lag_statement
class GABCadenceManager:

Class to control the GAB Cadence Window.
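
The cadence derivation logic can be illustrated with a minimal sketch. The ordering
dictionary below is an assumption used only for illustration (the library obtains it
from GABCadence.get_ordered_cadences()), and the sketch calls the private
_get_reconciliation_cadence helper directly:

    from lakehouse_engine.core.gab_manager import GABCadenceManager

    # Assumed cadence ordering, for illustration only; the real mapping comes
    # from GABCadence.get_ordered_cadences().
    assumed_order = {"DAY": 1, "WEEK": 2, "MONTH": 3, "QUARTER": 4, "YEAR": 5}

    # Normal run (rerun_flag="N") with snapshot enabled: since WEEK is smaller
    # than QUARTER, the reconciliation cadence is the one derived.
    derived = GABCadenceManager._get_reconciliation_cadence(
        cadence_order=assumed_order,
        rerun_flag="N",
        cadence="WEEK",
        reconciliation_cadence="QUARTER",
        snapshot_flag="Y",
    )
    print(derived)  # QUARTER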

def extended_window_calculator( self, cadence: str, reconciliation_cadence: str, current_date: datetime.datetime, start_date_str: str, end_date_str: str, query_type: str, rerun_flag: str, snapshot_flag: str) -> tuple[datetime.datetime, datetime.datetime, datetime.datetime, datetime.datetime]:

Calculate the extended window for the given cadence.

Calculates the extended window of any cadence even when the user provides custom dates that are not the exact start and end dates of a cadence.

Arguments:
  • cadence: cadence to process.
  • reconciliation_cadence: reconciliation cadence to process.
  • current_date: current date.
  • start_date_str: start date of the period to process.
  • end_date_str: end date of the period to process.
  • query_type: use case query type.
  • rerun_flag: flag indicating if it's a rerun or a normal run.
  • snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
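
A minimal usage sketch, assuming the default date format (GABDefaults.DATE_FORMAT) is
'%Y-%m-%d' and the usual DAY < WEEK < MONTH < QUARTER < YEAR ordering from
GABCadence.get_ordered_cadences(); the flag and query type values are illustrative:

    from datetime import datetime

    from lakehouse_engine.core.gab_manager import GABCadenceManager

    manager = GABCadenceManager()

    # Extended (bucket) window and filter window for a WEEK cadence that is
    # reconciled by QUARTER.
    bucket_start, bucket_end, filter_start, filter_end = (
        manager.extended_window_calculator(
            cadence="WEEK",
            reconciliation_cadence="QUARTER",
            current_date=datetime(2023, 1, 15),
            start_date_str="2022-12-01",
            end_date_str="2022-12-31",
            query_type="",   # any value other than "NAM" adds no second extra day
            rerun_flag="N",
            snapshot_flag="Y",
        )
    )
    # Under the assumed ordering, the filter window coincides with the bucket
    # window here, since the reconciliation cadence is the larger one.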
def get_cadence_start_end_dates( self, cadence: str, derived_cadence: str, start_date: datetime.datetime, end_date: datetime.datetime, query_type: str, current_date: datetime.datetime) -> tuple[datetime.datetime, datetime.datetime]:

Generate the new set of extended start and end dates based on the cadence.

The week cadence is run again to extend to the correct week start and end dates when a recon window is present for the WEEK cadence. For end_date 2022-12-31, in case of a QUARTER recon window for the WEEK cadence, start and end dates are recalculated to 2022-10-01 and 2022-12-31. But these are not week start and end dates, so the new dates are passed again to get the correct week boundaries.

Arguments:
  • cadence: cadence to process.
  • derived_cadence: cadence reconciliation to process.
  • start_date: start date of the period to process.
  • end_date: end date of the period to process.
  • query_type: use case query type.
  • current_date: current date used to cap the end date; if the computed end date is greater than the current date, the current date is used instead.
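
To make the week re-adjustment described above concrete, the following sketch (the
current_date and query_type values are illustrative assumptions) snaps a QUARTER
window to full weeks for a WEEK cadence:

    from datetime import datetime

    from lakehouse_engine.core.gab_manager import GABCadenceManager

    manager = GABCadenceManager()

    start, end = manager.get_cadence_start_end_dates(
        cadence="WEEK",
        derived_cadence="QUARTER",
        start_date=datetime(2022, 10, 1),
        end_date=datetime(2022, 12, 31),
        query_type="",                      # not "NAM": only the one-day extension applies
        current_date=datetime(2023, 2, 1),
    )
    # start -> 2022-09-26 (Monday of the week containing 2022-10-01)
    # end   -> 2023-01-02 (Sunday of the week containing 2022-12-31, plus one day)
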
class GABViewManager:

Class to control the GAB View creation.
462        dimensions_with_alias = ",".join(extracted_dimensions_with_alias)
463
464        (
465            extracted_metrics_with_alias,
466            extracted_metrics_without_alias,
467        ) = GABUtils.extract_columns_from_mapping(
468            columns=view_metrics,
469            is_dimension=False,
470            extract_column_without_alias=True,
471            table_alias="a",
472            is_extracted_value_as_name=False,
473        )
474        metrics = ",".join(extracted_metrics_without_alias)
475        metrics_with_alias = ",".join(extracted_metrics_with_alias)
476
477        dimensions_and_metrics_with_alias = (
478            dimensions_with_alias + "," + metrics_with_alias
479        )
480        dimensions_and_metrics = dimensions + "," + metrics
481
482        return dimensions, dimensions_and_metrics, dimensions_and_metrics_with_alias
483
484    @classmethod
485    def _get_calculated_and_derived_metrics_from_use_case_view(
486        cls, view_metrics: dict, view_dimensions: dict, cadence_snapshot_status: dict
487    ) -> Tuple[str, str, str]:
488        """Get calculated and derived metrics from use case.
489
490        Args:
491            view_dimensions: use case configured dimensions.
492            view_metrics: use case configured metrics.
493            cadence_snapshot_status: cadences to execute with the information if it has
494                snapshot.
495        """
496        calculated_script = []
497        calculated_script_snapshot = []
498        derived_script = []
499        for metric_key, metric_value in view_metrics.items():
500            (
501                calculated_metrics_script,
502                calculated_metrics_script_snapshot,
503                derived_metrics_script,
504            ) = cls._get_calculated_metrics(
505                metric_key, metric_value, view_dimensions, cadence_snapshot_status
506            )
507            calculated_script += [*calculated_metrics_script]
508            calculated_script_snapshot += [*calculated_metrics_script_snapshot]
509            derived_script += [*derived_metrics_script]
510
511        joined_calculated_script = cls._join_list_to_string_when_present(
512            calculated_script
513        )
514        joined_calculated_script_snapshot = cls._join_list_to_string_when_present(
515            calculated_script_snapshot
516        )
517
518        joined_derived = cls._join_list_to_string_when_present(
519            to_join=derived_script, starting_value="*,", default_value="*"
520        )
521
522        return (
523            joined_derived,
524            joined_calculated_script,
525            joined_calculated_script_snapshot,
526        )
527
528    @classmethod
529    def _join_list_to_string_when_present(
530        cls,
531        to_join: list[str],
532        separator: str = ",",
533        starting_value: str = ",",
534        default_value: str = "",
535    ) -> str:
536        """Join list to string when has values, otherwise return the default value.
537
538        Args:
539            to_join: values to join.
540            separator: separator to be used in the join.
541            starting_value: value to prepend to the joined string.
542            default_value: value to be returned if the list is empty.
543        """
544        return starting_value + separator.join(to_join) if to_join else default_value
545
546    @classmethod
547    def _get_cadence_snapshot_status(cls, result: dict) -> dict:
548        cadence_snapshot_status = {}
549        for k, v in result.items():
550            cadence_snapshot_status[k] = next(
551                (
552                    next(
553                        (
554                            snap_list["snapshot"]
555                            for snap_list in loop_outer_cad.values()
556                            if snap_list["snapshot"] == "Y"
557                        ),
558                        "N",
559                    )
560                    for loop_outer_cad in v.values()
561                    if v
562                ),
563                "N",
564            )
565
566        return cadence_snapshot_status
567
568    @classmethod
569    def _split_cadence_by_snapshot(
570        cls, cadence_snapshot_status: dict
571    ) -> tuple[list[str], list[str]]:
572        """Split cadences by the snapshot value.
573
574        Args:
575            cadence_snapshot_status: cadences to be split by snapshot status.
576        """
577        with_snapshot_cadences = []
578        without_snapshot_cadences = []
579
580        for key_snap_status, value_snap_status in cadence_snapshot_status.items():
581            if value_snap_status == "Y":
582                with_snapshot_cadences.append(key_snap_status)
583            else:
584                without_snapshot_cadences.append(key_snap_status)
585
586        return with_snapshot_cadences, without_snapshot_cadences
587
588    @classmethod
589    def _get_calculated_metrics(
590        cls,
591        metric_key: str,
592        metric_value: dict,
593        view_dimensions: dict,
594        cadence_snapshot_status: dict,
595    ) -> tuple[list[str], list[str], list[str]]:
596        """Get calculated metrics from use case.
597
598        Args:
599            metric_key: use case metric name.
600            metric_value: use case metric value.
601            view_dimensions: use case configured dimensions.
602            cadence_snapshot_status: cadences to execute with the information if it has
603                snapshot.
604        """
605        dim_partition = ",".join([str(i) for i in view_dimensions.keys()][2:])
606        dim_partition = "cadence," + dim_partition
607        calculated_metrics = metric_value["calculated_metric"]
608        derived_metrics = metric_value["derived_metric"]
609        calculated_metrics_script: list[str] = []
610        calculated_metrics_script_snapshot: list[str] = []
611        derived_metrics_script: list[str] = []
612
613        if calculated_metrics:
614            (
615                calculated_metrics_script,
616                calculated_metrics_script_snapshot,
617            ) = cls._get_calculated_metric(
618                metric_key, calculated_metrics, dim_partition, cadence_snapshot_status
619            )
620
621        if derived_metrics:
622            derived_metrics_script = cls._get_derived_metrics(derived_metrics)
623
624        return (
625            calculated_metrics_script,
626            calculated_metrics_script_snapshot,
627            derived_metrics_script,
628        )
629
630    @classmethod
631    def _get_derived_metrics(cls, derived_metric: dict) -> list[str]:
632        """Get derived metrics from use case.
633
634        Args:
635            derived_metric: use case derived metrics.
636        """
637        derived_metric_script = []
638
639        for i in range(0, len(derived_metric)):
640            derived_formula = str(derived_metric[i]["formula"])
641            derived_label = derived_metric[i]["label"]
642            derived_metric_script.append(derived_formula + " AS " + derived_label)
643
644        return derived_metric_script
645
646    @classmethod
647    def _get_calculated_metric(
648        cls,
649        metric_key: str,
650        calculated_metric: dict,
651        dimension_partition: str,
652        cadence_snapshot_status: dict,
653    ) -> tuple[list[str], list[str]]:
654        """Get calculated metrics from use case.
655
656        Args:
657            metric_key: use case metric name.
658            calculated_metric: use case calculated metrics.
659            dimension_partition: dimension partition.
660            cadence_snapshot_status: cadences to execute with the information if it has
661                snapshot.
662        """
663        last_cadence_script: list[str] = []
664        last_year_cadence_script: list[str] = []
665        window_script: list[str] = []
666        last_cadence_script_snapshot: list[str] = []
667        last_year_cadence_script_snapshot: list[str] = []
668        window_script_snapshot: list[str] = []
669
670        if "last_cadence" in calculated_metric:
671            (
672                last_cadence_script,
673                last_cadence_script_snapshot,
674            ) = cls._get_cadence_calculated_metric(
675                metric_key,
676                dimension_partition,
677                calculated_metric,
678                cadence_snapshot_status,
679                "last_cadence",
680            )
681        if "last_year_cadence" in calculated_metric:
682            (
683                last_year_cadence_script,
684                last_year_cadence_script_snapshot,
685            ) = cls._get_cadence_calculated_metric(
686                metric_key,
687                dimension_partition,
688                calculated_metric,
689                cadence_snapshot_status,
690                "last_year_cadence",
691            )
692        if "window_function" in calculated_metric:
693            window_script, window_script_snapshot = cls._get_window_calculated_metric(
694                metric_key,
695                dimension_partition,
696                calculated_metric,
697                cadence_snapshot_status,
698            )
699
700        calculated_script = [
701            *last_cadence_script,
702            *last_year_cadence_script,
703            *window_script,
704        ]
705        calculated_script_snapshot = [
706            *last_cadence_script_snapshot,
707            *last_year_cadence_script_snapshot,
708            *window_script_snapshot,
709        ]
710
711        return calculated_script, calculated_script_snapshot
712
713    @classmethod
714    def _get_window_calculated_metric(
715        cls,
716        metric_key: str,
717        dimension_partition: str,
718        calculated_metric: dict,
719        cadence_snapshot_status: dict,
720    ) -> tuple[list, list]:
721        """Get window calculated metrics from use case.
722
723        Args:
724            metric_key: use case metric name.
725            dimension_partition: dimension partition.
726            calculated_metric: use case calculated metrics.
727            cadence_snapshot_status: cadences to execute with the information if it has
728                snapshot.
729        """
730        calculated_script = []
731        calculated_script_snapshot = []
732
733        for i in range(0, len(calculated_metric["window_function"])):
734            window_function = calculated_metric["window_function"][i]["agg_func"]
735            window_function_start = calculated_metric["window_function"][i]["window"][0]
736            window_function_end = calculated_metric["window_function"][i]["window"][1]
737            window_label = calculated_metric["window_function"][i]["label"]
738
739            calculated_script.append(
740                f"""
741                NVL(
742                    {window_function}({metric_key}) OVER
743                    (
744                        PARTITION BY {dimension_partition}
745                        order by from_date ROWS BETWEEN
746                            {str(window_function_start)} PRECEDING
747                            AND {str(window_function_end)} PRECEDING
748                    ),
749                    0
750                ) AS
751                {window_label}
752                """
753            )
754
755            if "Y" in cadence_snapshot_status.values():
756                calculated_script_snapshot.append(
757                    f"""
758                    NVL(
759                        {window_function}({metric_key}) OVER
760                        (
761                            PARTITION BY {dimension_partition} ,rn
762                            order by from_date ROWS BETWEEN
763                                {str(window_function_start)} PRECEDING
764                                AND {str(window_function_end)} PRECEDING
765                        ),
766                        0
767                    ) AS
768                    {window_label}
769                    """
770                )
771
772        return calculated_script, calculated_script_snapshot
773
774    @classmethod
775    def _get_cadence_calculated_metric(
776        cls,
777        metric_key: str,
778        dimension_partition: str,
779        calculated_metric: dict,
780        cadence_snapshot_status: dict,
781        cadence: str,
782    ) -> tuple[list, list]:
783        """Get cadence calculated metrics from use case.
784
785        Args:
786            metric_key: use case metric name.
787            calculated_metric: use case calculated metrics.
788            dimension_partition: dimension partition.
789            cadence_snapshot_status: cadences to execute with the information if it has
790                snapshot.
791            cadence: cadence to process.
792        """
793        calculated_script = []
794        calculated_script_snapshot = []
795
796        for i in range(0, len(calculated_metric[cadence])):
797            cadence_lag = cls._get_cadence_item_lag(calculated_metric, cadence, i)
798            cadence_label = calculated_metric[cadence][i]["label"]
799
800            calculated_script.append(
801                cls._get_cadence_lag_statement(
802                    metric_key,
803                    cadence_lag,
804                    dimension_partition,
805                    cadence_label,
806                    snapshot=False,
807                    cadence=cadence,
808                )
809            )
810
811            if "Y" in cadence_snapshot_status.values():
812                calculated_script_snapshot.append(
813                    cls._get_cadence_lag_statement(
814                        metric_key,
815                        cadence_lag,
816                        dimension_partition,
817                        cadence_label,
818                        snapshot=True,
819                        cadence=cadence,
820                    )
821                )
822
823        return calculated_script, calculated_script_snapshot
824
825    @classmethod
826    def _get_cadence_item_lag(
827        cls, calculated_metric: dict, cadence: str, item: int
828    ) -> str:
829        """Get calculated metric item lag.
830
831        Args:
832            calculated_metric: use case calculated metrics.
833            cadence: cadence to process.
834            item: metric item.
835        """
836        return str(calculated_metric[cadence][item]["window"])
837
838    @classmethod
839    def _get_cadence_lag_statement(
840        cls,
841        metric_key: str,
842        cadence_lag: str,
843        dimension_partition: str,
844        cadence_label: str,
845        snapshot: bool,
846        cadence: str,
847    ) -> str:
848        """Get cadence lag statement.
849
850        Args:
851            metric_key: use case metric name.
852            cadence_lag: cadence window lag.
853            dimension_partition: dimension partition.
854            cadence_label: cadence name.
855            snapshot: indicate if the snapshot is enabled.
856            cadence: cadence to process.
857        """
858        cadence_lag_statement = ""
859        if cadence == "last_cadence":
860            cadence_lag_statement = (
861                "NVL(LAG("
862                + metric_key
863                + ","
864                + cadence_lag
865                + ") OVER(PARTITION BY "
866                + dimension_partition
867                + (",rn" if snapshot else "")
868                + " order by from_date),0) AS "
869                + cadence_label
870            )
871        elif cadence == "last_year_cadence":
872            cadence_lag_statement = (
873                "NVL(LAG("
874                + metric_key
875                + ","
876                + cadence_lag
877                + ") OVER(PARTITION BY "
878                + dimension_partition
879                + (",rn" if snapshot else "")
880                + """,
881                    case
882                        when cadence in ('DAY','MONTH','QUARTER')
883                            then struct(month(from_date), day(from_date))
884                        when cadence in('WEEK')
885                            then struct(weekofyear(from_date+1),1)
886                    end order by from_date),0) AS """
887                + cadence_label
888            )
889        else:
890            cls._LOGGER.error(f"Cadence {cadence} not implemented yet")
891
892        return cadence_lag_statement
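
As an illustration of the expression this helper builds, the following hedged sketch calls the private helper directly with hypothetical metric, lag, partition, and label values; it is for demonstration only and not how the engine invokes it.

from lakehouse_engine.core.gab_manager import GABViewManager

# Hypothetical inputs: metric "net_sales", a lag of 1 cadence, partitioning by
# cadence and the "region" dimension, exposed under the alias "last_net_sales".
statement = GABViewManager._get_cadence_lag_statement(
    metric_key="net_sales",
    cadence_lag="1",
    dimension_partition="cadence,region",
    cadence_label="last_net_sales",
    snapshot=False,
    cadence="last_cadence",
)

# Roughly equivalent to:
# NVL(LAG(net_sales,1) OVER(PARTITION BY cadence,region order by from_date),0)
#     AS last_net_sales
print(statement)
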

Class to control the GAB View creation.

GABViewManager(query_id: str, lookup_query_builder: pyspark.sql.dataframe.DataFrame, target_database: str, target_table: str)

Construct GABViewManager instances.

Arguments:
  • query_id: gab configuration table use case identifier.
  • lookup_query_builder: gab configuration data.
  • target_database: target database to write.
  • target_table: target table to write.
query_id
lookup_query_builder
target_database
target_table
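
A minimal construction sketch follows; the Spark session, configuration table name, use case identifier, and target database/table are illustrative assumptions, not values from the engine.

from pyspark.sql import SparkSession

from lakehouse_engine.core.gab_manager import GABViewManager

spark = SparkSession.builder.getOrCreate()

# Hypothetical: the GAB configuration data for a single use case, read from a
# configuration table (table name and query_id are placeholders).
lookup_query_builder = spark.table("gab.lkp_query_builder").filter("query_id = '1'")

view_manager = GABViewManager(
    query_id="1",
    lookup_query_builder=lookup_query_builder,
    target_database="gab_views",  # hypothetical target database
    target_table="insights",      # hypothetical target table
)
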
def generate_use_case_views(self) -> None:

Generate all the use case views.

Generates the DDLs for each of the views. This DDL is dynamically built based on the mappings provided in the config table.
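
To tie the pieces together, here is a hedged sketch of what a single mappings entry could look like (the field names follow the parsing logic shown above; the view name, dimensions, metrics, and filter are hypothetical), followed by the call that turns every mapping entry into a view DDL.

# Illustrative shape only; real mappings live in the "mappings" JSON column of
# the GAB configuration table and are read via GABUtils.get_json_column_as_dict.
example_mappings = {
    "vw_sales_by_region": {
        "dimensions": {"from_date": "from_date", "to_date": "to_date", "region": "region"},
        "metric": {
            "net_sales": {
                "calculated_metric": {
                    "last_cadence": [{"window": 1, "label": "last_net_sales"}],
                },
                "derived_metric": [
                    {"formula": "net_sales - last_net_sales", "label": "net_sales_delta"},
                ],
            },
        },
        "filter": "region = 'EMEA'",
    },
}

# Assuming `view_manager` was built as in the earlier sketch, this call reads the
# "recon_window" and "mappings" JSON columns for the use case and generates one
# view per mapping entry in the target database.
view_manager.generate_use_case_views()
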