lakehouse_engine.core.gab_sql_generator

Module to define GAB SQL classes.

  1"""Module to define GAB SQL classes."""
  2
  3import ast
  4import json
  5from abc import ABC, abstractmethod
  6from typing import Any, Callable, Optional
  7
  8from pyspark.sql import DataFrame
  9from pyspark.sql.functions import col, lit, struct, to_json
 10
 11from lakehouse_engine.core.exec_env import ExecEnv
 12from lakehouse_engine.utils.gab_utils import GABUtils
 13from lakehouse_engine.utils.logging_handler import LoggingHandler
 14
 15
 16def _execute_sql(func) -> Callable:  # type: ignore
 17    """Execute the SQL resulting from the function.
 18
 19    This function is protected to be used just in this module.
 20    It's used to decorate functions that returns a SQL statement.
 21
 22    Args:
 23        func: function that will return the sql to execute
 24    """
 25
 26    def inner(*args: Any) -> None:
 27        generated_sql = func(*args)
 28        if generated_sql:
 29            ExecEnv.SESSION.sql(generated_sql)
 30
 31    return inner
 32
 33
 34class GABSQLGenerator(ABC):
 35    """Abstract class defining the behaviour of a GAB SQL Generator."""
 36
 37    @abstractmethod
 38    def generate_sql(self) -> Optional[str]:
 39        """Define the generate sql command.
 40
 41        E.g., the behaviour of gab generate sql inheriting from this.
 42        """
 43        pass
 44
 45
 46class GABInsertGenerator(GABSQLGenerator):
 47    """GAB insert generator.
 48
 49    Creates the insert statement based on the dimensions and metrics provided in
 50    the configuration table.
 51    """
 52
 53    _LOGGER = LoggingHandler(__name__).get_logger()
 54
 55    def __init__(
 56        self,
 57        query_id: str,
 58        cadence: str,
 59        final_stage_table: str,
 60        lookup_query_builder: DataFrame,
 61        target_database: str,
 62        target_table: str,
 63    ):
 64        """Construct GABInsertGenerator instances.
 65
 66        Args:
 67            query_id: gab configuration table use case identifier.
 68            cadence:  inputted cadence to process.
 69            final_stage_table: stage view name.
 70            lookup_query_builder: gab configuration data.
 71            target_database: target database to write.
 72            target_table: target table to write.
 73        """
 74        self.query_id = query_id
 75        self.cadence = cadence
 76        self.final_stage_table = final_stage_table
 77        self.lookup_query_builder = lookup_query_builder
 78        self.target_database = target_database
 79        self.target_table = target_table
 80
 81    def generate_sql(self) -> Optional[str]:
 82        """Generate insert sql statement to the insights table."""
 83        insert_sql_statement = self._insert_statement_generator()
 84
 85        return insert_sql_statement
 86
 87    def _insert_statement_generator(self) -> str:
 88        """Generate GAB insert statement.
 89
 90        Creates the insert statement based on the dimensions and metrics provided in
 91        the configuration table.
 92        """
 93        result = GABUtils.get_json_column_as_dict(
 94            self.lookup_query_builder, self.query_id, "mappings"
 95        )
 96
 97        for result_key in result.keys():
 98            joined_dimensions, joined_metrics = self._get_mapping_columns(
 99                mapping=result[result_key]
100            )
101            gen_ins = f"""
102                INSERT INTO {self.target_database}.{self.target_table}
103                SELECT
104                    {self.query_id} as query_id,
105                    '{self.cadence}' as cadence,
106                    {joined_dimensions},
107                    {joined_metrics},
108                    current_timestamp() as lh_created_on
109                FROM {self.final_stage_table}
110                """  # nosec: B608
111
112        return gen_ins
113
114    @classmethod
115    def _get_mapping_columns(cls, mapping: dict) -> tuple[str, str]:
116        """Get mapping columns(dimensions and metrics) as joined string.
117
118        Args:
119            mapping: use case mappings configuration.
120        """
121        dimensions_mapping = mapping["dimensions"]
122        metrics_mapping = mapping["metric"]
123
124        joined_dimensions = cls._join_extracted_column_with_filled_columns(
125            columns=dimensions_mapping, is_dimension=True
126        )
127        joined_metrics = cls._join_extracted_column_with_filled_columns(
128            columns=metrics_mapping, is_dimension=False
129        )
130
131        return joined_dimensions, joined_metrics
132
133    @classmethod
134    def _join_extracted_column_with_filled_columns(
135        cls, columns: dict, is_dimension: bool
136    ) -> str:
137        """Join extracted columns with empty filled columns.
138
139        Args:
140            columns: use case columns and values.
141            is_dimension: flag identifying if is a dimension or a metric.
142        """
143        extracted_columns_with_alias = (
144            GABUtils.extract_columns_from_mapping(  # type: ignore
145                columns=columns, is_dimension=is_dimension
146            )
147        )
148
149        filled_columns = cls._fill_empty_columns(
150            extracted_columns=extracted_columns_with_alias,  # type: ignore
151            is_dimension=is_dimension,
152        )
153
154        joined_columns = [*extracted_columns_with_alias, *filled_columns]
155
156        return ",".join(joined_columns)
157
158    @classmethod
159    def _fill_empty_columns(
160        cls, extracted_columns: list[str], is_dimension: bool
161    ) -> list[str]:
162        """Fill empty columns as null.
163
164        As the data is expected to have 40 columns we have to fill the unused columns.
165
166        Args:
167            extracted_columns: use case extracted columns.
168            is_dimension: flag identifying if is a dimension or a metric.
169        """
170        filled_columns = []
171
172        for ins in range(
173            (
174                len(extracted_columns) - 1
175                if is_dimension
176                else len(extracted_columns) + 1
177            ),
178            41,
179        ):
180            filled_columns.append(
181                " null as {}{}".format("d" if is_dimension else "m", ins)
182            )
183
184        return filled_columns
185
186
class GABViewGenerator(GABSQLGenerator):
    """GAB view generator.

    Builds the statement that creates (or replaces) the use case consumption view.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        cadence_snapshot_status: dict,
        target_database: str,
        view_name: str,
        final_cols: str,
        target_table: str,
        dimensions_and_metrics_with_alias: str,
        dimensions: str,
        dimensions_and_metrics: str,
        final_calculated_script: str,
        query_id: str,
        view_filter: str,
        final_calculated_script_snapshot: str,
        without_snapshot_cadences: list[str],
        with_snapshot_cadences: list[str],
    ):
        """Construct GABViewGenerator instances.

        Args:
            cadence_snapshot_status: each cadence with the corresponding snapshot
                status.
            target_database: target database to write.
            view_name: name of the view to be generated.
            final_cols: columns to return in the view.
            target_table: target table to write.
            dimensions_and_metrics_with_alias: configured dimensions and metrics with
                alias to compute in the view.
            dimensions: use case configured dimensions.
            dimensions_and_metrics: use case configured dimensions and metrics.
            final_calculated_script: use case calculated metrics.
            query_id: gab configuration table use case identifier.
            view_filter: filter to add in the view.
            final_calculated_script_snapshot: use case calculated metrics with snapshot.
            without_snapshot_cadences: cadences without snapshot.
            with_snapshot_cadences: cadences with snapshot.
        """
        # Where the view lives and what it reads from.
        self.target_database = target_database
        self.target_table = target_table
        self.result_key = view_name  # the view name is kept as `result_key`
        self.query_id = query_id

        # Cadence configuration.
        self.cadence_snapshot_status = cadence_snapshot_status
        self.without_snapshot_cadences = without_snapshot_cadences
        self.with_snapshot_cadences = with_snapshot_cadences

        # Column and script fragments injected into the view statement.
        self.final_cols = final_cols
        self.dimensions_and_metrics_with_alias = dimensions_and_metrics_with_alias
        self.dimensions = dimensions
        self.dimensions_and_metrics = dimensions_and_metrics
        self.final_calculated_script = final_calculated_script
        self.final_calculated_script_snapshot = final_calculated_script_snapshot
        self.view_filter = view_filter

    @_execute_sql
    def generate_sql(self) -> Optional[str]:
        """Generate use case view sql statement.

        The statement returned here is handed to the `_execute_sql` decorator.
        """
        return self._create_consumption_view()

    def _create_consumption_view(self) -> str:
        """Create consumption view.

        Returns:
            The CREATE OR REPLACE VIEW statement wrapping the consumption query.
        """
        # Cadences are rendered as a comma separated list of double quoted values.
        quoted_without_snapshot = ",".join(
            f'"{cadence}"' for cadence in self.without_snapshot_cadences
        )
        quoted_with_snapshot = ",".join(
            f'"{cadence}"' for cadence in self.with_snapshot_cadences
        )

        view_body = self._generate_consumption_view_statement(
            cadence_snapshot_status=self.cadence_snapshot_status,
            target_database=self.target_database,
            final_cols=self.final_cols,
            target_table=self.target_table,
            dimensions_and_metrics_with_alias=self.dimensions_and_metrics_with_alias,
            dimensions=self.dimensions,
            dimensions_and_metrics=self.dimensions_and_metrics,
            final_calculated_script=self.final_calculated_script,
            query_id=self.query_id,
            view_filter=self.view_filter,
            final_calculated_script_snapshot=self.final_calculated_script_snapshot,
            without_snapshot_cadences=quoted_without_snapshot,
            with_snapshot_cadences=quoted_with_snapshot,
        )

        database, view_name = self.target_database, self.result_key
        rendered_query = f"""
            CREATE OR REPLACE VIEW {database}.{view_name} AS {view_body}
            """
        self._LOGGER.info(f"Consumption view statement: {rendered_query}")
        return rendered_query

    @classmethod
    def _generate_consumption_view_statement(
        cls,
        cadence_snapshot_status: dict,
        target_database: str,
        final_cols: str,
        target_table: str,
        dimensions_and_metrics_with_alias: str,
        dimensions: str,
        dimensions_and_metrics: str,
        final_calculated_script: str,
        query_id: str,
        view_filter: str,
        final_calculated_script_snapshot: str,
        without_snapshot_cadences: str,
        with_snapshot_cadences: str,
    ) -> str:
        """Generate consumption view.

        Three query shapes exist: cadences with and without snapshot mixed,
        only cadences without snapshot, and the remaining case (treated as
        snapshot only).

        Args:
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
            target_database: target database to write.
            final_cols: use case columns exposed in the consumption view.
            target_table: target table to write.
            dimensions_and_metrics_with_alias: dimensions and metrics as string columns
                with alias.
            dimensions: dimensions as string columns.
            dimensions_and_metrics: dimensions and metrics as string columns
                without alias.
            final_calculated_script: final calculated metrics script.
            query_id: gab configuration table use case identifier.
            view_filter: filter to execute on the view.
            final_calculated_script_snapshot: final calculated metrics with snapshot
                script.
            without_snapshot_cadences: cadences without snapshot.
            with_snapshot_cadences: cadences with snapshot.

        Returns:
            The SELECT statement used as the body of the consumption view.
        """
        cls._LOGGER.info("Generating consumption view statement...")
        cls._LOGGER.info(
            f"""
            {{
                target_database: {target_database},
                target_table: {target_table},
                query_id: {query_id},
                cadence_and_snapshot_status: {cadence_snapshot_status},
                cadences_without_snapshot: [{without_snapshot_cadences}],
                cadences_with_snapshot: [{with_snapshot_cadences}],
                final_cols: {final_cols},
                dimensions_and_metrics_with_alias: {dimensions_and_metrics_with_alias},
                dimensions: {dimensions},
                dimensions_with_metrics: {dimensions_and_metrics},
                final_calculated_script: {final_calculated_script},
                final_calculated_script_snapshot: {final_calculated_script_snapshot},
                view_filter: {view_filter}
            }}"""
        )

        statuses = cadence_snapshot_status.values()
        has_snapshot = "Y" in statuses
        has_no_snapshot = "N" in statuses

        if has_snapshot and has_no_snapshot:
            # Both kinds of cadence: union the plain and the snapshot results.
            return f"""
                WITH TEMP1 AS (
                    SELECT
                        a.cadence,
                        {dimensions_and_metrics_with_alias}{final_calculated_script}
                    FROM {target_database}.{target_table} a
                    WHERE a.query_id = {query_id}
                    AND cadence IN ({without_snapshot_cadences})
                    {view_filter}
                ),
                TEMP_RN AS (
                    SELECT
                        a.cadence,
                        a.from_date,
                        a.to_date,
                        {dimensions_and_metrics},
                        row_number() over(
                            PARTITION BY
                                a.cadence,
                                {dimensions},
                                a.from_date
                            order by to_date
                        ) as rn
                    FROM {target_database}.{target_table} a
                    WHERE a.query_id = {query_id}
                    AND cadence IN ({with_snapshot_cadences})
                    {view_filter}
                ),
                TEMP2 AS (
                    SELECT
                        a.cadence,
                        {dimensions_and_metrics_with_alias}{final_calculated_script_snapshot}
                    FROM TEMP_RN a
                ),
                TEMP3 AS (SELECT * FROM TEMP1 UNION SELECT * from TEMP2)
                SELECT {final_cols} FROM TEMP3
            """  # nosec: B608

        if has_no_snapshot:
            # Only cadences without snapshot.
            return f"""
                WITH TEMP1 AS (
                    SELECT
                        a.cadence,
                        {dimensions_and_metrics_with_alias}{final_calculated_script}
                    FROM {target_database}.{target_table} a
                    WHERE a.query_id = {query_id}
                    AND cadence IN ({without_snapshot_cadences})  {view_filter}
                )
                SELECT {final_cols} FROM TEMP1
            """  # nosec: B608

        # Fallback: no "N" status present, the snapshot query is used.
        # NOTE(review): unlike the mixed branch, this window also partitions by
        # a.to_date and lists a.from_date twice - confirm this is intended.
        return f"""
                WITH TEMP_RN AS (
                    SELECT
                        a.cadence,
                        a.from_date,
                        a.to_date,
                        {dimensions_and_metrics},
                        row_number() over(
                            PARTITION BY
                                a.cadence,
                                a.from_date,
                                a.to_date,
                                {dimensions},
                                a.from_date
                        order by to_date) as rn
                    FROM {target_database}.{target_table} a
                    WHERE a.query_id = {query_id}
                    AND cadence IN ({with_snapshot_cadences})
                    {view_filter}
                ),
                TEMP2 AS (
                    SELECT
                        a.cadence,
                        {dimensions_and_metrics_with_alias}{final_calculated_script_snapshot}
                    FROM TEMP_RN a
                )
                SELECT {final_cols} FROM TEMP2
            """  # nosec: B608
427
428
class GABDeleteGenerator(GABSQLGenerator):
    """GAB delete generator.

    Creates the delete statement to clean the use case base data on the insights table.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        query_id: str,
        cadence: str,
        temp_stage_view_name: str,
        lookup_query_builder: DataFrame,
        target_database: str,
        target_table: str,
    ):
        """Construct GABDeleteGenerator instances.

        Args:
            query_id: gab configuration table use case identifier.
            cadence:  inputted cadence to process.
            temp_stage_view_name: stage view name.
            lookup_query_builder: gab configuration data.
            target_database: target database to write.
            target_table: target table to write.
        """
        self.query_id = query_id
        self.cadence = cadence
        self.temp_stage_view_name = temp_stage_view_name
        self.lookup_query_builder = lookup_query_builder
        self.target_database = target_database
        self.target_table = target_table

    @_execute_sql
    def generate_sql(self) -> Optional[str]:
        """Generate delete sql statement.

        This statement is to clean the insights table for the corresponding use case.
        The statement returned here is handed to the `_execute_sql` decorator.
        """
        return self._delete_statement_generator()

    def _delete_statement_generator(self) -> str:
        """Generate GAB delete statement.

        Reads the use case "mappings" from the configuration data to find the
        stage view columns mapped to from_date and to_date, queries the stage
        view for the min/max of both and builds a DELETE restricted to that
        date window for the given query_id and cadence.

        Returns:
            The DELETE statement for the target table.

        Raises:
            ValueError: if the use case has no mappings configured.
        """
        df_map = self.lookup_query_builder.filter(
            col("query_id") == lit(self.query_id)
        ).select(col("mappings"))
        mappings_json = df_map.select(
            to_json(struct([df_map[x] for x in df_map.columns]))
        ).collect()[0][0]

        # The mappings column holds a python-literal string; the last value wins.
        mappings: dict = {}
        for raw_mappings in json.loads(mappings_json).values():
            mappings = ast.literal_eval(raw_mappings)

        # Without this guard an empty mapping would surface as an obscure
        # UnboundLocalError further down.
        if not mappings:
            raise ValueError(
                f"No mappings found for query_id {self.query_id}: "
                "unable to generate the delete statement."
            )

        # Every mapping is read, the date columns of the last one are used.
        for mapping in mappings.values():
            dim_from_date = mapping["dimensions"]["from_date"]
            dim_to_date = mapping["dimensions"]["to_date"]

        self._LOGGER.info(f"temp stage view name: {self.temp_stage_view_name}")

        # A single aggregate query replaces four separate scans of the stage view.
        date_bounds = ExecEnv.SESSION.sql(
            f"""
            SELECT
                MIN({dim_from_date}) as min_from_date,
                MAX({dim_from_date}) as max_from_date,
                MIN({dim_to_date}) as min_to_date,
                MAX({dim_to_date}) as max_to_date
            FROM {self.temp_stage_view_name}"""  # nosec: B608
        ).collect()[0]
        min_from_date, max_from_date, min_to_date, max_to_date = date_bounds

        gen_del = f"""
        DELETE FROM {self.target_database}.{self.target_table} a
            WHERE query_id = {self.query_id}
            AND cadence = '{self.cadence}'
            AND from_date BETWEEN '{min_from_date}' AND '{max_from_date}'
            AND to_date BETWEEN '{min_to_date}' AND '{max_to_date}'
        """  # nosec: B608

        return gen_del
