lakehouse_engine.core.gab_sql_generator
Module to define GAB SQL classes.
"""Module to define GAB SQL classes."""

import ast
import functools
import json
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit, struct, to_json

from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.utils.gab_utils import GABUtils
from lakehouse_engine.utils.logging_handler import LoggingHandler


def _execute_sql(func) -> Callable:  # type: ignore
    """Execute the SQL resulting from the decorated function.

    This function is protected to be used just in this module.
    It's used to decorate functions that return a SQL statement: when the
    returned statement is not empty it is executed on the active Spark session.

    Args:
        func: function that will return the sql to execute.
    """

    @functools.wraps(func)
    def inner(*args: Any) -> None:
        generated_sql = func(*args)
        if generated_sql:
            ExecEnv.SESSION.sql(generated_sql)

    return inner


class GABSQLGenerator(ABC):
    """Abstract class defining the behaviour of a GAB SQL Generator."""

    @abstractmethod
    def generate_sql(self) -> Optional[str]:
        """Define the generate sql command.

        E.g., the behaviour of gab generate sql inheriting from this.
        """
        pass


class GABInsertGenerator(GABSQLGenerator):
    """GAB insert generator.

    Creates the insert statement based on the dimensions and metrics provided in
    the configuration table.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        query_id: str,
        cadence: str,
        final_stage_table: str,
        lookup_query_builder: DataFrame,
        target_database: str,
        target_table: str,
    ):
        """Construct GABInsertGenerator instances.

        Args:
            query_id: gab configuration table use case identifier.
            cadence: inputted cadence to process.
            final_stage_table: stage view name.
            lookup_query_builder: gab configuration data.
            target_database: target database to write.
            target_table: target table to write.
        """
        self.query_id = query_id
        self.cadence = cadence
        self.final_stage_table = final_stage_table
        self.lookup_query_builder = lookup_query_builder
        self.target_database = target_database
        self.target_table = target_table

    def generate_sql(self) -> Optional[str]:
        """Generate insert sql statement to the insights table."""
        return self._insert_statement_generator()

    def _insert_statement_generator(self) -> str:
        """Generate GAB insert statement.

        Creates the insert statement based on the dimensions and metrics provided in
        the configuration table.

        Returns:
            The generated INSERT statement.

        Raises:
            ValueError: when no mappings are configured for the use case
                (previously this surfaced as an obscure UnboundLocalError).
        """
        result = GABUtils.get_json_column_as_dict(
            self.lookup_query_builder, self.query_id, "mappings"
        )

        if not result:
            raise ValueError(
                f"No mappings configured for query_id: {self.query_id}"
            )

        gen_ins = ""
        # NOTE: when multiple mappings are configured the statement of the last
        # one wins — behaviour kept from the original implementation.
        for mapping in result.values():
            joined_dimensions, joined_metrics = self._get_mapping_columns(
                mapping=mapping
            )
            gen_ins = f"""
            INSERT INTO {self.target_database}.{self.target_table}
            SELECT
                {self.query_id} as query_id,
                '{self.cadence}' as cadence,
                {joined_dimensions},
                {joined_metrics},
                current_timestamp() as lh_created_on
            FROM {self.final_stage_table}
            """  # nosec: B608

        return gen_ins

    @classmethod
    def _get_mapping_columns(cls, mapping: dict) -> tuple[str, str]:
        """Get mapping columns (dimensions and metrics) as joined strings.

        Args:
            mapping: use case mappings configuration.
        """
        dimensions_mapping = mapping["dimensions"]
        metrics_mapping = mapping["metric"]

        joined_dimensions = cls._join_extracted_column_with_filled_columns(
            columns=dimensions_mapping, is_dimension=True
        )
        joined_metrics = cls._join_extracted_column_with_filled_columns(
            columns=metrics_mapping, is_dimension=False
        )

        return joined_dimensions, joined_metrics

    @classmethod
    def _join_extracted_column_with_filled_columns(
        cls, columns: dict, is_dimension: bool
    ) -> str:
        """Join extracted columns with empty filled columns.

        Args:
            columns: use case columns and values.
            is_dimension: flag identifying if is a dimension or a metric.
        """
        extracted_columns_with_alias = (
            GABUtils.extract_columns_from_mapping(  # type: ignore
                columns=columns, is_dimension=is_dimension
            )
        )

        filled_columns = cls._fill_empty_columns(
            extracted_columns=extracted_columns_with_alias,  # type: ignore
            is_dimension=is_dimension,
        )

        joined_columns = [*extracted_columns_with_alias, *filled_columns]

        return ",".join(joined_columns)

    @classmethod
    def _fill_empty_columns(
        cls, extracted_columns: list[str], is_dimension: bool
    ) -> list[str]:
        """Fill empty columns as null.

        As the data is expected to have 40 columns we have to fill the unused columns.

        NOTE(review): dimensions start filling at ``len(extracted) - 1`` while
        metrics start at ``len(extracted) + 1`` — presumably tied to how aliases
        are numbered by ``GABUtils.extract_columns_from_mapping``; confirm
        before changing either bound.

        Args:
            extracted_columns: use case extracted columns.
            is_dimension: flag identifying if is a dimension or a metric.
        """
        filled_columns = []

        for ins in range(
            (
                len(extracted_columns) - 1
                if is_dimension
                else len(extracted_columns) + 1
            ),
            41,
        ):
            filled_columns.append(
                " null as {}{}".format("d" if is_dimension else "m", ins)
            )

        return filled_columns
class GABViewGenerator(GABSQLGenerator):
    """GAB view generator.

    Creates the use case view statement to be consumed.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        cadence_snapshot_status: dict,
        target_database: str,
        view_name: str,
        final_cols: str,
        target_table: str,
        dimensions_and_metrics_with_alias: str,
        dimensions: str,
        dimensions_and_metrics: str,
        final_calculated_script: str,
        query_id: str,
        view_filter: str,
        final_calculated_script_snapshot: str,
        without_snapshot_cadences: list[str],
        with_snapshot_cadences: list[str],
    ):
        """Construct GABViewGenerator instances.

        Args:
            cadence_snapshot_status: each cadence with the corresponding snapshot
                status.
            target_database: target database to write.
            view_name: name of the view to be generated.
            final_cols: columns to return in the view.
            target_table: target table to write.
            dimensions_and_metrics_with_alias: configured dimensions and metrics with
                alias to compute in the view.
            dimensions: use case configured dimensions.
            dimensions_and_metrics: use case configured dimensions and metrics.
            final_calculated_script: use case calculated metrics.
            query_id: gab configuration table use case identifier.
            view_filter: filter to add in the view.
            final_calculated_script_snapshot: use case calculated metrics with snapshot.
            without_snapshot_cadences: cadences without snapshot.
            with_snapshot_cadences: cadences with snapshot.
        """
        self.cadence_snapshot_status = cadence_snapshot_status
        self.target_database = target_database
        self.result_key = view_name
        self.final_cols = final_cols
        self.target_table = target_table
        self.dimensions_and_metrics_with_alias = dimensions_and_metrics_with_alias
        self.dimensions = dimensions
        self.dimensions_and_metrics = dimensions_and_metrics
        self.final_calculated_script = final_calculated_script
        self.query_id = query_id
        self.view_filter = view_filter
        self.final_calculated_script_snapshot = final_calculated_script_snapshot
        self.without_snapshot_cadences = without_snapshot_cadences
        self.with_snapshot_cadences = with_snapshot_cadences

    @_execute_sql
    def generate_sql(self) -> Optional[str]:
        """Generate use case view sql statement.

        The ``_execute_sql`` decorator executes the generated statement on the
        active Spark session, so callers receive ``None``.
        """
        return self._create_consumption_view()

    def _create_consumption_view(self) -> str:
        """Create consumption view statement."""
        final_view_query = self._generate_consumption_view_statement(
            self.cadence_snapshot_status,
            self.target_database,
            self.final_cols,
            self.target_table,
            self.dimensions_and_metrics_with_alias,
            self.dimensions,
            self.dimensions_and_metrics,
            self.final_calculated_script,
            self.query_id,
            self.view_filter,
            self.final_calculated_script_snapshot,
            without_snapshot_cadences=",".join(
                f'"{w}"' for w in self.without_snapshot_cadences
            ),
            with_snapshot_cadences=",".join(
                f'"{w}"' for w in self.with_snapshot_cadences
            ),
        )

        rendered_query = """
        CREATE OR REPLACE VIEW {database}.{view_name} AS {final_view_query}
        """.format(
            database=self.target_database,
            view_name=self.result_key,
            final_view_query=final_view_query,
        )
        self._LOGGER.info(f"Consumption view statement: {rendered_query}")
        return rendered_query

    @classmethod
    def _generate_consumption_view_statement(
        cls,
        cadence_snapshot_status: dict,
        target_database: str,
        final_cols: str,
        target_table: str,
        dimensions_and_metrics_with_alias: str,
        dimensions: str,
        dimensions_and_metrics: str,
        final_calculated_script: str,
        query_id: str,
        view_filter: str,
        final_calculated_script_snapshot: str,
        without_snapshot_cadences: str,
        with_snapshot_cadences: str,
    ) -> str:
        """Generate consumption view.

        Args:
            cadence_snapshot_status: cadences to execute with the information if it has
                snapshot.
            target_database: target database to write.
            final_cols: use case columns exposed in the consumption view.
            target_table: target table to write.
            dimensions_and_metrics_with_alias: dimensions and metrics as string columns
                with alias.
            dimensions: dimensions as string columns.
            dimensions_and_metrics: dimensions and metrics as string columns
                without alias.
            final_calculated_script: final calculated metrics script.
            query_id: gab configuration table use case identifier.
            view_filter: filter to execute on the view.
            final_calculated_script_snapshot: final calculated metrics with snapshot
                script.
            without_snapshot_cadences: cadences without snapshot.
            with_snapshot_cadences: cadences with snapshot.
        """
        cls._LOGGER.info("Generating consumption view statement...")
        cls._LOGGER.info(
            f"""
            {{
                target_database: {target_database},
                target_table: {target_table},
                query_id: {query_id},
                cadence_and_snapshot_status: {cadence_snapshot_status},
                cadences_without_snapshot: [{without_snapshot_cadences}],
                cadences_with_snapshot: [{with_snapshot_cadences}],
                final_cols: {final_cols},
                dimensions_and_metrics_with_alias: {dimensions_and_metrics_with_alias},
                dimensions: {dimensions},
                dimensions_with_metrics: {dimensions_and_metrics},
                final_calculated_script: {final_calculated_script},
                final_calculated_script_snapshot: {final_calculated_script_snapshot},
                view_filter: {view_filter}
            }}"""
        )
        # Three shapes: mixed snapshot/non-snapshot cadences, non-snapshot
        # only, and snapshot only.
        if (
            "Y" in cadence_snapshot_status.values()
            and "N" in cadence_snapshot_status.values()
        ):
            consumption_view_query = f"""
            WITH TEMP1 AS (
                SELECT
                    a.cadence,
                    {dimensions_and_metrics_with_alias}{final_calculated_script}
                FROM {target_database}.{target_table} a
                WHERE a.query_id = {query_id}
                AND cadence IN ({without_snapshot_cadences})
                {view_filter}
            ),
            TEMP_RN AS (
                SELECT
                    a.cadence,
                    a.from_date,
                    a.to_date,
                    {dimensions_and_metrics},
                    row_number() over(
                        PARTITION BY
                            a.cadence,
                            {dimensions},
                            a.from_date
                        order by to_date
                    ) as rn
                FROM {target_database}.{target_table} a
                WHERE a.query_id = {query_id}
                AND cadence IN ({with_snapshot_cadences})
                {view_filter}
            ),
            TEMP2 AS (
                SELECT
                    a.cadence,
                    {dimensions_and_metrics_with_alias}{final_calculated_script_snapshot}
                FROM TEMP_RN a
            ),
            TEMP3 AS (SELECT * FROM TEMP1 UNION SELECT * from TEMP2)
            SELECT {final_cols} FROM TEMP3
            """  # nosec: B608
        elif "N" in cadence_snapshot_status.values():
            consumption_view_query = f"""
            WITH TEMP1 AS (
                SELECT
                    a.cadence,
                    {dimensions_and_metrics_with_alias}{final_calculated_script}
                FROM {target_database}.{target_table} a
                WHERE a.query_id = {query_id}
                AND cadence IN ({without_snapshot_cadences}) {view_filter}
            )
            SELECT {final_cols} FROM TEMP1
            """  # nosec: B608
        else:
            # NOTE(review): unlike the mixed branch, this PARTITION BY lists
            # a.from_date twice and also partitions by a.to_date while ordering
            # by to_date — kept as-is; confirm whether intended.
            consumption_view_query = f"""
            WITH TEMP_RN AS (
                SELECT
                    a.cadence,
                    a.from_date,
                    a.to_date,
                    {dimensions_and_metrics},
                    row_number() over(
                        PARTITION BY
                            a.cadence,
                            a.from_date,
                            a.to_date,
                            {dimensions},
                            a.from_date
                        order by to_date) as rn
                FROM {target_database}.{target_table} a
                WHERE a.query_id = {query_id}
                AND cadence IN ({with_snapshot_cadences})
                {view_filter}
            ),
            TEMP2 AS (
                SELECT
                    a.cadence,
                    {dimensions_and_metrics_with_alias}{final_calculated_script_snapshot}
                FROM TEMP_RN a
            )
            SELECT {final_cols} FROM TEMP2
            """  # nosec: B608

        return consumption_view_query


