Skip to content

Gab utils

Module to define GAB Utility classes.

GABPartitionUtils

Bases: object

Class to extract a partition based on a date period.

Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
class GABPartitionUtils(object):
    """Class to extract a partition based on a date period."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    @classmethod
    def get_years(cls, start_date: str, end_date: str) -> list[str]:
        """Return a list of distinct years from the input parameters.

        Args:
            start_date: start of the period.
            end_date: end of the period.

        Returns:
            Every calendar year between start_date and end_date (inclusive),
                as strings.

        Raises:
            ValueError: if start_date is after end_date.
        """
        year = []
        # ISO-style date strings (year first) compare correctly as plain
        # strings, so no parsing is needed for the sanity check.
        if start_date > end_date:
            raise ValueError(
                "Input Error: Invalid start_date and end_date. "
                "Start_date is greater than end_date"
            )

        for i in range(int(start_date[0:4]), int(end_date[0:4]) + 1):
            year.append(str(i))

        return year

    @classmethod
    def get_partition_condition(cls, start_date: str, end_date: str) -> str:
        """Return year, month and day partition statement from the input parameters.

        Args:
            start_date: start of the period.
            end_date: end of the period.

        Returns:
            A SQL boolean expression over `year`, `month` and `day` partition
                columns covering the requested period.
        """
        years = cls.get_years(start_date, end_date)
        if len(years) > 1:
            partition_condition = cls._get_multiple_years_partition(
                start_date, end_date, years
            )
        else:
            partition_condition = cls._get_single_year_partition(start_date, end_date)
        return partition_condition

    @classmethod
    def _get_multiple_years_partition(
        cls, start_date: str, end_date: str, years: list[str]
    ) -> str:
        """Return partition when executing multiple years (>1).

        The condition is assembled from three pieces: the tail of the first
        year, optionally the fully covered middle years, and the head of the
        last year. The pieces share parentheses across the concatenation, so
        each piece deliberately leaves a bracket open (or starts with `)`) for
        its neighbour.

        Args:
            start_date: start of the period.
            end_date: end of the period.
            years: list of years covered by the period.
        """
        start_date_month = cls._extract_date_part_from_date("MONTH", start_date)
        start_date_day = cls._extract_date_part_from_date("DAY", start_date)

        end_date_month = cls._extract_date_part_from_date("MONTH", end_date)
        end_date_day = cls._extract_date_part_from_date("DAY", end_date)

        # Leaves one parenthesis open; the end-date piece closes it.
        year_statement = "(year = {0} and (".format(years[0]) + "{})"
        if start_date_month != "12":
            start_date_partition = year_statement.format(
                "(month = {0} and day between {1} and 31)".format(
                    start_date_month, start_date_day
                )
                + " or (month between {0} and 12)".format(int(start_date_month) + 1)
            )
        else:
            # December start: only the remaining days of the last month.
            start_date_partition = year_statement.format(
                "month = {0} and day between {1} and 31".format(
                    start_date_month, start_date_day
                )
            )

        period_years_partition = ""

        if len(years) == 3:
            period_years_partition = ") or (year = {0}".format(years[1])
        elif len(years) > 3:
            # Bug fix: this template previously ended with ")" which, combined
            # with the leading ")" of the end-date piece, produced an extra,
            # unbalanced closing parenthesis in the final condition. Like the
            # three-year branch above, the bracket opened here is closed by
            # the end-date piece.
            period_years_partition = ") or (year between {0} and {1}".format(
                years[1], years[-2]
            )

        if end_date_month != "01":
            end_date_partition = (
                ") or (year = {0} and ((month between 01 and {1})".format(
                    years[-1], int(end_date_month) - 1
                )
                + " or (month = {0} and day between 1 and {1})))".format(
                    end_date_month, end_date_day
                )
            )
        else:
            # January end: only the first days of the first month.
            end_date_partition = (
                ") or (year = {0} and month = 1 and day between 01 and {1})".format(
                    years[-1], end_date_day
                )
            )
        partition_condition = (
            start_date_partition + period_years_partition + end_date_partition
        )

        return partition_condition

    @classmethod
    def _get_single_year_partition(cls, start_date: str, end_date: str) -> str:
        """Return partition when executing a single year.

        Args:
            start_date: start of the period.
            end_date: end of the period.
        """
        start_date_year = cls._extract_date_part_from_date("YEAR", start_date)
        start_date_month = cls._extract_date_part_from_date("MONTH", start_date)
        start_date_day = cls._extract_date_part_from_date("DAY", start_date)

        end_date_year = cls._extract_date_part_from_date("YEAR", end_date)
        end_date_month = cls._extract_date_part_from_date("MONTH", end_date)
        end_date_day = cls._extract_date_part_from_date("DAY", end_date)

        if start_date_month != end_date_month:
            months = []
            for i in range(int(start_date_month), int(end_date_month) + 1):
                months.append(i)

            # Opens a bracket that is closed by the end-month clause below.
            start_date_partition = (
                "year = {0} and ((month={1} and day between {2} and 31)".format(
                    start_date_year, months[0], start_date_day
                )
            )
            period_years_partition = ""
            if len(months) == 2:
                # Adjacent months: no fully covered month in between.
                period_years_partition = start_date_partition
            elif len(months) == 3:
                period_years_partition = (
                    start_date_partition + " or (month = {0})".format(months[1])
                )
            elif len(months) > 3:
                period_years_partition = (
                    start_date_partition
                    + " or (month between {0} and {1})".format(months[1], months[-2])
                )
            partition_condition = (
                period_years_partition
                + " or (month = {0} and day between 1 and {1}))".format(
                    end_date_month, end_date_day
                )
            )
        else:
            # Whole period falls inside a single month.
            partition_condition = (
                "year = {0} and month = {1} and day between {2} and {3}".format(
                    end_date_year, end_date_month, start_date_day, end_date_day
                )
            )

        return partition_condition

    @classmethod
    def _extract_date_part_from_date(cls, part: str, date: str) -> str:
        """Extract date part from string date.

        Args:
            part: date part (possible values: DAY, MONTH, YEAR).
            date: string date in `YYYY-MM-DD...` layout (fixed offsets are
                sliced, the string is not parsed).
        """
        if "DAY" == part.upper():
            return date[8:10]
        elif "MONTH" == part.upper():
            return date[5:7]
        else:
            return date[0:4]

get_partition_condition(start_date, end_date) classmethod

Return year, month and day partition statement from the input parameters.

Parameters:

Name Type Description Default
start_date str

start of the period.

required
end_date str

end of the period.

required
Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
@classmethod
def get_partition_condition(cls, start_date: str, end_date: str) -> str:
    """Build the year/month/day partition predicate for the given period.

    Delegates to the multi-year builder when the period spans more than one
    calendar year, otherwise to the single-year builder.

    Args:
        start_date: start of the period.
        end_date: end of the period.
    """
    years = cls.get_years(start_date, end_date)

    return (
        cls._get_multiple_years_partition(start_date, end_date, years)
        if len(years) > 1
        else cls._get_single_year_partition(start_date, end_date)
    )

get_years(start_date, end_date) classmethod

Return a list of distinct years from the input parameters.

Parameters:

Name Type Description Default
start_date str

start of the period.

required
end_date str

end of the period.

required
Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
@classmethod
def get_years(cls, start_date: str, end_date: str) -> list[str]:
    """Build the list of calendar years covered by the given period.

    The year is read from the first four characters of each date string.

    Args:
        start_date: start of the period.
        end_date: end of the period.

    Raises:
        ValueError: if start_date is after end_date.
    """
    if start_date > end_date:
        raise ValueError(
            "Input Error: Invalid start_date and end_date. "
            "Start_date is greater than end_date"
        )

    first_year = int(start_date[0:4])
    last_year = int(end_date[0:4])

    return [str(current_year) for current_year in range(first_year, last_year + 1)]

GABUtils

Bases: object

Class containing utility functions for GAB.

Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
class GABUtils(object):
    """Class containing utility functions for GAB."""

    _LOGGER = LoggingHandler(__name__).get_logger()

    def logger(
        self,
        run_start_time: datetime,
        run_end_time: datetime,
        start: str,
        end: str,
        query_id: str,
        query_label: str,
        cadence: str,
        stage_file_path: str,
        query: str,
        status: str,
        error_message: Union[Exception, str],
        target_database: str,
    ) -> None:
        """Store the execution of each stage in the log events table.

        Args:
            run_start_time: execution start time.
            run_end_time: execution end time.
            start: use case start date.
            end: use case end date.
            query_id: gab configuration table use case identifier.
            query_label: gab configuration table use case name.
            cadence: cadence to process.
            stage_file_path: stage file path.
            query: query to execute.
            status: status of the query execution.
            error_message: error message if present.
            target_database: target database to write.
        """
        # NOTE(review): values are interpolated directly into the SQL text
        # (see nosec below). They are expected to come from internal
        # configuration, not end users - confirm before widening usage.
        ins = """
        INSERT INTO {database}.gab_log_events
        VALUES (
            '{run_start_time}',
            '{run_end_time}',
            '{start}',
            '{end}',
            {query_id},
            '{query_label}',
            '{cadence}',
            '{stage_file_path}',
            '{query}',
            '{status}',
            '{error_message}'
        )""".format(  # nosec: B608
            database=target_database,
            run_start_time=run_start_time,
            run_end_time=run_end_time,
            start=start,
            end=end,
            query_id=query_id,
            query_label=query_label,
            cadence=cadence,
            stage_file_path=stage_file_path,
            query=self._escape_quote(query),
            status=status,
            # Failed runs carry an exception object: stringify and escape it
            # so the generated INSERT statement stays well-formed.
            error_message=(
                self._escape_quote(str(error_message))
                if status == "Failed"
                else error_message
            ),
        )

        ExecEnv.SESSION.sql(ins)

    @classmethod
    def _escape_quote(cls, to_escape: str) -> str:
        """Backslash-escape single and double quotes in a string.

        Args:
            to_escape: string to escape.
        """
        return to_escape.replace("'", r"\'").replace('"', r"\"")

    @classmethod
    def get_json_column_as_dict(
        cls, lookup_query_builder: DataFrame, query_id: str, query_column: str
    ) -> dict:  # type: ignore
        """Get JSON column as dictionary.

        Args:
            lookup_query_builder: gab configuration data.
            query_id: gab configuration table use case identifier.
            query_column: column to get as json.

        Returns:
            The column payload parsed into a dictionary (empty when the
                filtered row carries no values).
        """
        column_df = lookup_query_builder.filter(
            col("query_id") == lit(query_id)
        ).select(col(query_column))

        column_df_json = column_df.select(
            to_json(struct([column_df[x] for x in column_df.columns]))
        ).collect()[0][0]

        json_column = json.loads(column_df_json)

        # Robustness fix: `column_as_json` used to be assigned only inside the
        # loop, raising UnboundLocalError when the JSON payload was empty.
        column_as_json: dict = {}
        # NOTE(review): when several values exist, only the last parsed
        # mapping is kept - presumably a single-entry payload; confirm with
        # callers.
        for mapping in json_column.values():
            column_as_json = ast.literal_eval(mapping)

        return column_as_json

    @classmethod
    def extract_columns_from_mapping(
        cls,
        columns: dict,
        is_dimension: bool,
        extract_column_without_alias: bool = False,
        table_alias: Optional[str] = None,
        is_extracted_value_as_name: bool = True,
    ) -> Union[tuple[list[str], list[str]], list[str]]:
        """Extract and transform columns to SQL select statement.

        Args:
            columns: data to extract the columns.
            is_dimension: flag identifying if is a dimension or a metric.
            extract_column_without_alias: flag to inform if it's to extract columns
                without aliases.
            table_alias: name or alias from the source table.
            is_extracted_value_as_name: identify if the extracted value is the
                column name.

        Returns:
            The aliased select expressions, plus the un-aliased ones when
                `extract_column_without_alias` is set.
        """
        column_with_alias = (
            "".join([table_alias, ".", "{} as {}"]) if table_alias else "{} as {}"
        )
        column_without_alias = (
            "".join([table_alias, ".", "{}"]) if table_alias else "{}"
        )

        extracted_columns_with_alias = []
        extracted_columns_without_alias = []
        for column_name, column_value in columns.items():
            if extract_column_without_alias:
                extracted_column_without_alias = column_without_alias.format(
                    cls._get_column_format_without_alias(
                        is_dimension,
                        column_name,
                        column_value,
                        is_extracted_value_as_name,
                    )
                )
                extracted_columns_without_alias.append(extracted_column_without_alias)

            extracted_column_with_alias = column_with_alias.format(
                *cls._extract_column_with_alias(
                    is_dimension,
                    column_name,
                    column_value,
                    is_extracted_value_as_name,
                )
            )
            extracted_columns_with_alias.append(extracted_column_with_alias)

        return (
            (extracted_columns_with_alias, extracted_columns_without_alias)
            if extract_column_without_alias
            else extracted_columns_with_alias
        )

    @classmethod
    def _extract_column_with_alias(
        cls,
        is_dimension: bool,
        column_name: str,
        column_value: Union[str, dict],
        is_extracted_value_as_name: bool = True,
    ) -> tuple[str, str]:
        """Extract column name with alias.

        Args:
            is_dimension: flag indicating if the column is a dimension.
            column_name: name of the column.
            column_value: value of the column (a dict holding `metric_name`
                when the column is a metric).
            is_extracted_value_as_name: flag indicating if the name of the column is the
                extracted value.
        """
        extracted_value = (
            column_value
            if is_dimension
            else (column_value["metric_name"])  # type: ignore
        )

        return (
            (extracted_value, column_name)  # type: ignore
            if is_extracted_value_as_name
            else (column_name, extracted_value)
        )

    @classmethod
    def _get_column_format_without_alias(
        cls,
        is_dimension: bool,
        column_name: str,
        column_value: Union[str, dict],
        is_extracted_value_as_name: bool = True,
    ) -> str:
        """Extract column name without alias.

        Args:
            is_dimension: flag indicating if the column is a dimension.
            column_name: name of the column.
            column_value: value of the column (a dict holding `metric_name`
                when the column is a metric).
            is_extracted_value_as_name: flag indicating if the name of the column is the
                extracted value.
        """
        extracted_value: str = (
            column_value
            if is_dimension
            else (column_value["metric_name"])  # type: ignore
        )

        return extracted_value if is_extracted_value_as_name else column_name

    @classmethod
    def get_cadence_configuration_at_end_date(cls, end_date: datetime) -> dict:
        """A dictionary that corresponds to the conclusion of a cadence.

        For any end date inputted by the user, we check whether it is actually
            the end of a cadence (YEAR, QUARTER, MONTH, WEEK).
        If the user input is 2024-03-31, this is both a month end and a quarter
            end, which means any use cases configured as month or quarter need
            to be calculated.

        Args:
            end_date: base end date.

        Returns:
            Mapping from cadence name to "N" for every cadence concluding
                exactly at `end_date`; DAY is always included.
        """
        init_end_date_dict = {}

        # Rebuild the date through pendulum to use its cadence helpers
        # (last_of / end_of); tzinfo is stripped so comparisons against the
        # naive `end_date` can succeed.
        expected_end_cadence_date = pendulum.datetime(
            int(end_date.strftime("%Y")),
            int(end_date.strftime("%m")),
            int(end_date.strftime("%d")),
        ).replace(tzinfo=None)

        # Validating YEAR cadence
        if end_date == expected_end_cadence_date.last_of("year"):
            init_end_date_dict["YEAR"] = "N"

        # Validating QUARTER cadence
        if end_date == expected_end_cadence_date.last_of("quarter"):
            init_end_date_dict["QUARTER"] = "N"

        # Validating MONTH cadence: compare against the last calendar day of
        # the month computed with the stdlib.
        if end_date == datetime(
            int(end_date.strftime("%Y")),
            int(end_date.strftime("%m")),
            calendar.monthrange(
                int(end_date.strftime("%Y")), int(end_date.strftime("%m"))
            )[1],
        ):
            init_end_date_dict["MONTH"] = "N"

        # Validating WEEK cadence (time part zeroed to match a date-only
        # input).
        if end_date == expected_end_cadence_date.end_of("week").replace(
            hour=0, minute=0, second=0, microsecond=0
        ):
            init_end_date_dict["WEEK"] = "N"

        # Any end date concludes a day.
        init_end_date_dict["DAY"] = "N"

        return init_end_date_dict

    def get_reconciliation_cadences(
        self,
        cadence: str,
        selected_reconciliation_window: dict,
        cadence_configuration_at_end_date: dict,
        rerun_flag: str,
    ) -> dict:
        """Get reconciliation cadences based on the use case configuration.

        Args:
            cadence: cadence to process.
            selected_reconciliation_window: configured use case reconciliation window.
            cadence_configuration_at_end_date: cadences to execute at the end date.
            rerun_flag: flag indicating if it's a rerun or a normal run.
        """
        configured_cadences = self._get_configured_cadences_by_snapshot(
            cadence, selected_reconciliation_window, cadence_configuration_at_end_date
        )

        return self._get_cadences_to_execute(
            configured_cadences, cadence, cadence_configuration_at_end_date, rerun_flag
        )

    @classmethod
    def _get_cadences_to_execute(
        cls,
        configured_cadences: dict,
        cadence: str,
        cadence_configuration_at_end_date: dict,
        rerun_flag: str,
    ) -> dict:
        """Get cadences to execute.

        A configured cadence is picked when it concludes at the end date, or -
        on a rerun - when it is smaller than the cadence being processed.

        Args:
            configured_cadences: configured use case reconciliation window.
            cadence: cadence to process.
            cadence_configuration_at_end_date: cadences to execute at the end date.
            rerun_flag: flag indicating if it's a rerun or a normal run.
        """
        cadences_to_execute = {}
        cad_order = GABCadence.get_ordered_cadences()

        # Cleanup: a dead `elif ...: continue` branch was removed here - it
        # only skipped to the next iteration, which the loop does anyway.
        for snapshot_cadence, snapshot_flag in configured_cadences.items():
            if (
                (cad_order[cadence] > cad_order[snapshot_cadence])
                and (rerun_flag == "Y")
            ) or snapshot_cadence in cadence_configuration_at_end_date:
                cadences_to_execute[snapshot_cadence] = snapshot_flag

        return cls._sort_cadences_to_execute(cadences_to_execute, cad_order)

    @classmethod
    def _sort_cadences_to_execute(
        cls, cadences_to_execute: dict, cad_order: dict
    ) -> dict:
        """Sort the cadences to execute.

        Args:
            cadences_to_execute: cadences to execute.
            cad_order: all cadences with order.
        """
        # Ordering it because grouping cadences with snapshot and without
        # snapshot can impact the cadence ordering.
        sorted_cadences_to_execute: dict = dict(
            sorted(
                cadences_to_execute.items(),
                key=lambda item: cad_order.get(item[0]),  # type: ignore
            )
        )

        # Reverse so execution goes from the biggest cadence (YEAR) to the
        # smallest one (DAY).
        cadences_sorted_by_bigger_cadence_to_execute: dict = dict(
            reversed(list(sorted_cadences_to_execute.items()))
        )

        return cadences_sorted_by_bigger_cadence_to_execute

    @classmethod
    def _get_configured_cadences_by_snapshot(
        cls,
        cadence: str,
        selected_reconciliation_window: dict,
        cadence_configuration_at_end_date: dict,
    ) -> dict:
        """Get configured cadences to execute.

        Args:
            cadence: selected cadence.
            selected_reconciliation_window: configured use case reconciliation window.
            cadence_configuration_at_end_date: cadences to execute at the end date.

        Returns:
            Each cadence with the corresponding information if it's to execute with
                snapshot or not.
        """
        cadences_by_snapshot = {}

        # NOTE(review): `_generate_reconciliation_by_snapshot` returns
        # (with_snapshot, without_snapshot), so these names look swapped -
        # behavior kept exactly as before; confirm the intended semantics.
        (
            no_snapshot_cadences,
            snapshot_cadences,
        ) = cls._generate_reconciliation_by_snapshot(
            cadence, selected_reconciliation_window
        )

        # Only the first (biggest) matching cadence is kept, hence the break.
        for snapshot_cadence, snapshot_flag in no_snapshot_cadences.items():
            if snapshot_cadence in cadence_configuration_at_end_date:
                cadences_by_snapshot[snapshot_cadence] = snapshot_flag

                cls._LOGGER.info(f"{snapshot_cadence} is present in {cadence} cadence")
                break

        cadences_by_snapshot.update(snapshot_cadences)

        # Fall back to the selected cadence itself when nothing matched.
        if (not cadences_by_snapshot) and (
            cadence in cadence_configuration_at_end_date
        ):
            cadences_by_snapshot[cadence] = "N"

        return cadences_by_snapshot

    @classmethod
    def _generate_reconciliation_by_snapshot(
        cls, cadence: str, selected_reconciliation_window: dict
    ) -> tuple[dict, dict]:
        """Generate reconciliation by snapshot.

        Args:
            cadence: cadence to process.
            selected_reconciliation_window: configured use case reconciliation window.

        Returns:
            Two dicts: cadences configured with snapshot ("Y") and cadences
                configured without snapshot ("N"), both ordered from the
                biggest cadence to the smallest.
        """
        cadence_snapshot_configuration = {cadence: "N"}
        # The loop variable deliberately shadows the `cadence` parameter; the
        # parameter is only needed to seed the dict above.
        for cadence in GABCadence.get_cadences():
            cls._add_cadence_snapshot_to_cadence_snapshot_config(
                cadence, selected_reconciliation_window, cadence_snapshot_configuration
            )
        cadence_snapshot_configuration = dict(
            sorted(
                cadence_snapshot_configuration.items(),
                key=(
                    lambda item: GABCadence.get_ordered_cadences().get(  # type: ignore
                        item[0]
                    )
                ),
            )
        )

        cadence_snapshot_configuration = dict(
            reversed(list(cadence_snapshot_configuration.items()))
        )

        cadences_without_snapshot = {
            key: value
            for key, value in cadence_snapshot_configuration.items()
            if value == "N"
        }

        cadences_with_snapshot = {
            key: value
            for key, value in cadence_snapshot_configuration.items()
            if value == "Y"
        }

        return cadences_with_snapshot, cadences_without_snapshot

    @classmethod
    def _add_cadence_snapshot_to_cadence_snapshot_config(
        cls,
        cadence: str,
        selected_reconciliation_window: dict,
        cadence_snapshot_configuration: dict,
    ) -> None:
        """Add the selected reconciliation to cadence snapshot configuration.

        Args:
            cadence: selected cadence.
            selected_reconciliation_window: configured use case reconciliation window.
            cadence_snapshot_configuration: cadence snapshot configuration dictionary
                who will be updated with the new value.
        """
        if cadence in selected_reconciliation_window:
            cadence_snapshot_configuration[cadence] = selected_reconciliation_window[
                cadence
            ]["snapshot"]

    @classmethod
    def format_datetime_to_default(cls, date_to_format: datetime) -> str:
        """Format datetime to GAB default format.

        Args:
            date_to_format: date to format.
        """
        return datetime.date(date_to_format).strftime(GABDefaults.DATE_FORMAT.value)

extract_columns_from_mapping(columns, is_dimension, extract_column_without_alias=False, table_alias=None, is_extracted_value_as_name=True) classmethod

Extract and transform columns to SQL select statement.

Parameters:

Name Type Description Default
columns dict

data to extract the columns.

required
is_dimension bool

flag identifying if is a dimension or a metric.

required
extract_column_without_alias bool

flag to inform if it's to extract columns without aliases.

False
table_alias Optional[str]

name or alias from the source table.

None
is_extracted_value_as_name bool

identify if the extracted value is the column name.

True
Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
@classmethod
def extract_columns_from_mapping(
    cls,
    columns: dict,
    is_dimension: bool,
    extract_column_without_alias: bool = False,
    table_alias: Optional[str] = None,
    is_extracted_value_as_name: bool = True,
) -> Union[tuple[list[str], list[str]], list[str]]:
    """Extract and transform columns to SQL select statement.

    Args:
        columns: data to extract the columns.
        is_dimension: flag identifying if is a dimension or a metric.
        extract_column_without_alias: flag to inform if it's to extract columns
            without aliases.
        table_alias: name or alias from the source table.
        is_extracted_value_as_name: identify if the extracted value is the
            column name.
    """
    # Optional "<table>." prefix shared by both templates.
    table_prefix = "".join([table_alias, "."]) if table_alias else ""
    aliased_template = table_prefix + "{} as {}"
    plain_template = table_prefix + "{}"

    aliased_columns = []
    plain_columns = []

    for mapping_name, mapping_value in columns.items():
        if extract_column_without_alias:
            plain_columns.append(
                plain_template.format(
                    cls._get_column_format_without_alias(
                        is_dimension,
                        mapping_name,
                        mapping_value,
                        is_extracted_value_as_name,
                    )
                )
            )

        aliased_columns.append(
            aliased_template.format(
                *cls._extract_column_with_alias(
                    is_dimension,
                    mapping_name,
                    mapping_value,
                    is_extracted_value_as_name,
                )
            )
        )

    if extract_column_without_alias:
        return aliased_columns, plain_columns
    return aliased_columns

format_datetime_to_default(date_to_format) classmethod

Format datetime to GAB default format.

Parameters:

Name Type Description Default
date_to_format datetime

date to format.

required
Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
@classmethod
def format_datetime_to_default(cls, date_to_format: datetime) -> str:
    """Render a datetime in the GAB default date format.

    Args:
        date_to_format: date to format.
    """
    default_date_format = GABDefaults.DATE_FORMAT.value
    return date_to_format.date().strftime(default_date_format)

get_cadence_configuration_at_end_date(end_date) classmethod

A dictionary that corresponds to the conclusion of a cadence.

For any end date inputted by the user, we check whether it is actually the end of a cadence (YEAR, QUARTER, MONTH, WEEK). If the user input is 2024-03-31, this is both a month end and a quarter end, which means any use cases configured as month or quarter need to be calculated.

Parameters:

Name Type Description Default
end_date datetime

base end date.

required
Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
@classmethod
def get_cadence_configuration_at_end_date(cls, end_date: datetime) -> dict:
    """A dictionary that corresponds to the conclusion of a cadence.

    Any end date inputted by the user we check this end date is actually end of
        a cadence (YEAR, QUARTER, MONTH, WEEK).
    If the user input is 2024-03-31 this is a month end and a quarter end that
        means any use cases configured as month or quarter need to be calculated.

    Args:
        end_date: base end date.

    Returns:
        Mapping from cadence name (YEAR/QUARTER/MONTH/WEEK/DAY) to "N" for
            every cadence that concludes exactly at `end_date`; DAY is
            always included.
    """
    init_end_date_dict = {}

    # Rebuild the date through pendulum to get its cadence-aware helpers
    # (last_of / end_of); tzinfo is stripped so comparisons against the
    # naive `end_date` datetime can succeed.
    expected_end_cadence_date = pendulum.datetime(
        int(end_date.strftime("%Y")),
        int(end_date.strftime("%m")),
        int(end_date.strftime("%d")),
    ).replace(tzinfo=None)

    # Validating YEAR cadence
    if end_date == expected_end_cadence_date.last_of("year"):
        init_end_date_dict["YEAR"] = "N"

    # Validating QUARTER cadence
    if end_date == expected_end_cadence_date.last_of("quarter"):
        init_end_date_dict["QUARTER"] = "N"

    # Validating MONTH cadence: compared against the last calendar day of
    # the month computed with the stdlib, independent from pendulum.
    if end_date == datetime(
        int(end_date.strftime("%Y")),
        int(end_date.strftime("%m")),
        calendar.monthrange(
            int(end_date.strftime("%Y")), int(end_date.strftime("%m"))
        )[1],
    ):
        init_end_date_dict["MONTH"] = "N"

    # Validating WEEK cadence
    # NOTE(review): presumably pendulum's end_of("week") yields the ISO week
    # end (Sunday); the time part is zeroed to match a date-only input -
    # confirm the expected week convention.
    if end_date == expected_end_cadence_date.end_of("week").replace(
        hour=0, minute=0, second=0, microsecond=0
    ):
        init_end_date_dict["WEEK"] = "N"

    # Any end date concludes a day, so DAY is always present.
    init_end_date_dict["DAY"] = "N"

    return init_end_date_dict

get_json_column_as_dict(lookup_query_builder, query_id, query_column) classmethod

Get JSON column as dictionary.

Parameters:

Name Type Description Default
lookup_query_builder DataFrame

gab configuration data.

required
query_id str

gab configuration table use case identifier.

required
query_column str

column to get as json.

required
Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
@classmethod
def get_json_column_as_dict(
    cls, lookup_query_builder: DataFrame, query_id: str, query_column: str
) -> dict:  # type: ignore
    """Get JSON column as dictionary.

    Args:
        lookup_query_builder: gab configuration data.
        query_id: gab configuration table use case identifier.
        query_column: column to get as json.
    """
    # Narrow the configuration data down to the requested use case/column.
    filtered_df = lookup_query_builder.filter(
        col("query_id") == lit(query_id)
    ).select(col(query_column))

    # Serialize the single matching row to a JSON string.
    serialized_row = filtered_df.select(
        to_json(struct([filtered_df[column] for column in filtered_df.columns]))
    ).collect()[0][0]

    row_as_dict = json.loads(serialized_row)

    # NOTE(review): with several values only the last one is kept — looks
    # like exactly one column value is expected here; confirm upstream.
    for raw_mapping in row_as_dict.values():
        parsed_mapping = ast.literal_eval(raw_mapping)

    return parsed_mapping  # type: ignore

get_reconciliation_cadences(cadence, selected_reconciliation_window, cadence_configuration_at_end_date, rerun_flag)

Get reconciliation cadences based on the use case configuration.

Parameters:

Name Type Description Default
cadence str

cadence to process.

required
selected_reconciliation_window dict

configured use case reconciliation window.

required
cadence_configuration_at_end_date dict

cadences to execute at the end date.

required
rerun_flag str

flag indicating if it's a rerun or a normal run.

required
Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
def get_reconciliation_cadences(
    self,
    cadence: str,
    selected_reconciliation_window: dict,
    cadence_configuration_at_end_date: dict,
    rerun_flag: str,
) -> dict:
    """Get reconciliation cadences based on the use case configuration.

    Args:
        cadence: cadence to process.
        selected_reconciliation_window: configured use case reconciliation window.
        cadence_configuration_at_end_date: cadences to execute at the end date.
        rerun_flag: flag indicating if it's a rerun or a normal run.
    """
    # Resolve the cadences configured for this snapshot, then narrow them
    # to the ones that must actually run for this execution.
    cadences_by_snapshot = self._get_configured_cadences_by_snapshot(
        cadence,
        selected_reconciliation_window,
        cadence_configuration_at_end_date,
    )
    cadences_to_execute = self._get_cadences_to_execute(
        cadences_by_snapshot,
        cadence,
        cadence_configuration_at_end_date,
        rerun_flag,
    )

    return cadences_to_execute

logger(run_start_time, run_end_time, start, end, query_id, query_label, cadence, stage_file_path, query, status, error_message, target_database)

Store the execution of each stage in the log events table.

Parameters:

Name Type Description Default
run_start_time datetime

execution start time.

required
run_end_time datetime

execution end time.

required
start str

use case start date.

required
end str

use case end date.

required
query_id str

gab configuration table use case identifier.

required
query_label str

gab configuration table use case name.

required
cadence str

cadence to process.

required
stage_file_path str

stage file path.

required
query str

query to execute.

required
status str

status of the query execution.

required
error_message Union[Exception, str]

error message if present.

required
target_database str

target database to write.

required
Source code in mkdocs/lakehouse_engine/packages/utils/gab_utils.py
def logger(
    self,
    run_start_time: datetime,
    run_end_time: datetime,
    start: str,
    end: str,
    query_id: str,
    query_label: str,
    cadence: str,
    stage_file_path: str,
    query: str,
    status: str,
    error_message: Union[Exception, str],
    target_database: str,
) -> None:
    """Store the execution of each stage in the log events table.

    Args:
        run_start_time: execution start time.
        run_end_time: execution end time.
        start: use case start date.
        end: use case end date.
        query_id: gab configuration table use case identifier.
        query_label: gab configuration table use case name.
        cadence: cadence to process.
        stage_file_path: stage file path.
        query: query to execute.
        status: status of the query execution.
        error_message: error message if present.
        target_database: target database to write.
    """
    # The INSERT is built by string formatting (values come from internal
    # execution state, hence the nosec); any embedded quote must still be
    # escaped or the generated SQL breaks.
    ins = """
    INSERT INTO {database}.gab_log_events
    VALUES (
        '{run_start_time}',
        '{run_end_time}',
        '{start}',
        '{end}',
        {query_id},
        '{query_label}',
        '{cadence}',
        '{stage_file_path}',
        '{query}',
        '{status}',
        '{error_message}'
    )""".format(  # nosec: B608
        database=target_database,
        run_start_time=run_start_time,
        run_end_time=run_end_time,
        start=start,
        end=end,
        query_id=query_id,
        query_label=query_label,
        cadence=cadence,
        stage_file_path=stage_file_path,
        query=self._escape_quote(query),
        status=status,
        # Escape unconditionally: previously only the "Failed" path was
        # escaped, so a quote in a non-failure message produced invalid SQL.
        error_message=self._escape_quote(str(error_message)),
    )

    ExecEnv.SESSION.sql(ins)