class GABSQLGenerator(abc.ABC):
class GABSQLGenerator(ABC):
    """Base contract that every GAB SQL generator has to fulfil."""

    @abstractmethod
    def generate_sql(self) -> Optional[str]:
        """Produce the SQL statement of the concrete generator.

        Subclasses inheriting from this class define how the statement is
        generated; this base implementation does nothing and returns None.
        """

Abstract class defining the behaviour of a GAB SQL Generator.

@abstractmethod
def generate_sql(self) -> Optional[str]:
38    @abstractmethod
39    def generate_sql(self) -> Optional[str]:
40        """Define the generate sql command.
41
42        E.g., the behaviour of gab generate sql inheriting from this.
43        """
44        pass

Define the generate sql command.

E.g., the behaviour of the GAB SQL generators that inherit from this class.

class GABInsertGenerator(GABSQLGenerator):
 47class GABInsertGenerator(GABSQLGenerator):
 48    """GAB insert generator.
 49
 50    Creates the insert statement based on the dimensions and metrics provided in
 51    the configuration table.
 52    """
 53
 54    _LOGGER = LoggingHandler(__name__).get_logger()
 55
 56    def __init__(
 57        self,
 58        query_id: str,
 59        cadence: str,
 60        final_stage_table: str,
 61        lookup_query_builder: DataFrame,
 62        target_database: str,
 63        target_table: str,
 64    ):
 65        """Construct GABInsertGenerator instances.
 66
 67        Args:
 68            query_id: gab configuration table use case identifier.
 69            cadence:  inputted cadence to process.
 70            final_stage_table: stage view name.
 71            lookup_query_builder: gab configuration data.
 72            target_database: target database to write.
 73            target_table: target table to write.
 74        """
 75        self.query_id = query_id
 76        self.cadence = cadence
 77        self.final_stage_table = final_stage_table
 78        self.lookup_query_builder = lookup_query_builder
 79        self.target_database = target_database
 80        self.target_table = target_table
 81
 82    def generate_sql(self) -> Optional[str]:
 83        """Generate insert sql statement to the insights table."""
 84        insert_sql_statement = self._insert_statement_generator()
 85
 86        return insert_sql_statement
 87
 88    def _insert_statement_generator(self) -> str:
 89        """Generate GAB insert statement.
 90
 91        Creates the insert statement based on the dimensions and metrics provided in
 92        the configuration table.
 93        """
 94        result = GABUtils.get_json_column_as_dict(
 95            self.lookup_query_builder, self.query_id, "mappings"
 96        )
 97
 98        for result_key in result.keys():
 99            joined_dimensions, joined_metrics = self._get_mapping_columns(
100                mapping=result[result_key]
101            )
102            gen_ins = f"""
103                INSERT INTO {self.target_database}.{self.target_table}
104                SELECT
105                    {self.query_id} as query_id,
106                    '{self.cadence}' as cadence,
107                    {joined_dimensions},
108                    {joined_metrics},
109                    current_timestamp() as lh_created_on
110                FROM {self.final_stage_table}
111                """  # nosec: B608
112
113        return gen_ins
114
115    @classmethod
116    def _get_mapping_columns(cls, mapping: dict) -> tuple[str, str]:
117        """Get mapping columns(dimensions and metrics) as joined string.
118
119        Args:
120            mapping: use case mappings configuration.
121        """
122        dimensions_mapping = mapping["dimensions"]
123        metrics_mapping = mapping["metric"]
124
125        joined_dimensions = cls._join_extracted_column_with_filled_columns(
126            columns=dimensions_mapping, is_dimension=True
127        )
128        joined_metrics = cls._join_extracted_column_with_filled_columns(
129            columns=metrics_mapping, is_dimension=False
130        )
131
132        return joined_dimensions, joined_metrics
133
134    @classmethod
135    def _join_extracted_column_with_filled_columns(
136        cls, columns: dict, is_dimension: bool
137    ) -> str:
138        """Join extracted columns with empty filled columns.
139
140        Args:
141            columns: use case columns and values.
142            is_dimension: flag identifying if is a dimension or a metric.
143        """
144        extracted_columns_with_alias = (
145            GABUtils.extract_columns_from_mapping(  # type: ignore
146                columns=columns, is_dimension=is_dimension
147            )
148        )
149
150        filled_columns = cls._fill_empty_columns(
151            extracted_columns=extracted_columns_with_alias,  # type: ignore
152            is_dimension=is_dimension,
153        )
154
155        joined_columns = [*extracted_columns_with_alias, *filled_columns]
156
157        return ",".join(joined_columns)
158
159    @classmethod
160    def _fill_empty_columns(
161        cls, extracted_columns: list[str], is_dimension: bool
162    ) -> list[str]:
163        """Fill empty columns as null.
164
165        As the data is expected to have 40 columns we have to fill the unused columns.
166
167        Args:
168            extracted_columns: use case extracted columns.
169            is_dimension: flag identifying if is a dimension or a metric.
170        """
171        filled_columns = []
172
173        for ins in range(
174            (
175                len(extracted_columns) - 1
176                if is_dimension
177                else len(extracted_columns) + 1
178            ),
179            41,
180        ):
181            filled_columns.append(
182                " null as {}{}".format("d" if is_dimension else "m", ins)
183            )
184
185        return filled_columns

