lakehouse_engine.algorithms.gab

Module to define Gold Asset Builder algorithm behavior.
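
A minimal usage sketch (illustrative only): the GAB algorithm is constructed from an acon dictionary and run via execute(). The keys below are assumptions inferred from the GABSpec attributes referenced in this module (lookup_table, calendar_table, the filter lists, gab_base_path, target/source databases and tables, dates and flags); the exact schema accepted by GABSpec.create_from_acon may differ.

    from datetime import datetime

    from lakehouse_engine.algorithms.gab import GAB

    # Hypothetical acon: key names mirror the GABSpec attributes used in execute().
    acon = {
        "query_label_filter": ["order_metrics"],   # use cases to process
        "queue_filter": ["gold"],                  # queues to include
        "cadence_filter": ["All"],                 # or e.g. ["DAY", "WEEK"]
        "lookup_table": "gold.lkp_query_builder",  # gab configuration table
        "calendar_table": "gold.gab_calendar",
        "gab_base_path": "/sql/gab/",              # base path of the stage SQL files
        "source_database": "silver",
        "target_database": "gold",
        "target_table": "gab_use_case_results",
        "start_date": datetime(2024, 1, 1),        # period to (re)process
        "end_date": datetime(2024, 1, 31),
        "current_date": datetime(2024, 2, 1),
        "rerun_flag": "N",
    }

    GAB(acon).execute()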

  1"""Module to define Gold Asset Builder algorithm behavior."""
  2
  3import copy
  4from datetime import datetime, timedelta
  5from typing import Union
  6
  7import pendulum
  8from jinja2 import Template
  9from pyspark import Row
 10from pyspark.sql import DataFrame
 11from pyspark.sql.functions import lit
 12
 13from lakehouse_engine.algorithms.algorithm import Algorithm
 14from lakehouse_engine.core.definitions import (
 15    GABCadence,
 16    GABCombinedConfiguration,
 17    GABDefaults,
 18    GABKeys,
 19    GABReplaceableKeys,
 20    GABSpec,
 21    GABStartOfWeek,
 22)
 23from lakehouse_engine.core.exec_env import ExecEnv
 24from lakehouse_engine.core.gab_manager import GABCadenceManager, GABViewManager
 25from lakehouse_engine.core.gab_sql_generator import (
 26    GABDeleteGenerator,
 27    GABInsertGenerator,
 28)
 29from lakehouse_engine.utils.gab_utils import GABPartitionUtils, GABUtils
 30from lakehouse_engine.utils.logging_handler import LoggingHandler
 31
 32
 33class GAB(Algorithm):
 34    """Class representing the gold asset builder."""
 35
 36    _LOGGER = LoggingHandler(__name__).get_logger()
 37    _SPARK_DEFAULT_PARALLELISM_CONFIG = (
 38        "spark.sql.sources.parallelPartitionDiscovery.parallelism"
 39    )
 40    _SPARK_DEFAULT_PARALLELISM_VALUE = "10000"
 41
 42    def __init__(self, acon: dict):
 43        """Construct GAB instances.
 44
 45        Args:
 46            acon: algorithm configuration.
 47        """
 48        self.spec: GABSpec = GABSpec.create_from_acon(acon=acon)
 49
 50    def execute(self) -> None:
 51        """Execute the Gold Asset Builder."""
 52        self._LOGGER.info(f"Reading {self.spec.lookup_table} as lkp_query_builder")
 53        lookup_query_builder_df = ExecEnv.SESSION.read.table(self.spec.lookup_table)
 54        ExecEnv.SESSION.read.table(self.spec.calendar_table).createOrReplaceTempView(
 55            "df_cal"
 56        )
 57        self._LOGGER.info(f"Generating calendar from {self.spec.calendar_table}")
 58
 59        query_label = self.spec.query_label_filter
 60        queue = self.spec.queue_filter
 61        cadence = self.spec.cadence_filter
 62
 63        self._LOGGER.info(f"Query Label Filter {query_label}")
 64        self._LOGGER.info(f"Queue Filter {queue}")
 65        self._LOGGER.info(f"Cadence Filter {cadence}")
 66
 67        gab_path = self.spec.gab_base_path
 68        self._LOGGER.info(f"Gab Base Path {gab_path}")
 69
 70        lookup_query_builder_df = lookup_query_builder_df.filter(
 71            (
 72                (lookup_query_builder_df.query_label.isin(query_label))
 73                & (lookup_query_builder_df.queue.isin(queue))
 74                & (lookup_query_builder_df.is_active != lit("N"))
 75            )
 76        )
 77
 78        lookup_query_builder_df.cache()
 79
 80        for use_case in lookup_query_builder_df.collect():
 81            self._process_use_case(
 82                use_case=use_case,
 83                lookup_query_builder=lookup_query_builder_df,
 84                selected_cadences=cadence,
 85                gab_path=gab_path,
 86            )
 87
 88        lookup_query_builder_df.unpersist()
 89
 90    def _process_use_case(
 91        self,
 92        use_case: Row,
 93        lookup_query_builder: DataFrame,
 94        selected_cadences: list[str],
 95        gab_path: str,
 96    ) -> None:
 97        """Process each gab use case.
 98
 99        Args:
100            use_case: gab use case to process.
101            lookup_query_builder: gab configuration data.
102            selected_cadences: selected cadences to process.
103            gab_path: gab base path used to get the use case stages sql files.
104        """
105        self._LOGGER.info(f"Executing use case: {use_case['query_label']}")
106
107        reconciliation = GABUtils.get_json_column_as_dict(
108            lookup_query_builder=lookup_query_builder,
109            query_id=use_case["query_id"],
110            query_column="recon_window",
111        )
112        self._LOGGER.info(f"reconcilation window - {reconciliation}")
113        configured_cadences = list(reconciliation.keys())
114
115        stages = GABUtils.get_json_column_as_dict(
116            lookup_query_builder=lookup_query_builder,
117            query_id=use_case["query_id"],
118            query_column="intermediate_stages",
119        )
120        self._LOGGER.info(f"intermediate stages - {stages}")
121
122        self._LOGGER.info(f"selected_cadences: {selected_cadences}")
123        self._LOGGER.info(f"configured_cadences: {configured_cadences}")
124        cadences = self._get_filtered_cadences(selected_cadences, configured_cadences)
125        self._LOGGER.info(f"filtered cadences - {cadences}")
126
127        latest_run_date, latest_config_date = self._get_latest_usecase_data(
128            use_case["query_id"]
129        )
130        self._LOGGER.info(f"latest_config_date: {latest_config_date}")
131        self._LOGGER.info(f"latest_run_date: - {latest_run_date}")
132        self._set_use_case_stage_template_file(stages, gab_path, use_case)
133        processed_cadences = []
134
135        for cadence in cadences:
136            is_cadence_processed = self._process_use_case_query_cadence(
137                cadence,
138                reconciliation,
139                use_case,
140                stages,
141                lookup_query_builder,
142            )
143            if is_cadence_processed:
144                processed_cadences.append(is_cadence_processed)
145
146        if processed_cadences:
147            self._generate_ddl(
148                latest_config_date=latest_config_date,
149                latest_run_date=latest_run_date,
150                query_id=use_case["query_id"],
151                lookup_query_builder=lookup_query_builder,
152            )
153        else:
154            self._LOGGER.info(
155                f"Skipping use case {use_case['query_label']}. No cadence processed "
156                "for the use case."
157            )
158
159    @classmethod
160    def _set_use_case_stage_template_file(
161        cls, stages: dict, gab_path: str, use_case: Row
162    ) -> None:
163        """Set templated file for each stage.
164
165        Args:
166            stages: use case stages with their configuration.
167            gab_path: gab base path used to get the use case stages SQL files.
168            use_case: gab use case to process.
169        """
170        cls._LOGGER.info("Reading templated file for each stage...")
171
172        for i in range(1, len(stages) + 1):
173            stage = stages[str(i)]
174            stage_file_path = stage["file_path"]
175            full_path = gab_path + stage_file_path
176            cls._LOGGER.info(f"Stage file path is: {full_path}")
177            file_read = open(full_path, "r").read()
178            templated_file = file_read.replace(
179                "replace_offset_value", str(use_case["timezone_offset"])
180            )
181            stage["templated_file"] = templated_file
182            stage["full_file_path"] = full_path
183
184    def _process_use_case_query_cadence(
185        self,
186        cadence: str,
187        reconciliation: dict,
188        use_case: Row,
189        stages: dict,
190        lookup_query_builder: DataFrame,
191    ) -> bool:
192        """Identify use case reconciliation window and cadence.
193
194        Args:
195            cadence: cadence to process.
196            reconciliation: configured use case reconciliation window.
197            use_case: gab use case to process.
198            stages: use case stages with their configuration.
199            lookup_query_builder: gab configuration data.
200        """
201        selected_reconciliation_window = {}
202        selected_cadence = reconciliation.get(cadence)
203        self._LOGGER.info(f"Processing cadence: {cadence}")
204        self._LOGGER.info(f"Reconciliation Window - {selected_cadence}")
205
206        if selected_cadence:
207            selected_reconciliation_window = selected_cadence.get("recon_window")
208
209        self._LOGGER.info(f"{cadence}: {self.spec.start_date} - {self.spec.end_date}")
210
211        start_of_week = use_case["start_of_the_week"]
212
213        self._set_week_configuration_by_uc_start_of_week(start_of_week)
214
215        cadence_configuration_at_end_date = (
216            GABUtils.get_cadence_configuration_at_end_date(self.spec.end_date)
217        )
218
219        reconciliation_cadences = GABUtils().get_reconciliation_cadences(
220            cadence=cadence,
221            selected_reconciliation_window=selected_reconciliation_window,
222            cadence_configuration_at_end_date=cadence_configuration_at_end_date,
223            rerun_flag=self.spec.rerun_flag,
224        )
225
226        start_date_str = GABUtils.format_datetime_to_default(self.spec.start_date)
227        end_date_str = GABUtils.format_datetime_to_default(self.spec.end_date)
228
229        for reconciliation_cadence, snapshot_flag in reconciliation_cadences.items():
230            self._process_reconciliation_cadence(
231                reconciliation_cadence=reconciliation_cadence,
232                snapshot_flag=snapshot_flag,
233                cadence=cadence,
234                start_date_str=start_date_str,
235                end_date_str=end_date_str,
236                use_case=use_case,
237                lookup_query_builder=lookup_query_builder,
238                stages=stages,
239            )
240
241        return (cadence in reconciliation.keys()) or (
242            reconciliation_cadences is not None
243        )
244
245    def _process_reconciliation_cadence(
246        self,
247        reconciliation_cadence: str,
248        snapshot_flag: str,
249        cadence: str,
250        start_date_str: str,
251        end_date_str: str,
252        use_case: Row,
253        lookup_query_builder: DataFrame,
254        stages: dict,
255    ) -> None:
256        """Process use case reconciliation window.
257
258        Reconcile the pre-aggregated data to cover the late events.
259
260        Args:
261            reconciliation_cadence: reconciliation to process.
262            snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
263            cadence: cadence to process.
264            start_date_str: start date of the period to process.
265            end_date_str: end date of the period to process.
266            use_case: gab use case to process.
267            lookup_query_builder: gab configuration data.
268            stages: use case stages with their configuration.
269
270        Example:
271            Cadence: week;
272            Reconciliation: monthly;
273            This means that every weekend the previous week's aggregations are calculated,
274                and at month end the numbers calculated for the last 4 weeks are reconciled
275                to adjust them for late events.
276        """
277        (
278            window_start_date,
279            window_end_date,
280            filter_start_date,
281            filter_end_date,
282        ) = GABCadenceManager().extended_window_calculator(
283            cadence,
284            reconciliation_cadence,
285            self.spec.current_date,
286            start_date_str,
287            end_date_str,
288            use_case["query_type"],
289            self.spec.rerun_flag,
290            snapshot_flag,
291        )
292
293        if use_case["timezone_offset"]:
294            filter_start_date = filter_start_date + timedelta(
295                hours=use_case["timezone_offset"]
296            )
297            filter_end_date = filter_end_date + timedelta(
298                hours=use_case["timezone_offset"]
299            )
300
301        filter_start_date_str = GABUtils.format_datetime_to_default(filter_start_date)
302        filter_end_date_str = GABUtils.format_datetime_to_default(filter_end_date)
303
304        partition_end = GABUtils.format_datetime_to_default(
305            (window_end_date - timedelta(days=1))
306        )
307
308        window_start_date_str = GABUtils.format_datetime_to_default(window_start_date)
309        window_end_date_str = GABUtils.format_datetime_to_default(window_end_date)
310
311        partition_filter = GABPartitionUtils.get_partition_condition(
312            filter_start_date_str, partition_end
313        )
314
315        self._LOGGER.info(
316            "extended window for start and end dates are: "
317            f"{filter_start_date_str} - {filter_end_date_str}"
318        )
319
320        unpersist_list = []
321
322        for i in range(1, len(stages) + 1):
323            stage = stages[str(i)]
324            templated_file = stage["templated_file"]
325            stage_file_path = stage["full_file_path"]
326
327            templated = self._process_use_case_query_step(
328                stage=stages[str(i)],
329                templated_file=templated_file,
330                use_case=use_case,
331                reconciliation_cadence=reconciliation_cadence,
332                cadence=cadence,
333                snapshot_flag=snapshot_flag,
334                window_start_date=window_start_date_str,
335                partition_end=partition_end,
336                filter_start_date=filter_start_date_str,
337                filter_end_date=filter_end_date_str,
338                partition_filter=partition_filter,
339            )
340
341            temp_stage_view_name = self._create_stage_view(
342                templated,
343                stages[str(i)],
344                window_start_date_str,
345                window_end_date_str,
346                use_case["query_id"],
347                use_case["query_label"],
348                cadence,
349                stage_file_path,
350            )
351            unpersist_list.append(temp_stage_view_name)
352
353        insert_success = self._generate_view_statement(
354            query_id=use_case["query_id"],
355            cadence=cadence,
356            temp_stage_view_name=temp_stage_view_name,
357            lookup_query_builder=lookup_query_builder,
358            window_start_date=window_start_date_str,
359            window_end_date=window_end_date_str,
360            query_label=use_case["query_label"],
361        )
362        self._LOGGER.info(f"Inserted data to generate the view: {insert_success}")
363
364        self._unpersist_cached_views(unpersist_list)
365
366    def _process_use_case_query_step(
367        self,
368        stage: dict,
369        templated_file: str,
370        use_case: Row,
371        reconciliation_cadence: str,
372        cadence: str,
373        snapshot_flag: str,
374        window_start_date: str,
375        partition_end: str,
376        filter_start_date: str,
377        filter_end_date: str,
378        partition_filter: str,
379    ) -> str:
380        """Process each use case step.
381
382        Process any intermediate view defined in the gab configuration table as a step
383            of the use case.
384
385        Args:
386            stage: stage to process.
387            templated_file: sql file to process at this stage.
388            use_case: gab use case to process.
389            reconciliation_cadence: configured use case reconciliation window.
390            cadence: cadence to process.
391            snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
392            window_start_date: start date for the configured stage.
393            partition_end: end date for the configured stage.
394            filter_start_date: filter start date to replace in the stage query.
395            filter_end_date: filter end date to replace in the stage query.
396            partition_filter: partition condition.
397        """
398        filter_col = stage["project_date_column"]
399        if stage["filter_date_column"]:
400            filter_col = stage["filter_date_column"]
401
402        # dummy value to avoid an error when the column is empty in the configuration
403        project_col = stage.get("project_date_column", "X")
404
405        gab_base_configuration_copy = copy.deepcopy(
406            GABCombinedConfiguration.COMBINED_CONFIGURATION.value
407        )
408
409        for item in gab_base_configuration_copy.values():
410            self._update_rendered_item_cadence(
411                reconciliation_cadence, cadence, project_col, item  # type: ignore
412            )
413
414        (
415            rendered_date,
416            rendered_to_date,
417            join_condition,
418        ) = self._get_cadence_configuration(
419            gab_base_configuration_copy,
420            cadence,
421            reconciliation_cadence,
422            snapshot_flag,
423            use_case["start_of_the_week"],
424            project_col,
425            window_start_date,
426            partition_end,
427        )
428
429        rendered_file = self._render_template_query(
430            templated=templated_file,
431            cadence=cadence,
432            start_of_the_week=use_case["start_of_the_week"],
433            query_id=use_case["query_id"],
434            rendered_date=rendered_date,
435            filter_start_date=filter_start_date,
436            filter_end_date=filter_end_date,
437            filter_col=filter_col,
438            timezone_offset=use_case["timezone_offset"],
439            join_condition=join_condition,
440            partition_filter=partition_filter,
441            rendered_to_date=rendered_to_date,
442        )
443
444        return rendered_file
445
446    @classmethod
447    def _get_filtered_cadences(
448        cls, selected_cadences: list[str], configured_cadences: list[str]
449    ) -> list[str]:
450        """Get filtered cadences.
451
452        Get the intersection of user selected cadences and use case configured cadences.
453
454        Args:
455            selected_cadences: user selected cadences.
456            configured_cadences: use case configured cadences.
457        """
458        return (
459            configured_cadences
460            if "All" in selected_cadences
461            else GABCadence.order_cadences(
462                list(set(selected_cadences).intersection(configured_cadences))
463            )
464        )
465
466    def _get_latest_usecase_data(self, query_id: str) -> tuple[datetime, datetime]:
467        """Get latest use case data.
468
469        Args:
470            query_id: use case query id.
471        """
472        return (
473            self._get_latest_run_date(query_id),
474            self._get_latest_use_case_date(query_id),
475        )
476
477    def _get_latest_run_date(self, query_id: str) -> datetime:
478        """Get latest use case run date.
479
480        Args:
481            query_id: use case query id.
482        """
483        last_success_run_sql = """
484            SELECT run_start_time
485            FROM {database}.gab_log_events
486            WHERE query_id = {query_id}
487            AND stage_name = 'Final Insert'
488            AND status = 'Success'
489            ORDER BY 1 DESC
490            LIMIT 1
491            """.format(  # nosec: B608
492            database=self.spec.target_database, query_id=query_id
493        )
494        try:
495            latest_run_date: datetime = ExecEnv.SESSION.sql(
496                last_success_run_sql
497            ).collect()[0][0]
498        except Exception:
499            latest_run_date = datetime.strptime(
500                "2020-01-01", GABDefaults.DATE_FORMAT.value
501            )
502
503        return latest_run_date
504
505    def _get_latest_use_case_date(self, query_id: str) -> datetime:
506        """Get latest use case configured date.
507
508        Args:
509            query_id: use case query id.
510        """
511        query_config_sql = """
512            SELECT lh_created_on
513            FROM {lkp_query_builder}
514            WHERE query_id = {query_id}
515        """.format(  # nosec: B608
516            lkp_query_builder=self.spec.lookup_table,
517            query_id=query_id,
518        )
519
520        latest_config_date: datetime = ExecEnv.SESSION.sql(query_config_sql).collect()[
521            0
522        ][0]
523
524        return latest_config_date
525
526    @classmethod
527    def _set_week_configuration_by_uc_start_of_week(cls, start_of_week: str) -> None:
528        """Set week configuration by use case start of week.
529
530        Args:
531            start_of_week: use case start of week (MONDAY or SUNDAY).
532        """
533        if start_of_week.upper() == "MONDAY":
534            pendulum.week_starts_at(pendulum.MONDAY)
535            pendulum.week_ends_at(pendulum.SUNDAY)
536        elif start_of_week.upper() == "SUNDAY":
537            pendulum.week_starts_at(pendulum.SUNDAY)
538            pendulum.week_ends_at(pendulum.SATURDAY)
539        else:
540            raise NotImplementedError(
541                f"The requested {start_of_week} is not implemented."
542                "Supported `start_of_week` values: [MONDAY, SUNDAY]"
543            )
544
545    @classmethod
546    def _update_rendered_item_cadence(
547        cls, reconciliation_cadence: str, cadence: str, project_col: str, item: dict
548    ) -> None:
549        """Override item properties based in the rendered item cadence.
550
551        Args:
552            reconciliation_cadence: configured use case reconciliation window.
553            cadence: cadence to process.
554            project_col: use case projection date column name.
555            item: predefined use case combination.
556        """
557        rendered_item = cls._get_rendered_item_cadence(
558            reconciliation_cadence, cadence, project_col, item
559        )
560        item["join_select"] = rendered_item["join_select"]
561        item["project_start"] = rendered_item["project_start"]
562        item["project_end"] = rendered_item["project_end"]
563
564    @classmethod
565    def _get_rendered_item_cadence(
566        cls, reconciliation_cadence: str, cadence: str, project_col: str, item: dict
567    ) -> dict:
568        """Update pre-configured gab parameters with use case data.
569
570        Args:
571            reconciliation_cadence: configured use case reconciliation window.
572            cadence: cadence to process.
573            project_col: use case projection date column name.
574            item: predefined use case combination.
575        """
576        return {
577            GABKeys.JOIN_SELECT: (
578                item[GABKeys.JOIN_SELECT]
579                .replace(GABReplaceableKeys.CONFIG_WEEK_START, "Monday")
580                .replace(
581                    GABReplaceableKeys.RECONCILIATION_CADENCE,
582                    reconciliation_cadence,
583                )
584                .replace(GABReplaceableKeys.CADENCE, cadence)
585            ),
586            GABKeys.PROJECT_START: (
587                item[GABKeys.PROJECT_START]
588                .replace(GABReplaceableKeys.CADENCE, cadence)
589                .replace(GABReplaceableKeys.DATE_COLUMN, project_col)
590            ),
591            GABKeys.PROJECT_END: (
592                item[GABKeys.PROJECT_END]
593                .replace(GABReplaceableKeys.CADENCE, cadence)
594                .replace(GABReplaceableKeys.DATE_COLUMN, project_col)
595            ),
596        }
597
598    @classmethod
599    def _get_cadence_configuration(
600        cls,
601        use_case_configuration: dict,
602        cadence: str,
603        reconciliation_cadence: str,
604        snapshot_flag: str,
605        start_of_week: str,
606        project_col: str,
607        window_start_date: str,
608        partition_end: str,
609    ) -> tuple[str, str, str]:
610        """Get use case configuration fields to replace pre-configured parameters.
611
612        Args:
613            use_case_configuration: use case configuration.
614            cadence: cadence to process.
615            reconciliation_cadence: cadence to be reconciled.
616            snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
617            start_of_week: use case start of week (MONDAY or SUNDAY).
618            project_col: use case projection date column name.
619            window_start_date: start date for the configured stage.
620            partition_end: end date for the configured stage.
621
622        Returns:
623            rendered_from_date: projection start date.
624            rendered_to_date: projection end date.
625            join_condition: string containing the join condition to replace in the
626                templated query by jinja substitution.
627        """
628        cadence_dict = next(
629            (
630                dict(configuration)
631                for configuration in use_case_configuration.values()
632                if (
633                    (cadence in configuration["cadence"])
634                    and (reconciliation_cadence in configuration["recon"])
635                    and (snapshot_flag in configuration["snap_flag"])
636                    and (
637                        GABStartOfWeek.get_start_of_week()[start_of_week.upper()]
638                        in configuration["week_start"]
639                    )
640                )
641            ),
642            None,
643        )
644        rendered_from_date = None
645        rendered_to_date = None
646        join_condition = None
647
648        if cadence_dict:
649            rendered_from_date = (
650                cadence_dict[GABKeys.PROJECT_START]
651                .replace(GABReplaceableKeys.CADENCE, cadence)
652                .replace(GABReplaceableKeys.DATE_COLUMN, project_col)
653            )
654            rendered_to_date = (
655                cadence_dict[GABKeys.PROJECT_END]
656                .replace(GABReplaceableKeys.CADENCE, cadence)
657                .replace(GABReplaceableKeys.DATE_COLUMN, project_col)
658            )
659
660            if cadence_dict[GABKeys.JOIN_SELECT]:
661                join_condition = """
662                 inner join (
663                     {join_select} from df_cal
664                     where calendar_date
665                     between '{bucket_start}' and '{bucket_end}'
666                 )
667                 df_cal on date({date_column})
668                     between df_cal.cadence_start_date and df_cal.cadence_end_date
669                 """.format(
670                    join_select=cadence_dict[GABKeys.JOIN_SELECT],
671                    bucket_start=window_start_date,
672                    bucket_end=partition_end,
673                    date_column=project_col,
674                )
675
676        return rendered_from_date, rendered_to_date, join_condition
677
678    def _render_template_query(
679        self,
680        templated: str,
681        cadence: str,
682        start_of_the_week: str,
683        query_id: str,
684        rendered_date: str,
685        filter_start_date: str,
686        filter_end_date: str,
687        filter_col: str,
688        timezone_offset: str,
689        join_condition: str,
690        partition_filter: str,
691        rendered_to_date: str,
692    ) -> str:
693        """Replace jinja templated parameters in the SQL with the actual data.
694
695        Args:
696            templated: templated sql file to process at this stage.
697            cadence: cadence to process.
698            start_of_the_week: use case start of week (MONDAY or SUNDAY).
699            query_id: gab configuration table use case identifier.
700            rendered_date: projection start date.
701            filter_start_date: filter start date to replace in the stage query.
702            filter_end_date: filter end date to replace in the stage query.
703            filter_col: use case projection date column name.
704            timezone_offset: timezone offset configured in the use case.
705            join_condition: string containing the join condition.
706            partition_filter: partition condition.
707            rendered_to_date: projection end date.
708        """
709        return Template(templated).render(
710            cadence="'{cadence}' as cadence".format(cadence=cadence),
711            cadence_run=cadence,
712            week_start=start_of_the_week,
713            query_id="'{query_id}' as query_id".format(query_id=query_id),
714            project_date_column=rendered_date,
715            target_table=self.spec.target_table,
716            database=self.spec.source_database,
717            start_date=filter_start_date,
718            end_date=filter_end_date,
719            filter_date_column=filter_col,
720            offset_value=timezone_offset,
721            joins=join_condition if join_condition else "",
722            partition_filter=partition_filter,
723            to_date=rendered_to_date,
724        )
725
726    def _create_stage_view(
727        self,
728        rendered_template: str,
729        stage: dict,
730        window_start_date: str,
731        window_end_date: str,
732        query_id: str,
733        query_label: str,
734        cadence: str,
735        stage_file_path: str,
736    ) -> str:
737        """Create each use case stage view.
738
739        Each stage has a specific order and refers to a specific SQL file to be executed.
740
741        Args:
742            rendered_template: rendered stage SQL file.
743            stage: stage to process.
744            window_start_date: start date for the configured stage.
745            window_end_date: end date for the configured stage.
746            query_id: gab configuration table use case identifier.
747            query_label: gab configuration table use case name.
748            cadence: cadence to process.
749            stage_file_path: full stage file path (gab path + stage path).
750        """
751        run_start_time = datetime.now()
752        creation_status: str
753        error_message: Union[Exception, str]
754
755        try:
756            tmp = ExecEnv.SESSION.sql(rendered_template)
757            num_partitions = ExecEnv.SESSION.conf.get(
758                self._SPARK_DEFAULT_PARALLELISM_CONFIG,
759                self._SPARK_DEFAULT_PARALLELISM_VALUE,
760            )
761
762            if stage["repartition"]:
763                if stage["repartition"].get("numPartitions"):
764                    num_partitions = stage["repartition"]["numPartitions"]
765
766                if stage["repartition"].get("keys"):
767                    tmp = tmp.repartition(
768                        int(num_partitions), *stage["repartition"]["keys"]
769                    )
770                    self._LOGGER.info("Repartitioned on given Key(s)")
771                else:
772                    tmp = tmp.repartition(int(num_partitions))
773                    self._LOGGER.info("Repartitioned on given partition count")
774
775            temp_step_view_name: str = stage["table_alias"]
776            tmp.createOrReplaceTempView(temp_step_view_name)
777
778            if stage["storage_level"]:
779                ExecEnv.SESSION.sql(
780                    "CACHE TABLE {tbl} "
781                    "OPTIONS ('storageLevel' '{type}')".format(
782                        tbl=temp_step_view_name,
783                        type=stage["storage_level"],
784                    )
785                )
786                ExecEnv.SESSION.sql(
787                    "SELECT COUNT(*) FROM {tbl}".format(  # nosec: B608
788                        tbl=temp_step_view_name
789                    )
790                )
791                self._LOGGER.info(f"Cached stage view - {temp_step_view_name} ")
792
793            creation_status = "Success"
794            error_message = "NA"
795        except Exception as err:
796            creation_status = "Failed"
797            error_message = err
798            raise err
799        finally:
800            run_end_time = datetime.now()
801            GABUtils().logger(
802                run_start_time,
803                run_end_time,
804                window_start_date,
805                window_end_date,
806                query_id,
807                query_label,
808                cadence,
809                stage_file_path,
810                rendered_template,
811                creation_status,
812                error_message,
813                self.spec.target_database,
814            )
815
816        return temp_step_view_name
817
818    def _generate_view_statement(
819        self,
820        query_id: str,
821        cadence: str,
822        temp_stage_view_name: str,
823        lookup_query_builder: DataFrame,
824        window_start_date: str,
825        window_end_date: str,
826        query_label: str,
827    ) -> bool:
828        """Feed use case data to the insights table (default: unified use case table).
829
830        Args:
831            query_id: gab configuration table use case identifier.
832            cadence: cadence to process.
833            temp_stage_view_name: name of the temp view generated by the stage.
834            lookup_query_builder: gab configuration data.
835            window_start_date: start date for the configured stage.
836            window_end_date: end date for the configured stage.
837            query_label: gab configuration table use case name.
838        """
839        run_start_time = datetime.now()
840        creation_status: str
841        error_message: Union[Exception, str]
842
843        GABDeleteGenerator(
844            query_id=query_id,
845            cadence=cadence,
846            temp_stage_view_name=temp_stage_view_name,
847            lookup_query_builder=lookup_query_builder,
848            target_database=self.spec.target_database,
849            target_table=self.spec.target_table,
850        ).generate_sql()
851
852        gen_ins = GABInsertGenerator(
853            query_id=query_id,
854            cadence=cadence,
855            final_stage_table=temp_stage_view_name,
856            lookup_query_builder=lookup_query_builder,
857            target_database=self.spec.target_database,
858            target_table=self.spec.target_table,
859        ).generate_sql()
860        try:
861            ExecEnv.SESSION.sql(gen_ins)
862
863            creation_status = "Success"
864            error_message = "NA"
865            inserted = True
866        except Exception as err:
867            creation_status = "Failed"
868            error_message = err
869            raise
870        finally:
871            run_end_time = datetime.now()
872            GABUtils().logger(
873                run_start_time,
874                run_end_time,
875                window_start_date,
876                window_end_date,
877                query_id,
878                query_label,
879                cadence,
880                "Final Insert",
881                gen_ins,
882                creation_status,
883                error_message,
884                self.spec.target_database,
885            )
886
887        return inserted
888
889    @classmethod
890    def _unpersist_cached_views(cls, unpersist_list: list[str]) -> None:
891        """Unpersist cached views.
892
893        Args:
894            unpersist_list: list containing the view names to unpersist.
895        """
896        [
897            ExecEnv.SESSION.sql("UNCACHE TABLE {tbl}".format(tbl=i))
898            for i in unpersist_list
899        ]
900
901    def _generate_ddl(
902        self,
903        latest_config_date: datetime,
904        latest_run_date: datetime,
905        query_id: str,
906        lookup_query_builder: DataFrame,
907    ) -> None:
908        """Generate the actual gold asset.
909
910        It will create and return the view containing all specified dimensions, metrics
911            and computed metrics for each cadence/reconciliation window.
912
913        Args:
914            latest_config_date: latest use case configuration date.
915            latest_run_date: latest use case run date.
916            query_id: gab configuration table use case identifier.
917            lookup_query_builder: gab configuration data.
918        """
919        if str(latest_config_date) > str(latest_run_date):
920            GABViewManager(
921                query_id=query_id,
922                lookup_query_builder=lookup_query_builder,
923                target_database=self.spec.target_database,
924                target_table=self.spec.target_table,
925            ).generate_use_case_views()
926        else:
927            self._LOGGER.info(
928                "View is not being re-created as there are no changes in the "
929                "configuration after the latest run"
930            )
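
For reference, _render_template_query substitutes the Jinja variables cadence, cadence_run, week_start, query_id, project_date_column, target_table, database, start_date, end_date, filter_date_column, offset_value, joins, partition_filter and to_date into every stage SQL file. The sketch below renders a hypothetical stage template with jinja2 to show how those placeholders line up; the table and column names are illustrative and not part of the engine.

    from jinja2 import Template

    # Hypothetical stage SQL. Note that query_id and cadence are passed already
    # aliased (e.g. "'1' as query_id"), and joins/partition_filter are full SQL
    # fragments, so the template only needs to place them.
    stage_sql = """
    SELECT
        {{ query_id }},
        {{ cadence }},
        {{ project_date_column }} AS from_date,
        {{ to_date }} AS to_date,
        SUM(amount) AS total_amount
    FROM {{ database }}.orders
    {{ joins }}
    WHERE {{ filter_date_column }} BETWEEN '{{ start_date }}' AND '{{ end_date }}'
      AND {{ partition_filter }}
    GROUP BY 1, 2, 3, 4
    """

    rendered = Template(stage_sql).render(
        cadence="'DAY' as cadence",
        cadence_run="DAY",
        week_start="MONDAY",
        query_id="'1' as query_id",
        project_date_column="date(order_date)",
        target_table="gab_use_case_results",
        database="silver",
        start_date="2024-01-01 00:00:00",
        end_date="2024-01-31 00:00:00",
        filter_date_column="order_date",
        offset_value="0",
        joins="",
        partition_filter="order_date >= '2024-01-01'",
        to_date="date(order_date)",
    )
    print(rendered)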
 34class GAB(Algorithm):
 35    """Class representing the gold asset builder."""
 36
 37    _LOGGER = LoggingHandler(__name__).get_logger()
 38    _SPARK_DEFAULT_PARALLELISM_CONFIG = (
 39        "spark.sql.sources.parallelPartitionDiscovery.parallelism"
 40    )
 41    _SPARK_DEFAULT_PARALLELISM_VALUE = "10000"
 42
 43    def __init__(self, acon: dict):
 44        """Construct GAB instances.
 45
 46        Args:
 47            acon: algorithm configuration.
 48        """
 49        self.spec: GABSpec = GABSpec.create_from_acon(acon=acon)
 50
 51    def execute(self) -> None:
 52        """Execute the Gold Asset Builder."""
 53        self._LOGGER.info(f"Reading {self.spec.lookup_table} as lkp_query_builder")
 54        lookup_query_builder_df = ExecEnv.SESSION.read.table(self.spec.lookup_table)
 55        ExecEnv.SESSION.read.table(self.spec.calendar_table).createOrReplaceTempView(
 56            "df_cal"
 57        )
 58        self._LOGGER.info(f"Generating calendar from {self.spec.calendar_table}")
 59
 60        query_label = self.spec.query_label_filter
 61        queue = self.spec.queue_filter
 62        cadence = self.spec.cadence_filter
 63
 64        self._LOGGER.info(f"Query Label Filter {query_label}")
 65        self._LOGGER.info(f"Queue Filter {queue}")
 66        self._LOGGER.info(f"Cadence Filter {cadence}")
 67
 68        gab_path = self.spec.gab_base_path
 69        self._LOGGER.info(f"Gab Base Path {gab_path}")
 70
 71        lookup_query_builder_df = lookup_query_builder_df.filter(
 72            (
 73                (lookup_query_builder_df.query_label.isin(query_label))
 74                & (lookup_query_builder_df.queue.isin(queue))
 75                & (lookup_query_builder_df.is_active != lit("N"))
 76            )
 77        )
 78
 79        lookup_query_builder_df.cache()
 80
 81        for use_case in lookup_query_builder_df.collect():
 82            self._process_use_case(
 83                use_case=use_case,
 84                lookup_query_builder=lookup_query_builder_df,
 85                selected_cadences=cadence,
 86                gab_path=gab_path,
 87            )
 88
 89        lookup_query_builder_df.unpersist()
 90
 91    def _process_use_case(
 92        self,
 93        use_case: Row,
 94        lookup_query_builder: DataFrame,
 95        selected_cadences: list[str],
 96        gab_path: str,
 97    ) -> None:
 98        """Process each gab use case.
 99
100        Args:
101            use_case: gab use case to process.
102            lookup_query_builder: gab configuration data.
103            selected_cadences: selected cadences to process.
104            gab_path: gab base path used to get the use case stages sql files.
105        """
106        self._LOGGER.info(f"Executing use case: {use_case['query_label']}")
107
108        reconciliation = GABUtils.get_json_column_as_dict(
109            lookup_query_builder=lookup_query_builder,
110            query_id=use_case["query_id"],
111            query_column="recon_window",
112        )
113        self._LOGGER.info(f"reconcilation window - {reconciliation}")
114        configured_cadences = list(reconciliation.keys())
115
116        stages = GABUtils.get_json_column_as_dict(
117            lookup_query_builder=lookup_query_builder,
118            query_id=use_case["query_id"],
119            query_column="intermediate_stages",
120        )
121        self._LOGGER.info(f"intermediate stages - {stages}")
122
123        self._LOGGER.info(f"selected_cadences: {selected_cadences}")
124        self._LOGGER.info(f"configured_cadences: {configured_cadences}")
125        cadences = self._get_filtered_cadences(selected_cadences, configured_cadences)
126        self._LOGGER.info(f"filtered cadences - {cadences}")
127
128        latest_run_date, latest_config_date = self._get_latest_usecase_data(
129            use_case["query_id"]
130        )
131        self._LOGGER.info(f"latest_config_date: {latest_config_date}")
132        self._LOGGER.info(f"latest_run_date: - {latest_run_date}")
133        self._set_use_case_stage_template_file(stages, gab_path, use_case)
134        processed_cadences = []
135
136        for cadence in cadences:
137            is_cadence_processed = self._process_use_case_query_cadence(
138                cadence,
139                reconciliation,
140                use_case,
141                stages,
142                lookup_query_builder,
143            )
144            if is_cadence_processed:
145                processed_cadences.append(is_cadence_processed)
146
147        if processed_cadences:
148            self._generate_ddl(
149                latest_config_date=latest_config_date,
150                latest_run_date=latest_run_date,
151                query_id=use_case["query_id"],
152                lookup_query_builder=lookup_query_builder,
153            )
154        else:
155            self._LOGGER.info(
156                f"Skipping use case {use_case['query_label']}. No cadence processed "
157                "for the use case."
158            )
159
160    @classmethod
161    def _set_use_case_stage_template_file(
162        cls, stages: dict, gab_path: str, use_case: Row
163    ) -> None:
164        """Set templated file for each stage.
165
166        Args:
167            stages: use case stages with their configuration.
168            gab_path: gab base path used to get the use case stages SQL files.
169            use_case: gab use case to process.
170        """
171        cls._LOGGER.info("Reading templated file for each stage...")
172
173        for i in range(1, len(stages) + 1):
174            stage = stages[str(i)]
175            stage_file_path = stage["file_path"]
176            full_path = gab_path + stage_file_path
177            cls._LOGGER.info(f"Stage file path is: {full_path}")
178            file_read = open(full_path, "r").read()
179            templated_file = file_read.replace(
180                "replace_offset_value", str(use_case["timezone_offset"])
181            )
182            stage["templated_file"] = templated_file
183            stage["full_file_path"] = full_path
184
185    def _process_use_case_query_cadence(
186        self,
187        cadence: str,
188        reconciliation: dict,
189        use_case: Row,
190        stages: dict,
191        lookup_query_builder: DataFrame,
192    ) -> bool:
193        """Identify use case reconciliation window and cadence.
194
195        Args:
196            cadence:  cadence to process.
197            reconciliation: configured use case reconciliation window.
198            use_case: gab use case to process.
199            stages: use case stages with their configuration.
200            lookup_query_builder: gab configuration data.
201        """
202        selected_reconciliation_window = {}
203        selected_cadence = reconciliation.get(cadence)
204        self._LOGGER.info(f"Processing cadence: {cadence}")
205        self._LOGGER.info(f"Reconciliation Window - {selected_cadence}")
206
207        if selected_cadence:
208            selected_reconciliation_window = selected_cadence.get("recon_window")
209
210        self._LOGGER.info(f"{cadence}: {self.spec.start_date} - {self.spec.end_date}")
211
212        start_of_week = use_case["start_of_the_week"]
213
214        self._set_week_configuration_by_uc_start_of_week(start_of_week)
215
216        cadence_configuration_at_end_date = (
217            GABUtils.get_cadence_configuration_at_end_date(self.spec.end_date)
218        )
219
220        reconciliation_cadences = GABUtils().get_reconciliation_cadences(
221            cadence=cadence,
222            selected_reconciliation_window=selected_reconciliation_window,
223            cadence_configuration_at_end_date=cadence_configuration_at_end_date,
224            rerun_flag=self.spec.rerun_flag,
225        )
226
227        start_date_str = GABUtils.format_datetime_to_default(self.spec.start_date)
228        end_date_str = GABUtils.format_datetime_to_default(self.spec.end_date)
229
230        for reconciliation_cadence, snapshot_flag in reconciliation_cadences.items():
231            self._process_reconciliation_cadence(
232                reconciliation_cadence=reconciliation_cadence,
233                snapshot_flag=snapshot_flag,
234                cadence=cadence,
235                start_date_str=start_date_str,
236                end_date_str=end_date_str,
237                use_case=use_case,
238                lookup_query_builder=lookup_query_builder,
239                stages=stages,
240            )
241
242        return (cadence in reconciliation.keys()) or (
243            reconciliation_cadences is not None
244        )
245
246    def _process_reconciliation_cadence(
247        self,
248        reconciliation_cadence: str,
249        snapshot_flag: str,
250        cadence: str,
251        start_date_str: str,
252        end_date_str: str,
253        use_case: Row,
254        lookup_query_builder: DataFrame,
255        stages: dict,
256    ) -> None:
257        """Process use case reconciliation window.
258
259        Reconcile the pre-aggregated data to cover the late events.
260
261        Args:
262            reconciliation_cadence: reconciliation to process.
263            snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
264            cadence: cadence to process.
265            start_date_str: start date of the period to process.
266            end_date_str: end date of the period to process.
267            use_case: gab use case to process.
268            lookup_query_builder: gab configuration data.
269            stages: use case stages with their configuration.
270
271        Example:
272            Cadence: week;
273            Reconciliation: monthly;
274            This means every weekend previous week aggregations will be calculated and
275                on month end we will reconcile the numbers calculated for last 4 weeks
276                to readjust the number for late events.
277        """
278        (
279            window_start_date,
280            window_end_date,
281            filter_start_date,
282            filter_end_date,
283        ) = GABCadenceManager().extended_window_calculator(
284            cadence,
285            reconciliation_cadence,
286            self.spec.current_date,
287            start_date_str,
288            end_date_str,
289            use_case["query_type"],
290            self.spec.rerun_flag,
291            snapshot_flag,
292        )
293
294        if use_case["timezone_offset"]:
295            filter_start_date = filter_start_date + timedelta(
296                hours=use_case["timezone_offset"]
297            )
298            filter_end_date = filter_end_date + timedelta(
299                hours=use_case["timezone_offset"]
300            )
301
302        filter_start_date_str = GABUtils.format_datetime_to_default(filter_start_date)
303        filter_end_date_str = GABUtils.format_datetime_to_default(filter_end_date)
304
305        partition_end = GABUtils.format_datetime_to_default(
306            (window_end_date - timedelta(days=1))
307        )
308
309        window_start_date_str = GABUtils.format_datetime_to_default(window_start_date)
310        window_end_date_str = GABUtils.format_datetime_to_default(window_end_date)
311
312        partition_filter = GABPartitionUtils.get_partition_condition(
313            filter_start_date_str, partition_end
314        )
315
316        self._LOGGER.info(
317            "extended window for start and end dates are: "
318            f"{filter_start_date_str} - {filter_end_date_str}"
319        )
320
321        unpersist_list = []
322
323        for i in range(1, len(stages) + 1):
324            stage = stages[str(i)]
325            templated_file = stage["templated_file"]
326            stage_file_path = stage["full_file_path"]
327
328            templated = self._process_use_case_query_step(
329                stage=stages[str(i)],
330                templated_file=templated_file,
331                use_case=use_case,
332                reconciliation_cadence=reconciliation_cadence,
333                cadence=cadence,
334                snapshot_flag=snapshot_flag,
335                window_start_date=window_start_date_str,
336                partition_end=partition_end,
337                filter_start_date=filter_start_date_str,
338                filter_end_date=filter_end_date_str,
339                partition_filter=partition_filter,
340            )
341
342            temp_stage_view_name = self._create_stage_view(
343                templated,
344                stages[str(i)],
345                window_start_date_str,
346                window_end_date_str,
347                use_case["query_id"],
348                use_case["query_label"],
349                cadence,
350                stage_file_path,
351            )
352            unpersist_list.append(temp_stage_view_name)
353
354        insert_success = self._generate_view_statement(
355            query_id=use_case["query_id"],
356            cadence=cadence,
357            temp_stage_view_name=temp_stage_view_name,
358            lookup_query_builder=lookup_query_builder,
359            window_start_date=window_start_date_str,
360            window_end_date=window_end_date_str,
361            query_label=use_case["query_label"],
362        )
363        self._LOGGER.info(f"Inserted data to generate the view: {insert_success}")
364
365        self._unpersist_cached_views(unpersist_list)
366
367    def _process_use_case_query_step(
368        self,
369        stage: dict,
370        templated_file: str,
371        use_case: Row,
372        reconciliation_cadence: str,
373        cadence: str,
374        snapshot_flag: str,
375        window_start_date: str,
376        partition_end: str,
377        filter_start_date: str,
378        filter_end_date: str,
379        partition_filter: str,
380    ) -> str:
381        """Process each use case step.
382
383        Process any intermediate view defined in the gab configuration table as step for
384            the use case.
385
386        Args:
387            stage: stage to process.
388            templated_file: sql file to process at this stage.
389            use_case: gab use case to process.
390            reconciliation_cadence: configured use case reconciliation window.
391            cadence: cadence to process.
392            snapshot_flag: flag indicating if for this cadence the snapshot is enabled.
393            window_start_date: start date for the configured stage.
394            partition_end: end date for the configured stage.
395            filter_start_date: filter start date to replace in the stage query.
396            filter_end_date: filter end date to replace in the stage query.
397            partition_filter: partition condition.
398        """
399        filter_col = stage["project_date_column"]
400        if stage["filter_date_column"]:
401            filter_col = stage["filter_date_column"]
402
403        # dummy value to avoid empty error if empty on the configuration
404        project_col = stage.get("project_date_column", "X")
405
406        gab_base_configuration_copy = copy.deepcopy(
407            GABCombinedConfiguration.COMBINED_CONFIGURATION.value
408        )
409
410        for item in gab_base_configuration_copy.values():
411            self._update_rendered_item_cadence(
412                reconciliation_cadence, cadence, project_col, item  # type: ignore
413            )
414
415        (
416            rendered_date,
417            rendered_to_date,
418            join_condition,
419        ) = self._get_cadence_configuration(
420            gab_base_configuration_copy,
421            cadence,
422            reconciliation_cadence,
423            snapshot_flag,
424            use_case["start_of_the_week"],
425            project_col,
426            window_start_date,
427            partition_end,
428        )
429
430        rendered_file = self._render_template_query(
431            templated=templated_file,
432            cadence=cadence,
433            start_of_the_week=use_case["start_of_the_week"],
434            query_id=use_case["query_id"],
435            rendered_date=rendered_date,
436            filter_start_date=filter_start_date,
437            filter_end_date=filter_end_date,
438            filter_col=filter_col,
439            timezone_offset=use_case["timezone_offset"],
440            join_condition=join_condition,
441            partition_filter=partition_filter,
442            rendered_to_date=rendered_to_date,
443        )
444
445        return rendered_file
446
447    @classmethod
448    def _get_filtered_cadences(
449        cls, selected_cadences: list[str], configured_cadences: list[str]
450    ) -> list[str]:
451        """Get filtered cadences.
452
453        Get the intersection of user selected cadences and use case configured cadences.
454
455        Args:
456            selected_cadences: user selected cadences.
457            configured_cadences: use case configured cadences.
458        """
459        return (
460            configured_cadences
461            if "All" in selected_cadences
462            else GABCadence.order_cadences(
463                list(set(selected_cadences).intersection(configured_cadences))
464            )
465        )
466
467    def _get_latest_usecase_data(self, query_id: str) -> tuple[datetime, datetime]:
468        """Get latest use case data.
469
470        Args:
471            query_id: use case query id.
472        """
473        return (
474            self._get_latest_run_date(query_id),
475            self._get_latest_use_case_date(query_id),
476        )
477
478    def _get_latest_run_date(self, query_id: str) -> datetime:
479        """Get latest use case run date.
480
481        Args:
482            query_id: use case query id.
483        """
484        last_success_run_sql = """
485            SELECT run_start_time
486            FROM {database}.gab_log_events
487            WHERE query_id = {query_id}
488            AND stage_name = 'Final Insert'
489            AND status = 'Success'
490            ORDER BY 1 DESC
491            LIMIT 1
492            """.format(  # nosec: B608
493            database=self.spec.target_database, query_id=query_id
494        )
495        try:
496            latest_run_date: datetime = ExecEnv.SESSION.sql(
497                last_success_run_sql
498            ).collect()[0][0]
499        except Exception:
500            latest_run_date = datetime.strptime(
501                "2020-01-01", GABDefaults.DATE_FORMAT.value
502            )
503
504        return latest_run_date
505
506    def _get_latest_use_case_date(self, query_id: str) -> datetime:
507        """Get latest use case configured date.
508
509        Args:
510            query_id: use case query id.
511        """
512        query_config_sql = """
513            SELECT lh_created_on
514            FROM {lkp_query_builder}
515            WHERE query_id = {query_id}
516        """.format(  # nosec: B608
517            lkp_query_builder=self.spec.lookup_table,
518            query_id=query_id,
519        )
520
521        latest_config_date: datetime = ExecEnv.SESSION.sql(query_config_sql).collect()[
522            0
523        ][0]
524
525        return latest_config_date
526
527    @classmethod
528    def _set_week_configuration_by_uc_start_of_week(cls, start_of_week: str) -> None:
529        """Set week configuration by use case start of week.
530
531        Args:
532            start_of_week: use case start of week (MONDAY or SUNDAY).
533        """
534        if start_of_week.upper() == "MONDAY":
535            pendulum.week_starts_at(pendulum.MONDAY)
536            pendulum.week_ends_at(pendulum.SUNDAY)
537        elif start_of_week.upper() == "SUNDAY":
538            pendulum.week_starts_at(pendulum.SUNDAY)
539            pendulum.week_ends_at(pendulum.SATURDAY)
540        else:
541            raise NotImplementedError(
542                f"The requested {start_of_week} is not implemented. "
543                "Supported `start_of_week` values: [MONDAY, SUNDAY]"
544            )
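
A minimal usage sketch of the pendulum calls above. It assumes pendulum 2.x, where week_starts_at/week_ends_at are module-level setters (they were removed in pendulum 3.x):

import pendulum

# Configure a Sunday-based week, as done for start_of_week == "SUNDAY".
pendulum.week_starts_at(pendulum.SUNDAY)
pendulum.week_ends_at(pendulum.SATURDAY)

# With a Sunday-based week, a Wednesday snaps back to the previous Sunday.
print(pendulum.parse("2024-01-03").start_of("week"))  # 2023-12-31T00:00:00+00:00
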
545
546    @classmethod
547    def _update_rendered_item_cadence(
548        cls, reconciliation_cadence: str, cadence: str, project_col: str, item: dict
549    ) -> None:
550        """Override item properties based on the rendered item cadence.
551
552        Args:
553            reconciliation_cadence: configured use case reconciliation window.
554            cadence: cadence to process.
555            project_col: use case projection date column name.
556            item: predefined use case combination.
557        """
558        rendered_item = cls._get_rendered_item_cadence(
559            reconciliation_cadence, cadence, project_col, item
560        )
561        item["join_select"] = rendered_item["join_select"]
562        item["project_start"] = rendered_item["project_start"]
563        item["project_end"] = rendered_item["project_end"]
564
565    @classmethod
566    def _get_rendered_item_cadence(
567        cls, reconciliation_cadence: str, cadence: str, project_col: str, item: dict
568    ) -> dict:
569        """Update pre-configured gab parameters with use case data.
570
571        Args:
572            reconciliation_cadence: configured use case reconciliation window.
573            cadence: cadence to process.
574            project_col: use case projection date column name.
575            item: predefined use case combination.
576        """
577        return {
578            GABKeys.JOIN_SELECT: (
579                item[GABKeys.JOIN_SELECT]
580                .replace(GABReplaceableKeys.CONFIG_WEEK_START, "Monday")
581                .replace(
582                    GABReplaceableKeys.RECONCILIATION_CADENCE,
583                    reconciliation_cadence,
584                )
585                .replace(GABReplaceableKeys.CADENCE, cadence)
586            ),
587            GABKeys.PROJECT_START: (
588                item[GABKeys.PROJECT_START]
589                .replace(GABReplaceableKeys.CADENCE, cadence)
590                .replace(GABReplaceableKeys.DATE_COLUMN, project_col)
591            ),
592            GABKeys.PROJECT_END: (
593                item[GABKeys.PROJECT_END]
594                .replace(GABReplaceableKeys.CADENCE, cadence)
595                .replace(GABReplaceableKeys.DATE_COLUMN, project_col)
596            ),
597        }
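
The replacement above is plain token substitution on the pre-configured strings. A sketch with hypothetical placeholder tokens (the real tokens are defined in GABReplaceableKeys and may differ):

# Hypothetical placeholder tokens -- the real ones live in GABReplaceableKeys.
template = "date_trunc('${cadence}', ${date_column})"

rendered = (
    template
    .replace("${cadence}", "MONTH")
    .replace("${date_column}", "order_date")
)
print(rendered)  # date_trunc('MONTH', order_date)
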
598
599    @classmethod
600    def _get_cadence_configuration(
601        cls,
602        use_case_configuration: dict,
603        cadence: str,
604        reconciliation_cadence: str,
605        snapshot_flag: str,
606        start_of_week: str,
607        project_col: str,
608        window_start_date: str,
609        partition_end: str,
610    ) -> tuple[str, str, str]:
611        """Get use case configuration fields to replace pre-configured parameters.
612
613        Args:
614            use_case_configuration: use case configuration.
615            cadence: cadence to process.
616            reconciliation_cadence: cadence to be reconciled.
617            snapshot_flag: flag indicating whether the snapshot is enabled for this cadence.
618            start_of_week: use case start of week (MONDAY or SUNDAY).
619            project_col: use case projection date column name.
620            window_start_date: start date for the configured stage.
621            partition_end: end date for the configured stage.
622
623        Returns:
624            rendered_from_date: projection start date.
625            rendered_to_date: projection end date.
626            join_condition: string containing the join condition to replace in the
627                templated query by jinja substitution.
628        """
629        cadence_dict = next(
630            (
631                dict(configuration)
632                for configuration in use_case_configuration.values()
633                if (
634                    (cadence in configuration["cadence"])
635                    and (reconciliation_cadence in configuration["recon"])
636                    and (snapshot_flag in configuration["snap_flag"])
637                    and (
638                        GABStartOfWeek.get_start_of_week()[start_of_week.upper()]
639                        in configuration["week_start"]
640                    )
641                )
642            ),
643            None,
644        )
645        rendered_from_date = None
646        rendered_to_date = None
647        join_condition = None
648
649        if cadence_dict:
650            rendered_from_date = (
651                cadence_dict[GABKeys.PROJECT_START]
652                .replace(GABReplaceableKeys.CADENCE, cadence)
653                .replace(GABReplaceableKeys.DATE_COLUMN, project_col)
654            )
655            rendered_to_date = (
656                cadence_dict[GABKeys.PROJECT_END]
657                .replace(GABReplaceableKeys.CADENCE, cadence)
658                .replace(GABReplaceableKeys.DATE_COLUMN, project_col)
659            )
660
661            if cadence_dict[GABKeys.JOIN_SELECT]:
662                join_condition = """
663                 inner join (
664                     {join_select} from df_cal
665                     where calendar_date
666                     between '{bucket_start}' and '{bucket_end}'
667                 )
668                 df_cal on date({date_column})
669                     between df_cal.cadence_start_date and df_cal.cadence_end_date
670                 """.format(
671                    join_select=cadence_dict[GABKeys.JOIN_SELECT],
672                    bucket_start=window_start_date,
673                    bucket_end=partition_end,
674                    date_column=project_col,
675                )
676
677        return rendered_from_date, rendered_to_date, join_condition
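
With sample values, the join snippet built above renders roughly as follows. The join_select, dates, and column name below are hypothetical; only the surrounding template matches the code.

join_condition = """
 inner join (
     {join_select} from df_cal
     where calendar_date
     between '{bucket_start}' and '{bucket_end}'
 )
 df_cal on date({date_column})
     between df_cal.cadence_start_date and df_cal.cadence_end_date
 """.format(
    join_select="select calendar_date, cadence_start_date, cadence_end_date",
    bucket_start="2024-01-01",
    bucket_end="2024-01-31",
    date_column="order_date",
)
print(join_condition)
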
678
679    def _render_template_query(
680        self,
681        templated: str,
682        cadence: str,
683        start_of_the_week: str,
684        query_id: str,
685        rendered_date: str,
686        filter_start_date: str,
687        filter_end_date: str,
688        filter_col: str,
689        timezone_offset: str,
690        join_condition: str,
691        partition_filter: str,
692        rendered_to_date: str,
693    ) -> str:
694        """Replace jinja templated parameters in the SQL with the actual data.
695
696        Args:
697            templated: templated sql file to process at this stage.
698            cadence: cadence to process.
699            start_of_the_week: use case start of week (MONDAY or SUNDAY).
700            query_id: gab configuration table use case identifier.
701            rendered_date: projection start date.
702            filter_start_date: filter start date to replace in the stage query.
703            filter_end_date: filter end date to replace in the stage query.
704            filter_col: use case projection date column name.
705            timezone_offset: timezone offset configured in the use case.
706            join_condition: string containing the join condition.
707            partition_filter: partition condition.
708            rendered_to_date: projection end date.
709        """
710        return Template(templated).render(
711            cadence="'{cadence}' as cadence".format(cadence=cadence),
712            cadence_run=cadence,
713            week_start=start_of_the_week,
714            query_id="'{query_id}' as query_id".format(query_id=query_id),
715            project_date_column=rendered_date,
716            target_table=self.spec.target_table,
717            database=self.spec.source_database,
718            start_date=filter_start_date,
719            end_date=filter_end_date,
720            filter_date_column=filter_col,
721            offset_value=timezone_offset,
722            joins=join_condition if join_condition else "",
723            partition_filter=partition_filter,
724            to_date=rendered_to_date,
725        )
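
The stage SQL files are plain Jinja templates, so rendering works like the minimal sketch below. The template text and values are hypothetical; only the parameter names match the render call above.

from jinja2 import Template

templated = """
SELECT {{ query_id }}, {{ cadence }}, count(*) AS total
FROM {{ database }}.orders {{ joins }}
WHERE {{ filter_date_column }} BETWEEN '{{ start_date }}' AND '{{ end_date }}'
"""

print(
    Template(templated).render(
        cadence="'DAY' as cadence",
        query_id="'42' as query_id",
        database="silver",
        joins="",
        filter_date_column="order_date",
        start_date="2024-01-01",
        end_date="2024-01-31",
    )
)
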
726
727    def _create_stage_view(
728        self,
729        rendered_template: str,
730        stage: dict,
731        window_start_date: str,
732        window_end_date: str,
733        query_id: str,
734        query_label: str,
735        cadence: str,
736        stage_file_path: str,
737    ) -> str:
738        """Create each use case stage view.
739
740        Each stage has a specific order and refers to a specific SQL file to be executed.
741
742        Args:
743            rendered_template: rendered stage SQL file.
744            stage: stage to process.
745            window_start_date: start date for the configured stage.
746            window_end_date: end date for the configured stage.
747            query_id: gab configuration table use case identifier.
748            query_label: gab configuration table use case name.
749            cadence: cadence to process.
750            stage_file_path: full stage file path (gab path + stage path).
751        """
752        run_start_time = datetime.now()
753        creation_status: str
754        error_message: Union[Exception, str]
755
756        try:
757            tmp = ExecEnv.SESSION.sql(rendered_template)
758            num_partitions = ExecEnv.SESSION.conf.get(
759                self._SPARK_DEFAULT_PARALLELISM_CONFIG,
760                self._SPARK_DEFAULT_PARALLELISM_VALUE,
761            )
762
763            if stage["repartition"]:
764                if stage["repartition"].get("numPartitions"):
765                    num_partitions = stage["repartition"]["numPartitions"]
766
767                if stage["repartition"].get("keys"):
768                    tmp = tmp.repartition(
769                        int(num_partitions), *stage["repartition"]["keys"]
770                    )
771                    self._LOGGER.info("Repartitioned on given Key(s)")
772                else:
773                    tmp = tmp.repartition(int(num_partitions))
774                    self._LOGGER.info("Repartitioned on given partition count")
775
776            temp_step_view_name: str = stage["table_alias"]
777            tmp.createOrReplaceTempView(temp_step_view_name)
778
779            if stage["storage_level"]:
780                ExecEnv.SESSION.sql(
781                    "CACHE TABLE {tbl} "
782                    "OPTIONS ('storageLevel' '{type}')".format(
783                        tbl=temp_step_view_name,
784                        type=stage["storage_level"],
785                    )
786                )
787                ExecEnv.SESSION.sql(
788                    "SELECT COUNT(*) FROM {tbl}".format(  # nosec: B608
789                        tbl=temp_step_view_name
790                    )
791                )
792                self._LOGGER.info(f"Cached stage view - {temp_step_view_name} ")
793
794            creation_status = "Success"
795            error_message = "NA"
796        except Exception as err:
797            creation_status = "Failed"
798            error_message = err
799            raise err
800        finally:
801            run_end_time = datetime.now()
802            GABUtils().logger(
803                run_start_time,
804                run_end_time,
805                window_start_date,
806                window_end_date,
807                query_id,
808                query_label,
809                cadence,
810                stage_file_path,
811                rendered_template,
812                creation_status,
813                error_message,
814                self.spec.target_database,
815            )
816
817        return temp_step_view_name
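
For reference, the stage dictionary consumed above has the following shape. The keys are inferred from the code; the values are hypothetical examples.

# Keys inferred from _create_stage_view; values are hypothetical.
stage = {
    "table_alias": "order_events_agg",       # name of the temp view to create
    "repartition": {
        "numPartitions": 200,                # optional partition count override
        "keys": ["order_id"],                # optional repartition keys
    },
    "storage_level": "MEMORY_AND_DISK",      # optional storage level for CACHE TABLE
}
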
818
819    def _generate_view_statement(
820        self,
821        query_id: str,
822        cadence: str,
823        temp_stage_view_name: str,
824        lookup_query_builder: DataFrame,
825        window_start_date: str,
826        window_end_date: str,
827        query_label: str,
828    ) -> bool:
829        """Feed use case data to the insights table (default: unified use case table).
830
831        Args:
832            query_id: gab configuration table use case identifier.
833            cadence: cadence to process.
834            temp_stage_view_name: name of the temp view generated by the stage.
835            lookup_query_builder: gab configuration data.
836            window_start_date: start date for the configured stage.
837            window_end_date: end date for the configured stage.
838            query_label: gab configuration table use case name.
839        """
840        run_start_time = datetime.now()
841        creation_status: str
842        error_message: Union[Exception, str]
843
844        GABDeleteGenerator(
845            query_id=query_id,
846            cadence=cadence,
847            temp_stage_view_name=temp_stage_view_name,
848            lookup_query_builder=lookup_query_builder,
849            target_database=self.spec.target_database,
850            target_table=self.spec.target_table,
851        ).generate_sql()
852
853        gen_ins = GABInsertGenerator(
854            query_id=query_id,
855            cadence=cadence,
856            final_stage_table=temp_stage_view_name,
857            lookup_query_builder=lookup_query_builder,
858            target_database=self.spec.target_database,
859            target_table=self.spec.target_table,
860        ).generate_sql()
861        try:
862            ExecEnv.SESSION.sql(gen_ins)
863
864            creation_status = "Success"
865            error_message = "NA"
866            inserted = True
867        except Exception as err:
868            creation_status = "Failed"
869            error_message = err
870            raise
871        finally:
872            run_end_time = datetime.now()
873            GABUtils().logger(
874                run_start_time,
875                run_end_time,
876                window_start_date,
877                window_end_date,
878                query_id,
879                query_label,
880                cadence,
881                "Final Insert",
882                gen_ins,
883                creation_status,
884                error_message,
885                self.spec.target_database,
886            )
887
888        return inserted
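
The delete-before-insert sequence keeps reruns idempotent: rows for the same use case and cadence are removed before the fresh stage output is appended. A rough sketch of the generated statements, with hypothetical table and column names (the real SQL is produced by GABDeleteGenerator and GABInsertGenerator):

# Hypothetical table/column names; the real SQL comes from the generators.
delete_sql = (
    "DELETE FROM gold.unified_use_case_table "
    "WHERE query_id = '42' AND cadence = 'DAY'"
)
insert_sql = (
    "INSERT INTO gold.unified_use_case_table "
    "SELECT * FROM final_stage_view"
)
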
889
890    @classmethod
891    def _unpersist_cached_views(cls, unpersist_list: list[str]) -> None:
892        """Unpersist cached views.
893
894        Args:
895            unpersist_list: list containing the view names to unpersist.
896        """
897        [
898            ExecEnv.SESSION.sql("UNCACHE TABLE {tbl}".format(tbl=i))
899            for i in unpersist_list
900        ]
901
902    def _generate_ddl(
903        self,
904        latest_config_date: datetime,
905        latest_run_date: datetime,
906        query_id: str,
907        lookup_query_builder: DataFrame,
908    ) -> None:
909        """Generate the actual gold asset.
910
911        It will create and return the view containing all specified dimensions, metrics
912            and computed metrics for each cadence/reconciliation window.
913
914        Args:
915            latest_config_date: latest use case configuration date.
916            latest_run_date: latest use case run date.
917            query_id: gab configuration table use case identifier.
918            lookup_query_builder: gab configuration data.
919        """
920        if str(latest_config_date) > str(latest_run_date):
921            GABViewManager(
922                query_id=query_id,
923                lookup_query_builder=lookup_query_builder,
924                target_database=self.spec.target_database,
925                target_table=self.spec.target_table,
926            ).generate_use_case_views()
927        else:
928            self._LOGGER.info(
929                "View is not being re-created as there are no changes in the "
930                "configuration after the latest run"
931            )
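
The string comparison above is safe because ISO-style timestamps sort lexicographically in the same order as chronologically, for example:

from datetime import datetime

latest_config_date = datetime(2024, 5, 2, 10, 0)
latest_run_date = datetime(2024, 4, 30, 9, 30)

# '2024-05-02 10:00:00' > '2024-04-30 09:30:00' -> the views get re-created.
print(str(latest_config_date) > str(latest_run_date))  # True
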

Class representing the gold asset builder.

GAB(acon: dict)

Construct GAB instances.

Arguments:
  • acon: algorithm configuration.
def execute(self) -> None:

Execute the Gold Asset Builder.
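
A minimal usage sketch, assuming an acon dict with the fields expected by GABSpec.create_from_acon. The key names below are assumptions for illustration, not the documented ACON schema.

# The keys below are assumptions; consult GABSpec for the real ACON schema.
acon = {
    "lookup_table": "config_db.lkp_query_builder",
    "calendar_table": "config_db.calendar",
    "query_label_filter": ["order_kpis"],
    "queue_filter": ["sales"],
    "cadence_filter": ["All"],
    "gab_base_path": "s3://bucket/gab/use_cases",
    "source_database": "silver",
    "target_database": "gold",
    "target_table": "unified_use_case_table",
}

GAB(acon).execute()
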