class GABDeleteGenerator(GABSQLGenerator):
    """GAB delete generator.

    Creates the delete statement to clean the use case base data on the insights table.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        query_id: str,
        cadence: str,
        temp_stage_view_name: str,
        lookup_query_builder: DataFrame,
        target_database: str,
        target_table: str,
    ):
        """Construct GABDeleteGenerator instances.

        Args:
            query_id: gab configuration table use case identifier.
            cadence: inputted cadence to process.
            temp_stage_view_name: stage view name.
            lookup_query_builder: gab configuration data.
            target_database: target database to write.
            target_table: target table to write.
        """
        self.query_id = query_id
        self.cadence = cadence
        self.temp_stage_view_name = temp_stage_view_name
        self.lookup_query_builder = lookup_query_builder
        self.target_database = target_database
        self.target_table = target_table

    @_execute_sql
    def generate_sql(self) -> Optional[str]:
        """Generate delete sql statement.

        This statement is to clean the insights table for the corresponding use case.
        The ``_execute_sql`` decorator executes the generated statement on the
        active Spark session, so callers receive ``None``.
        """
        return self._delete_statement_generator()

    def _delete_statement_generator(self) -> str:
        """Generate the DELETE statement for the use case base data.

        Derives the configured ``from_date``/``to_date`` dimension columns from
        the use case mappings, computes their min/max boundaries over the stage
        view, and builds a DELETE restricted to the use case, cadence and those
        boundaries.

        Returns:
            The generated DELETE statement.

        Raises:
            ValueError: when no mappings are configured for the use case
                (previously this surfaced as a NameError).
        """
        df_filtered = self.lookup_query_builder.filter(
            col("query_id") == lit(self.query_id)
        )

        df_map = df_filtered.select(col("mappings"))
        view_df = df_map.select(
            to_json(struct([df_map[x] for x in df_map.columns]))
        ).collect()[0][0]
        line = json.loads(view_df)

        dim_from_date = None
        dim_to_date = None
        # NOTE: when multiple mappings are configured the last one wins —
        # behaviour kept from the original implementation.
        for line_v in line.values():
            result = ast.literal_eval(line_v)

            for mapping in result.values():
                dim_from_date = mapping["dimensions"]["from_date"]
                dim_to_date = mapping["dimensions"]["to_date"]

        if dim_from_date is None or dim_to_date is None:
            raise ValueError(
                f"No mappings configured for query_id: {self.query_id}"
            )

        self._LOGGER.info(f"temp stage view name: {self.temp_stage_view_name}")

        # Single scan over the stage view instead of four separate MIN/MAX
        # queries (same results, one job).
        (
            min_from_date,
            max_from_date,
            min_to_date,
            max_to_date,
        ) = ExecEnv.SESSION.sql(
            """
            SELECT
                MIN({from_date}) as min_from_date,
                MAX({from_date}) as max_from_date,
                MIN({to_date}) as min_to_date,
                MAX({to_date}) as max_to_date
            FROM {iter_stages}""".format(  # nosec: B608
                iter_stages=self.temp_stage_view_name,
                from_date=dim_from_date,
                to_date=dim_to_date,
            )
        ).collect()[0]

        gen_del = """
        DELETE FROM {target_database}.{target_table} a
        WHERE query_id = {query_id}
        AND cadence = '{cadence}'
        AND from_date BETWEEN '{min_from_date}' AND '{max_from_date}'
        AND to_date BETWEEN '{min_to_date}' AND '{max_to_date}'
        """.format(  # nosec: B608
            target_database=self.target_database,
            target_table=self.target_table,
            query_id=self.query_id,
            cadence=self.cadence,
            min_from_date=min_from_date,
            max_from_date=max_from_date,
            min_to_date=min_to_date,
            max_to_date=max_to_date,
        )

        return gen_del
class
GABSQLGenerator(abc.ABC):
35class GABSQLGenerator(ABC): 36 """Abstract class defining the behaviour of a GAB SQL Generator.""" 37 38 @abstractmethod 39 def generate_sql(self) -> Optional[str]: 40 """Define the generate sql command. 41 42 E.g., the behaviour of gab generate sql inheriting from this. 43 """ 44 pass
Abstract class defining the behaviour of a GAB SQL Generator.
@abstractmethod
def
generate_sql(self) -> Optional[str]:
38 @abstractmethod 39 def generate_sql(self) -> Optional[str]: 40 """Define the generate sql command. 41 42 E.g., the behaviour of gab generate sql inheriting from this. 43 """ 44 pass
Define the generate sql command.
E.g., the behaviour of gab generate sql inheriting from this.
47class GABInsertGenerator(GABSQLGenerator): 48 """GAB insert generator. 49 50 Creates the insert statement based on the dimensions and metrics provided in 51 the configuration table. 52 """ 53 54 _LOGGER = LoggingHandler(__name__).get_logger() 55 56 def __init__( 57 self, 58 query_id: str, 59 cadence: str, 60 final_stage_table: str, 61 lookup_query_builder: DataFrame, 62 target_database: str, 63 target_table: str, 64 ): 65 """Construct GABInsertGenerator instances. 66 67 Args: 68 query_id: gab configuration table use case identifier. 69 cadence: inputted cadence to process. 70 final_stage_table: stage view name. 71 lookup_query_builder: gab configuration data. 72 target_database: target database to write. 73 target_table: target table to write. 74 """ 75 self.query_id = query_id 76 self.cadence = cadence 77 self.final_stage_table = final_stage_table 78 self.lookup_query_builder = lookup_query_builder 79 self.target_database = target_database 80 self.target_table = target_table 81 82 def generate_sql(self) -> Optional[str]: 83 """Generate insert sql statement to the insights table.""" 84 insert_sql_statement = self._insert_statement_generator() 85 86 return insert_sql_statement 87 88 def _insert_statement_generator(self) -> str: 89 """Generate GAB insert statement. 90 91 Creates the insert statement based on the dimensions and metrics provided in 92 the configuration table. 
93 """ 94 result = GABUtils.get_json_column_as_dict( 95 self.lookup_query_builder, self.query_id, "mappings" 96 ) 97 98 for result_key in result.keys(): 99 joined_dimensions, joined_metrics = self._get_mapping_columns( 100 mapping=result[result_key] 101 ) 102 gen_ins = f""" 103 INSERT INTO {self.target_database}.{self.target_table} 104 SELECT 105 {self.query_id} as query_id, 106 '{self.cadence}' as cadence, 107 {joined_dimensions}, 108 {joined_metrics}, 109 current_timestamp() as lh_created_on 110 FROM {self.final_stage_table} 111 """ # nosec: B608 112 113 return gen_ins 114 115 @classmethod 116 def _get_mapping_columns(cls, mapping: dict) -> tuple[str, str]: 117 """Get mapping columns(dimensions and metrics) as joined string. 118 119 Args: 120 mapping: use case mappings configuration. 121 """ 122 dimensions_mapping = mapping["dimensions"] 123 metrics_mapping = mapping["metric"] 124 125 joined_dimensions = cls._join_extracted_column_with_filled_columns( 126 columns=dimensions_mapping, is_dimension=True 127 ) 128 joined_metrics = cls._join_extracted_column_with_filled_columns( 129 columns=metrics_mapping, is_dimension=False 130 ) 131 132 return joined_dimensions, joined_metrics 133 134 @classmethod 135 def _join_extracted_column_with_filled_columns( 136 cls, columns: dict, is_dimension: bool 137 ) -> str: 138 """Join extracted columns with empty filled columns. 139 140 Args: 141 columns: use case columns and values. 142 is_dimension: flag identifying if is a dimension or a metric. 
143 """ 144 extracted_columns_with_alias = ( 145 GABUtils.extract_columns_from_mapping( # type: ignore 146 columns=columns, is_dimension=is_dimension 147 ) 148 ) 149 150 filled_columns = cls._fill_empty_columns( 151 extracted_columns=extracted_columns_with_alias, # type: ignore 152 is_dimension=is_dimension, 153 ) 154 155 joined_columns = [*extracted_columns_with_alias, *filled_columns] 156 157 return ",".join(joined_columns) 158 159 @classmethod 160 def _fill_empty_columns( 161 cls, extracted_columns: list[str], is_dimension: bool 162 ) -> list[str]: 163 """Fill empty columns as null. 164 165 As the data is expected to have 40 columns we have to fill the unused columns. 166 167 Args: 168 extracted_columns: use case extracted columns. 169 is_dimension: flag identifying if is a dimension or a metric. 170 """ 171 filled_columns = [] 172 173 for ins in range( 174 ( 175 len(extracted_columns) - 1 176 if is_dimension 177 else len(extracted_columns) + 1 178 ), 179 41, 180 ): 181 filled_columns.append( 182 " null as {}{}".format("d" if is_dimension else "m", ins) 183 ) 184 185 return filled_columns
GAB insert generator.
Creates the insert statement based on the dimensions and metrics provided in the configuration table.
GABInsertGenerator( query_id: str, cadence: str, final_stage_table: str, lookup_query_builder: pyspark.sql.dataframe.DataFrame, target_database: str, target_table: str)
56 def __init__( 57 self, 58 query_id: str, 59 cadence: str, 60 final_stage_table: str, 61 lookup_query_builder: DataFrame, 62 target_database: str, 63 target_table: str, 64 ): 65 """Construct GABInsertGenerator instances. 66 67 Args: 68 query_id: gab configuration table use case identifier. 69 cadence: inputted cadence to process. 70 final_stage_table: stage view name. 71 lookup_query_builder: gab configuration data. 72 target_database: target database to write. 73 target_table: target table to write. 74 """ 75 self.query_id = query_id 76 self.cadence = cadence 77 self.final_stage_table = final_stage_table 78 self.lookup_query_builder = lookup_query_builder 79 self.target_database = target_database 80 self.target_table = target_table
Construct GABInsertGenerator instances.
Arguments:
- query_id: gab configuration table use case identifier.
- cadence: inputted cadence to process.
- final_stage_table: stage view name.
- lookup_query_builder: gab configuration data.
- target_database: target database to write.
- target_table: target table to write.
188class GABViewGenerator(GABSQLGenerator): 189 """GAB view generator. 190 191 Creates the use case view statement to be consumed. 192 """ 193 194 _LOGGER = LoggingHandler(__name__).get_logger() 195 196 def __init__( 197 self, 198 cadence_snapshot_status: dict, 199 target_database: str, 200 view_name: str, 201 final_cols: str, 202 target_table: str, 203 dimensions_and_metrics_with_alias: str, 204 dimensions: str, 205 dimensions_and_metrics: str, 206 final_calculated_script: str, 207 query_id: str, 208 view_filter: str, 209 final_calculated_script_snapshot: str, 210 without_snapshot_cadences: list[str], 211 with_snapshot_cadences: list[str], 212 ): 213 """Construct GABViewGenerator instances. 214 215 Args: 216 cadence_snapshot_status: each cadence with the corresponding snapshot 217 status. 218 target_database: target database to write. 219 view_name: name of the view to be generated. 220 final_cols: columns to return in the view. 221 target_table: target table to write. 222 dimensions_and_metrics_with_alias: configured dimensions and metrics with 223 alias to compute in the view. 224 dimensions: use case configured dimensions. 225 dimensions_and_metrics: use case configured dimensions and metrics. 226 final_calculated_script: use case calculated metrics. 227 query_id: gab configuration table use case identifier. 228 view_filter: filter to add in the view. 229 final_calculated_script_snapshot: use case calculated metrics with snapshot. 230 without_snapshot_cadences: cadences without snapshot. 231 with_snapshot_cadences: cadences with snapshot. 
232 """ 233 self.cadence_snapshot_status = cadence_snapshot_status 234 self.target_database = target_database 235 self.result_key = view_name 236 self.final_cols = final_cols 237 self.target_table = target_table 238 self.dimensions_and_metrics_with_alias = dimensions_and_metrics_with_alias 239 self.dimensions = dimensions 240 self.dimensions_and_metrics = dimensions_and_metrics 241 self.final_calculated_script = final_calculated_script 242 self.query_id = query_id 243 self.view_filter = view_filter 244 self.final_calculated_script_snapshot = final_calculated_script_snapshot 245 self.without_snapshot_cadences = without_snapshot_cadences 246 self.with_snapshot_cadences = with_snapshot_cadences 247 248 @_execute_sql 249 def generate_sql(self) -> Optional[str]: 250 """Generate use case view sql statement.""" 251 consumption_view_sql = self._create_consumption_view() 252 253 return consumption_view_sql 254 255 def _create_consumption_view(self) -> str: 256 """Create consumption view.""" 257 final_view_query = self._generate_consumption_view_statement( 258 self.cadence_snapshot_status, 259 self.target_database, 260 self.final_cols, 261 self.target_table, 262 self.dimensions_and_metrics_with_alias, 263 self.dimensions, 264 self.dimensions_and_metrics, 265 self.final_calculated_script, 266 self.query_id, 267 self.view_filter, 268 self.final_calculated_script_snapshot, 269 without_snapshot_cadences=",".join( 270 f'"{w}"' for w in self.without_snapshot_cadences 271 ), 272 with_snapshot_cadences=",".join( 273 f'"{w}"' for w in self.with_snapshot_cadences 274 ), 275 ) 276 277 rendered_query = """ 278 CREATE OR REPLACE VIEW {database}.{view_name} AS {final_view_query} 279 """.format( 280 database=self.target_database, 281 view_name=self.result_key, 282 final_view_query=final_view_query, 283 ) 284 self._LOGGER.info(f"Consumption view statement: {rendered_query}") 285 return rendered_query 286 287 @classmethod 288 def _generate_consumption_view_statement( 289 cls, 290 
cadence_snapshot_status: dict, 291 target_database: str, 292 final_cols: str, 293 target_table: str, 294 dimensions_and_metrics_with_alias: str, 295 dimensions: str, 296 dimensions_and_metrics: str, 297 final_calculated_script: str, 298 query_id: str, 299 view_filter: str, 300 final_calculated_script_snapshot: str, 301 without_snapshot_cadences: str, 302 with_snapshot_cadences: str, 303 ) -> str: 304 """Generate consumption view. 305 306 Args: 307 cadence_snapshot_status: cadences to execute with the information if it has 308 snapshot. 309 target_database: target database to write. 310 final_cols: use case columns exposed in the consumption view. 311 target_table: target table to write. 312 dimensions_and_metrics_with_alias: dimensions and metrics as string columns 313 with alias. 314 dimensions: dimensions as string columns. 315 dimensions_and_metrics: dimensions and metrics as string columns 316 without alias. 317 final_calculated_script: final calculated metrics script. 318 query_id: gab configuration table use case identifier. 319 view_filter: filter to execute on the view. 320 final_calculated_script_snapshot: final calculated metrics with snapshot 321 script. 322 without_snapshot_cadences: cadences without snapshot. 323 with_snapshot_cadences: cadences with snapshot. 
324 """ 325 cls._LOGGER.info("Generating consumption view statement...") 326 cls._LOGGER.info( 327 f""" 328 {{ 329 target_database: {target_database}, 330 target_table: {target_table}, 331 query_id: {query_id}, 332 cadence_and_snapshot_status: {cadence_snapshot_status}, 333 cadences_without_snapshot: [{without_snapshot_cadences}], 334 cadences_with_snapshot: [{with_snapshot_cadences}], 335 final_cols: {final_cols}, 336 dimensions_and_metrics_with_alias: {dimensions_and_metrics_with_alias}, 337 dimensions: {dimensions}, 338 dimensions_with_metrics: {dimensions_and_metrics}, 339 final_calculated_script: {final_calculated_script}, 340 final_calculated_script_snapshot: {final_calculated_script_snapshot}, 341 view_filter: {view_filter} 342 }}""" 343 ) 344 if ( 345 "Y" in cadence_snapshot_status.values() 346 and "N" in cadence_snapshot_status.values() 347 ): 348 consumption_view_query = f""" 349 WITH TEMP1 AS ( 350 SELECT 351 a.cadence, 352 {dimensions_and_metrics_with_alias}{final_calculated_script} 353 FROM {target_database}.{target_table} a 354 WHERE a.query_id = {query_id} 355 AND cadence IN ({without_snapshot_cadences}) 356 {view_filter} 357 ), 358 TEMP_RN AS ( 359 SELECT 360 a.cadence, 361 a.from_date, 362 a.to_date, 363 {dimensions_and_metrics}, 364 row_number() over( 365 PARTITION BY 366 a.cadence, 367 {dimensions}, 368 a.from_date 369 order by to_date 370 ) as rn 371 FROM {target_database}.{target_table} a 372 WHERE a.query_id = {query_id} 373 AND cadence IN ({with_snapshot_cadences}) 374 {view_filter} 375 ), 376 TEMP2 AS ( 377 SELECT 378 a.cadence, 379 {dimensions_and_metrics_with_alias}{final_calculated_script_snapshot} 380 FROM TEMP_RN a 381 ), 382 TEMP3 AS (SELECT * FROM TEMP1 UNION SELECT * from TEMP2) 383 SELECT {final_cols} FROM TEMP3 384 """ # nosec: B608 385 elif "N" in cadence_snapshot_status.values(): 386 consumption_view_query = f""" 387 WITH TEMP1 AS ( 388 SELECT 389 a.cadence, 390 {dimensions_and_metrics_with_alias}{final_calculated_script} 391 
FROM {target_database}.{target_table} a 392 WHERE a.query_id = {query_id} 393 AND cadence IN ({without_snapshot_cadences}) {view_filter} 394 ) 395 SELECT {final_cols} FROM TEMP1 396 """ # nosec: B608 397 else: 398 consumption_view_query = f""" 399 WITH TEMP_RN AS ( 400 SELECT 401 a.cadence, 402 a.from_date, 403 a.to_date, 404 {dimensions_and_metrics}, 405 row_number() over( 406 PARTITION BY 407 a.cadence, 408 a.from_date, 409 a.to_date, 410 {dimensions}, 411 a.from_date 412 order by to_date) as rn 413 FROM {target_database}.{target_table} a 414 WHERE a.query_id = {query_id} 415 AND cadence IN ({with_snapshot_cadences}) 416 {view_filter} 417 ), 418 TEMP2 AS ( 419 SELECT 420 a.cadence, 421 {dimensions_and_metrics_with_alias}{final_calculated_script_snapshot} 422 FROM TEMP_RN a 423 ) 424 SELECT {final_cols} FROM TEMP2 425 """ # nosec: B608 426 427 return consumption_view_query
GAB view generator.
Creates the use case view statement to be consumed.
GABViewGenerator( cadence_snapshot_status: dict, target_database: str, view_name: str, final_cols: str, target_table: str, dimensions_and_metrics_with_alias: str, dimensions: str, dimensions_and_metrics: str, final_calculated_script: str, query_id: str, view_filter: str, final_calculated_script_snapshot: str, without_snapshot_cadences: list[str], with_snapshot_cadences: list[str])
196 def __init__( 197 self, 198 cadence_snapshot_status: dict, 199 target_database: str, 200 view_name: str, 201 final_cols: str, 202 target_table: str, 203 dimensions_and_metrics_with_alias: str, 204 dimensions: str, 205 dimensions_and_metrics: str, 206 final_calculated_script: str, 207 query_id: str, 208 view_filter: str, 209 final_calculated_script_snapshot: str, 210 without_snapshot_cadences: list[str], 211 with_snapshot_cadences: list[str], 212 ): 213 """Construct GABViewGenerator instances. 214 215 Args: 216 cadence_snapshot_status: each cadence with the corresponding snapshot 217 status. 218 target_database: target database to write. 219 view_name: name of the view to be generated. 220 final_cols: columns to return in the view. 221 target_table: target table to write. 222 dimensions_and_metrics_with_alias: configured dimensions and metrics with 223 alias to compute in the view. 224 dimensions: use case configured dimensions. 225 dimensions_and_metrics: use case configured dimensions and metrics. 226 final_calculated_script: use case calculated metrics. 227 query_id: gab configuration table use case identifier. 228 view_filter: filter to add in the view. 229 final_calculated_script_snapshot: use case calculated metrics with snapshot. 230 without_snapshot_cadences: cadences without snapshot. 231 with_snapshot_cadences: cadences with snapshot. 
232 """ 233 self.cadence_snapshot_status = cadence_snapshot_status 234 self.target_database = target_database 235 self.result_key = view_name 236 self.final_cols = final_cols 237 self.target_table = target_table 238 self.dimensions_and_metrics_with_alias = dimensions_and_metrics_with_alias 239 self.dimensions = dimensions 240 self.dimensions_and_metrics = dimensions_and_metrics 241 self.final_calculated_script = final_calculated_script 242 self.query_id = query_id 243 self.view_filter = view_filter 244 self.final_calculated_script_snapshot = final_calculated_script_snapshot 245 self.without_snapshot_cadences = without_snapshot_cadences 246 self.with_snapshot_cadences = with_snapshot_cadences
Construct GABViewGenerator instances.
Arguments:
- cadence_snapshot_status: each cadence with the corresponding snapshot status.
- target_database: target database to write.
- view_name: name of the view to be generated.
- final_cols: columns to return in the view.
- target_table: target table to write.
- dimensions_and_metrics_with_alias: configured dimensions and metrics with alias to compute in the view.
- dimensions: use case configured dimensions.
- dimensions_and_metrics: use case configured dimensions and metrics.
- final_calculated_script: use case calculated metrics.
- query_id: gab configuration table use case identifier.
- view_filter: filter to add in the view.
- final_calculated_script_snapshot: use case calculated metrics with snapshot.
- without_snapshot_cadences: cadences without snapshot.
- with_snapshot_cadences: cadences with snapshot.
class GABDeleteGenerator(GABSQLGenerator):
    """GAB delete generator.

    Creates the delete statement to clean the use case base data on the insights table.
    """

    _LOGGER = LoggingHandler(__name__).get_logger()

    def __init__(
        self,
        query_id: str,
        cadence: str,
        temp_stage_view_name: str,
        lookup_query_builder: DataFrame,
        target_database: str,
        target_table: str,
    ):
        """Construct GABDeleteGenerator instances.

        Args:
            query_id: gab configuration table use case identifier.
            cadence: inputted cadence to process.
            temp_stage_view_name: stage view name.
            lookup_query_builder: gab configuration data.
            target_database: target database to write.
            target_table: target table to write.
        """
        self.query_id = query_id
        self.cadence = cadence
        self.temp_stage_view_name = temp_stage_view_name
        self.lookup_query_builder = lookup_query_builder
        self.target_database = target_database
        self.target_table = target_table

    @_execute_sql
    def generate_sql(self) -> Optional[str]:
        """Generate delete sql statement.

        This statement is to clean the insights table for the corresponding use case.
        The `_execute_sql` decorator runs the returned statement on the session.
        """
        delete_sql_statement = self._delete_statement_generator()

        return delete_sql_statement

    def _delete_statement_generator(self) -> str:
        """Build the DELETE statement bounded by the stage view's date window.

        Reads the use case `mappings` from the configuration data to discover the
        configured from_date/to_date dimension column names, computes their min/max
        over the temp stage view, and generates a DELETE restricted to that window.

        Returns:
            The DELETE SQL statement for the use case base data.
        """
        # Narrow the configuration data to the current use case.
        df_filtered = self.lookup_query_builder.filter(
            col("query_id") == lit(self.query_id)
        )

        # Serialize the `mappings` column to JSON so it can be parsed locally.
        df_map = df_filtered.select(col("mappings"))
        view_df = df_map.select(
            to_json(struct([df_map[x] for x in df_map.columns]))
        ).collect()[0][0]
        line = json.loads(view_df)

        # NOTE(review): only the values from the last iteration of these loops are
        # used below — preserved from the original logic, which appears to assume a
        # single mapping entry per use case. TODO confirm.
        for line_v in line.values():
            result = ast.literal_eval(line_v)

        for result_key in result.keys():
            result_new = result[result_key]
            dim_from_date = result_new["dimensions"]["from_date"]
            dim_to_date = result_new["dimensions"]["to_date"]

        self._LOGGER.info(f"temp stage view name: {self.temp_stage_view_name}")

        # Compute all four bounds in a single scan of the stage view instead of
        # issuing four separate aggregate queries.
        min_from_date, max_from_date, min_to_date, max_to_date = ExecEnv.SESSION.sql(
            """
            SELECT
                MIN({from_date}) as min_from_date,
                MAX({from_date}) as max_from_date,
                MIN({to_date}) as min_to_date,
                MAX({to_date}) as max_to_date
            FROM {iter_stages}""".format(  # nosec: B608
                iter_stages=self.temp_stage_view_name,
                from_date=dim_from_date,
                to_date=dim_to_date,
            )
        ).collect()[0]

        gen_del = """
            DELETE FROM {target_database}.{target_table} a
            WHERE query_id = {query_id}
            AND cadence = '{cadence}'
            AND from_date BETWEEN '{min_from_date}' AND '{max_from_date}'
            AND to_date BETWEEN '{min_to_date}' AND '{max_to_date}'
        """.format(  # nosec: B608
            target_database=self.target_database,
            target_table=self.target_table,
            query_id=self.query_id,
            cadence=self.cadence,
            min_from_date=min_from_date,
            max_from_date=max_from_date,
            min_to_date=min_to_date,
            max_to_date=max_to_date,
        )

        return gen_del
GAB delete generator.
Creates the delete statement to clean the use case base data on the insights table.
GABDeleteGenerator( query_id: str, cadence: str, temp_stage_view_name: str, lookup_query_builder: pyspark.sql.dataframe.DataFrame, target_database: str, target_table: str)
438 def __init__( 439 self, 440 query_id: str, 441 cadence: str, 442 temp_stage_view_name: str, 443 lookup_query_builder: DataFrame, 444 target_database: str, 445 target_table: str, 446 ): 447 """Construct GABDeleteGenerator instances. 448 449 Args: 450 query_id: gab configuration table use case identifier. 451 cadence: inputted cadence to process. 452 temp_stage_view_name: stage view name. 453 lookup_query_builder: gab configuration data. 454 target_database: target database to write. 455 target_table: target table to write. 456 """ 457 self.query_id = query_id 458 self.cadence = cadence 459 self.temp_stage_view_name = temp_stage_view_name 460 self.lookup_query_builder = lookup_query_builder 461 self.target_database = target_database 462 self.target_table = target_table
Construct GABDeleteGenerator instances.
Arguments:
- query_id: gab configuration table use case identifier.
- cadence: inputted cadence to process.
- temp_stage_view_name: stage view name.
- lookup_query_builder: gab configuration data.
- target_database: target database to write.
- target_table: target table to write.