GAB insert generator.

Creates the insert statement based on the dimensions and metrics provided in the configuration table.

GABInsertGenerator( query_id: str, cadence: str, final_stage_table: str, lookup_query_builder: pyspark.sql.dataframe.DataFrame, target_database: str, target_table: str)
56    def __init__(
57        self,
58        query_id: str,
59        cadence: str,
60        final_stage_table: str,
61        lookup_query_builder: DataFrame,
62        target_database: str,
63        target_table: str,
64    ):
65        """Construct GABInsertGenerator instances.
66
67        Args:
68            query_id: gab configuration table use case identifier.
69            cadence:  inputted cadence to process.
70            final_stage_table: stage view name.
71            lookup_query_builder: gab configuration data.
72            target_database: target database to write.
73            target_table: target table to write.
74        """
75        self.query_id = query_id
76        self.cadence = cadence
77        self.final_stage_table = final_stage_table
78        self.lookup_query_builder = lookup_query_builder
79        self.target_database = target_database
80        self.target_table = target_table

Construct GABInsertGenerator instances.

Arguments:
  • query_id: gab configuration table use case identifier.
  • cadence: inputted cadence to process.
  • final_stage_table: stage view name.
  • lookup_query_builder: gab configuration data.
  • target_database: target database to write.
  • target_table: target table to write.
query_id
cadence
final_stage_table
lookup_query_builder
target_database
target_table
def generate_sql(self) -> Optional[str]:
82    def generate_sql(self) -> Optional[str]:
83        """Generate insert sql statement to the insights table."""
84        insert_sql_statement = self._insert_statement_generator()
85
86        return insert_sql_statement

Generate insert sql statement to the insights table.

class GABViewGenerator(GABSQLGenerator):
class GABViewGenerator(GABSQLGenerator):
    """GAB view generator.

    Builds the `CREATE OR REPLACE VIEW` statement that exposes a use case for
    consumption.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        cadence_snapshot_status: dict,
        target_database: str,
        view_name: str,
        final_cols: str,
        target_table: str,
        dimensions_and_metrics_with_alias: str,
        dimensions: str,
        dimensions_and_metrics: str,
        final_calculated_script: str,
        query_id: str,
        view_filter: str,
        final_calculated_script_snapshot: str,
        without_snapshot_cadences: list[str],
        with_snapshot_cadences: list[str],
    ):
        """Store everything required to render the consumption view.

        Args:
            cadence_snapshot_status: each cadence with the corresponding snapshot
                status.
            target_database: target database to write.
            view_name: name of the view to be generated (kept as `result_key`).
            final_cols: columns to return in the view.
            target_table: target table to write.
            dimensions_and_metrics_with_alias: configured dimensions and metrics with
                alias to compute in the view.
            dimensions: use case configured dimensions.
            dimensions_and_metrics: use case configured dimensions and metrics.
            final_calculated_script: use case calculated metrics.
            query_id: gab configuration table use case identifier.
            view_filter: filter to add in the view.
            final_calculated_script_snapshot: use case calculated metrics with snapshot.
            without_snapshot_cadences: cadences without snapshot.
            with_snapshot_cadences: cadences with snapshot.
        """
        # Identification of the use case and of the objects involved.
        self.query_id = query_id
        self.target_database = target_database
        self.target_table = target_table
        self.result_key = view_name

        # Cadence information.
        self.cadence_snapshot_status = cadence_snapshot_status
        self.without_snapshot_cadences = without_snapshot_cadences
        self.with_snapshot_cadences = with_snapshot_cadences

        # SQL fragments spliced into the view statement.
        self.final_cols = final_cols
        self.dimensions = dimensions
        self.dimensions_and_metrics = dimensions_and_metrics
        self.dimensions_and_metrics_with_alias = dimensions_and_metrics_with_alias
        self.final_calculated_script = final_calculated_script
        self.final_calculated_script_snapshot = final_calculated_script_snapshot
        self.view_filter = view_filter

    @_execute_sql
    def generate_sql(self) -> Optional[str]:
        """Generate use case view sql statement.

        The statement returned here is handed to the `_execute_sql` decorator.
        """
        return self._create_consumption_view()

    def _create_consumption_view(self) -> str:
        """Render the complete `CREATE OR REPLACE VIEW` statement."""

        def quoted_csv(cadences: list[str]) -> str:
            # Each cadence becomes a double-quoted literal for the IN (...) clause.
            return ",".join(f'"{cadence}"' for cadence in cadences)

        select_statement = self._generate_consumption_view_statement(
            cadence_snapshot_status=self.cadence_snapshot_status,
            target_database=self.target_database,
            final_cols=self.final_cols,
            target_table=self.target_table,
            dimensions_and_metrics_with_alias=self.dimensions_and_metrics_with_alias,
            dimensions=self.dimensions,
            dimensions_and_metrics=self.dimensions_and_metrics,
            final_calculated_script=self.final_calculated_script,
            query_id=self.query_id,
            view_filter=self.view_filter,
            final_calculated_script_snapshot=self.final_calculated_script_snapshot,
            without_snapshot_cadences=quoted_csv(self.without_snapshot_cadences),
            with_snapshot_cadences=quoted_csv(self.with_snapshot_cadences),
        )

        view_template = """
            CREATE OR REPLACE VIEW {database}.{view_name} AS {final_view_query}
            """
        view_statement = view_template.format(
            database=self.target_database,
            view_name=self.result_key,
            final_view_query=select_statement,
        )
        self._LOGGER.info(f"Consumption view statement: {view_statement}")
        return view_statement

    @classmethod
    def _generate_consumption_view_statement(
        cls,
        cadence_snapshot_status: dict,
        target_database: str,
        final_cols: str,
        target_table: str,
        dimensions_and_metrics_with_alias: str,
        dimensions: str,
        dimensions_and_metrics: str,
        final_calculated_script: str,
        query_id: str,
        view_filter: str,
        final_calculated_script_snapshot: str,
        without_snapshot_cadences: str,
        with_snapshot_cadences: str,
    ) -> str:
        """Build the SELECT statement backing the consumption view.

        The shape of the query depends on the values of `cadence_snapshot_status`:
        both "Y" and "N" present produces a union of the non-snapshot and
        snapshot branches; "N" without "Y" produces the non-snapshot branch only;
        anything else produces the snapshot branch only.

        Args:
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
            target_database: target database to write.
            final_cols: use case columns exposed in the consumption view.
            target_table: target table to write.
            dimensions_and_metrics_with_alias: dimensions and metrics as string columns
                with alias.
            dimensions: dimensions as string columns.
            dimensions_and_metrics: dimensions and metrics as string columns
                without alias.
            final_calculated_script: final calculated metrics script.
            query_id: gab configuration table use case identifier.
            view_filter: filter to execute on the view.
            final_calculated_script_snapshot: final calculated metrics with snapshot
                script.
            without_snapshot_cadences: cadences without snapshot, already rendered as
                a comma separated list of quoted values.
            with_snapshot_cadences: cadences with snapshot, already rendered as a
                comma separated list of quoted values.

        Returns:
            The SELECT statement to place after `CREATE OR REPLACE VIEW ... AS`.
        """
        cls._LOGGER.info("Generating consumption view statement...")
        cls._LOGGER.info(
            f"""
            {{
                target_database: {target_database},
                target_table: {target_table},
                query_id: {query_id},
                cadence_and_snapshot_status: {cadence_snapshot_status},
                cadences_without_snapshot: [{without_snapshot_cadences}],
                cadences_with_snapshot: [{with_snapshot_cadences}],
                final_cols: {final_cols},
                dimensions_and_metrics_with_alias: {dimensions_and_metrics_with_alias},
                dimensions: {dimensions},
                dimensions_with_metrics: {dimensions_and_metrics},
                final_calculated_script: {final_calculated_script},
                final_calculated_script_snapshot: {final_calculated_script_snapshot},
                view_filter: {view_filter}
            }}"""
        )

        statuses = cadence_snapshot_status.values()
        has_snapshot_cadences = "Y" in statuses
        has_plain_cadences = "N" in statuses

        if not has_plain_cadences:
            # Snapshot-only view; also the fallback when no cadence is flagged "N".
            # NOTE(review): unlike the mixed query below, this window also
            # partitions by a.to_date and lists a.from_date twice - confirm
            # that this difference is intentional.
            return f"""
                WITH TEMP_RN AS (
                    SELECT
                        a.cadence,
                        a.from_date,
                        a.to_date,
                        {dimensions_and_metrics},
                        row_number() over(
                            PARTITION BY
                                a.cadence,
                                a.from_date,
                                a.to_date,
                                {dimensions},
                                a.from_date
                        order by to_date) as rn
                    FROM {target_database}.{target_table} a
                    WHERE a.query_id = {query_id}
                    AND cadence IN ({with_snapshot_cadences})
                    {view_filter}
                ),
                TEMP2 AS (
                    SELECT
                        a.cadence,
                        {dimensions_and_metrics_with_alias}{final_calculated_script_snapshot}
                    FROM TEMP_RN a
                )
                SELECT {final_cols} FROM TEMP2
            """  # nosec: B608

        if not has_snapshot_cadences:
            # Only cadences without snapshot: a single pass over the target table.
            return f"""
                WITH TEMP1 AS (
                    SELECT
                        a.cadence,
                        {dimensions_and_metrics_with_alias}{final_calculated_script}
                    FROM {target_database}.{target_table} a
                    WHERE a.query_id = {query_id}
                    AND cadence IN ({without_snapshot_cadences})  {view_filter}
                )
                SELECT {final_cols} FROM TEMP1
            """  # nosec: B608

        # Both kinds of cadence: union the non-snapshot and snapshot branches.
        return f"""
                WITH TEMP1 AS (
                    SELECT
                        a.cadence,
                        {dimensions_and_metrics_with_alias}{final_calculated_script}
                    FROM {target_database}.{target_table} a
                    WHERE a.query_id = {query_id}
                    AND cadence IN ({without_snapshot_cadences})
                    {view_filter}
                ),
                TEMP_RN AS (
                    SELECT
                        a.cadence,
                        a.from_date,
                        a.to_date,
                        {dimensions_and_metrics},
                        row_number() over(
                            PARTITION BY
                                a.cadence,
                                {dimensions},
                                a.from_date
                            order by to_date
                        ) as rn
                    FROM {target_database}.{target_table} a
                    WHERE a.query_id = {query_id}
                    AND cadence IN ({with_snapshot_cadences})
                    {view_filter}
                ),
                TEMP2 AS (
                    SELECT
                        a.cadence,
                        {dimensions_and_metrics_with_alias}{final_calculated_script_snapshot}
                    FROM TEMP_RN a
                ),
                TEMP3 AS (SELECT * FROM TEMP1 UNION SELECT * from TEMP2)
                SELECT {final_cols} FROM TEMP3
            """  # nosec: B608

GAB view generator.

Creates the use case view statement to be consumed.

GABViewGenerator( cadence_snapshot_status: dict, target_database: str, view_name: str, final_cols: str, target_table: str, dimensions_and_metrics_with_alias: str, dimensions: str, dimensions_and_metrics: str, final_calculated_script: str, query_id: str, view_filter: str, final_calculated_script_snapshot: str, without_snapshot_cadences: list[str], with_snapshot_cadences: list[str])
def __init__(
    self,
    cadence_snapshot_status: dict,
    target_database: str,
    view_name: str,
    final_cols: str,
    target_table: str,
    dimensions_and_metrics_with_alias: str,
    dimensions: str,
    dimensions_and_metrics: str,
    final_calculated_script: str,
    query_id: str,
    view_filter: str,
    final_calculated_script_snapshot: str,
    without_snapshot_cadences: list[str],
    with_snapshot_cadences: list[str],
):
    """Store everything required to render the consumption view.

    Args:
        cadence_snapshot_status: each cadence with the corresponding snapshot
            status.
        target_database: target database to write.
        view_name: name of the view to be generated (kept as `result_key`).
        final_cols: columns to return in the view.
        target_table: target table to write.
        dimensions_and_metrics_with_alias: configured dimensions and metrics with
            alias to compute in the view.
        dimensions: use case configured dimensions.
        dimensions_and_metrics: use case configured dimensions and metrics.
        final_calculated_script: use case calculated metrics.
        query_id: gab configuration table use case identifier.
        view_filter: filter to add in the view.
        final_calculated_script_snapshot: use case calculated metrics with snapshot.
        without_snapshot_cadences: cadences without snapshot.
        with_snapshot_cadences: cadences with snapshot.
    """
    # Identification of the use case and of the objects involved.
    self.query_id = query_id
    self.target_database = target_database
    self.target_table = target_table
    self.result_key = view_name

    # Cadence information.
    self.cadence_snapshot_status = cadence_snapshot_status
    self.without_snapshot_cadences = without_snapshot_cadences
    self.with_snapshot_cadences = with_snapshot_cadences

    # SQL fragments spliced into the view statement.
    self.final_cols = final_cols
    self.dimensions = dimensions
    self.dimensions_and_metrics = dimensions_and_metrics
    self.dimensions_and_metrics_with_alias = dimensions_and_metrics_with_alias
    self.final_calculated_script = final_calculated_script
    self.final_calculated_script_snapshot = final_calculated_script_snapshot
    self.view_filter = view_filter

Construct GABViewGenerator instances.

Arguments:
  • cadence_snapshot_status: each cadence with the corresponding snapshot status.
  • target_database: target database to write.
  • view_name: name of the view to be generated.
  • final_cols: columns to return in the view.
  • target_table: target table to write.
  • dimensions_and_metrics_with_alias: configured dimensions and metrics with alias to compute in the view.
  • dimensions: use case configured dimensions.
  • dimensions_and_metrics: use case configured dimensions and metrics.
  • final_calculated_script: use case calculated metrics.
  • query_id: gab configuration table use case identifier.
  • view_filter: filter to add in the view.
  • final_calculated_script_snapshot: use case calculated metrics with snapshot.
  • without_snapshot_cadences: cadences without snapshot.
  • with_snapshot_cadences: cadences with snapshot.
cadence_snapshot_status
target_database
result_key
final_cols
target_table
dimensions_and_metrics_with_alias
dimensions
dimensions_and_metrics
final_calculated_script
query_id
view_filter
final_calculated_script_snapshot
without_snapshot_cadences
with_snapshot_cadences
def generate_sql(*args: Any) -> None:
27    def inner(*args: Any) -> None:
28        generated_sql = func(*args)
29        if generated_sql:
30            ExecEnv.SESSION.sql(generated_sql)

Generate use case view sql statement.

class GABDeleteGenerator(GABSQLGenerator):
class GABDeleteGenerator(GABSQLGenerator):
    """GAB delete generator.

    Creates the delete statement to clean the use case base data on the insights table.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        query_id: str,
        cadence: str,
        temp_stage_view_name: str,
        lookup_query_builder: DataFrame,
        target_database: str,
        target_table: str,
    ):
        """Construct GABDeleteGenerator instances.

        Args:
            query_id: gab configuration table use case identifier.
            cadence: inputted cadence to process.
            temp_stage_view_name: stage view name.
            lookup_query_builder: gab configuration data.
            target_database: target database to write.
            target_table: target table to write.
        """
        self.query_id = query_id
        self.cadence = cadence
        self.temp_stage_view_name = temp_stage_view_name
        self.lookup_query_builder = lookup_query_builder
        self.target_database = target_database
        self.target_table = target_table

    @_execute_sql
    def generate_sql(self) -> Optional[str]:
        """Generate delete sql statement.

        This statement is to clean the insights table for the corresponding use case.
        """
        return self._delete_statement_generator()

    def _delete_statement_generator(self) -> str:
        """Build the DELETE statement for the use case and cadence being processed.

        The `mappings` configuration of the use case is read from
        `lookup_query_builder` to find the stage columns mapped to `from_date`
        and `to_date`. The minimum and maximum of both columns in the stage view
        are then used as the date window of the rows to delete from the target
        table.

        Returns:
            The DELETE statement restricted to this `query_id`, `cadence` and the
            date window found in the stage view.
        """
        df_filtered = self.lookup_query_builder.filter(
            col("query_id") == lit(self.query_id)
        )

        df_map = df_filtered.select(col("mappings"))
        view_df = df_map.select(
            to_json(struct([df_map[x] for x in df_map.columns]))
        ).collect()[0][0]
        line = json.loads(view_df)

        # Only `mappings` was selected, so this keeps the literal of that column.
        for line_v in line.values():
            result = ast.literal_eval(line_v)

        # When several mappings are configured, the last one defines the columns.
        for result_new in result.values():
            dim_from_date = result_new["dimensions"]["from_date"]
            dim_to_date = result_new["dimensions"]["to_date"]

        self._LOGGER.info(f"temp stage view name: {self.temp_stage_view_name}")

        # A single pass over the stage view yields the four boundaries, instead
        # of running one query per aggregate.
        date_bounds = ExecEnv.SESSION.sql(
            """
            SELECT
                MIN({from_date}) as min_from_date,
                MAX({from_date}) as max_from_date,
                MIN({to_date}) as min_to_date,
                MAX({to_date}) as max_to_date
            FROM {iter_stages}""".format(  # nosec: B608
                iter_stages=self.temp_stage_view_name,
                from_date=dim_from_date,
                to_date=dim_to_date,
            )
        ).collect()[0]
        min_from_date, max_from_date, min_to_date, max_to_date = date_bounds

        gen_del = """
        DELETE FROM {target_database}.{target_table} a
            WHERE query_id = {query_id}
            AND cadence = '{cadence}'
            AND from_date BETWEEN '{min_from_date}' AND '{max_from_date}'
            AND to_date BETWEEN '{min_to_date}' AND '{max_to_date}'
        """.format(  # nosec: B608
            target_database=self.target_database,
            target_table=self.target_table,
            query_id=self.query_id,
            cadence=self.cadence,
            min_from_date=min_from_date,
            max_from_date=max_from_date,
            min_to_date=min_to_date,
            max_to_date=max_to_date,
        )

        return gen_del

GAB delete generator.

Creates the delete statement to clean the use case base data on the insights table.

GABDeleteGenerator( query_id: str, cadence: str, temp_stage_view_name: str, lookup_query_builder: pyspark.sql.dataframe.DataFrame, target_database: str, target_table: str)
def __init__(
    self,
    query_id: str,
    cadence: str,
    temp_stage_view_name: str,
    lookup_query_builder: DataFrame,
    target_database: str,
    target_table: str,
):
    """Construct GABDeleteGenerator instances.

    Args:
        query_id: gab configuration table use case identifier.
        cadence: inputted cadence to process.
        temp_stage_view_name: stage view name.
        lookup_query_builder: gab configuration data.
        target_database: target database to write.
        target_table: target table to write.
    """
    self.query_id = query_id
    self.cadence = cadence
    self.temp_stage_view_name = temp_stage_view_name
    self.lookup_query_builder = lookup_query_builder
    self.target_database = target_database
    self.target_table = target_table

Construct GABDeleteGenerator instances.

Arguments:
  • query_id: gab configuration table use case identifier.
  • cadence: inputted cadence to process.
  • temp_stage_view_name: stage view name.
  • lookup_query_builder: gab configuration data.
  • target_database: target database to write.
  • target_table: target table to write.
query_id
cadence
temp_stage_view_name
lookup_query_builder
target_database
target_table
def generate_sql(*args: Any) -> None:
27    def inner(*args: Any) -> None:
28        generated_sql = func(*args)
29        if generated_sql:
30            ExecEnv.SESSION.sql(generated_sql)

Generate delete sql statement.

This statement is to clean the insights table for the corresponding use case.