lakehouse_engine.utils.extraction.sap_bw_extraction_utils
Utilities module for SAP BW extraction processes.
"""Utilities module for SAP BW extraction processes."""

from copy import copy
from dataclasses import dataclass
from logging import Logger
from typing import Optional, Tuple

from lakehouse_engine.core.definitions import InputFormat, InputSpec, ReadType
from lakehouse_engine.transformers.aggregators import Aggregators
from lakehouse_engine.utils.extraction.jdbc_extraction_utils import (
    JDBCExtraction,
    JDBCExtractionType,
    JDBCExtractionUtils,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler


@dataclass
class SAPBWExtraction(JDBCExtraction):
    """Configurations available for an Extraction from SAP BW.

    It inherits from JDBCExtraction configurations, so it can use
    and/or overwrite those configurations.

    These configurations cover:
    - latest_timestamp_input_col: the column containing the actrequest timestamp
        in the dataset in latest_timestamp_data_location. Default:
        "actrequest_timestamp".
    - act_request_table: the name of the SAP BW activation requests table.
        Composed of database.table. Default: SAPPHA.RSODSACTREQ.
    - request_col_name: name of the column having the request to join
        with the activation request table. Default: actrequest.
    - act_req_join_condition: the join condition into activation table
        can be changed using this property.
        Default: 'changelog_tbl.request = act_req.request_col_name'.
    - odsobject: name of BW Object, used for joining with the activation request
        table to get the max actrequest_timestamp to consider while filtering
        the changelog table.
    - include_changelog_tech_cols: whether to include the technical columns
        (usually coming from the changelog table) or not. Default: True.
    - extra_cols_act_request: list of columns to be added from act request table.
        It needs to contain the prefix "act_req.". E.g. "act_req.col1
        as column_one, act_req.col2 as column_two".
    - get_timestamp_from_act_request: whether to get init timestamp
        from act request table or assume current/given timestamp.
    - sap_bw_schema: sap bw schema. Default: SAPPHA.
    - max_timestamp_custom_schema: the custom schema to apply on the calculation of
        the max timestamp to consider for the delta extractions.
        Default: timestamp DECIMAL(15,0).
    - default_max_timestamp: the timestamp to use as default, when it is not possible
        to derive one.
    """

    latest_timestamp_input_col: str = "actrequest_timestamp"
    act_request_table: str = "SAPPHA.RSODSACTREQ"
    request_col_name: str = "actrequest"
    act_req_join_condition: Optional[str] = None
    odsobject: Optional[str] = None
    include_changelog_tech_cols: bool = True
    extra_cols_act_request: Optional[str] = None
    get_timestamp_from_act_request: bool = False
    sap_bw_schema: str = "SAPPHA"
    max_timestamp_custom_schema: str = "timestamp DECIMAL(15,0)"
    default_max_timestamp: str = "197000000000000"


class SAPBWExtractionUtils(JDBCExtractionUtils):
    """Utils for managing data extraction from SAP BW systems over JDBC."""

    def __init__(self, sap_bw_extraction: SAPBWExtraction):
        """Construct SAPBWExtractionUtils.

        The changelog table is resolved here (possibly via a JDBC lookup) and
        written back into the given configuration object.

        Args:
            sap_bw_extraction: SAP BW Extraction configurations.
        """
        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
        self._BW_EXTRACTION = sap_bw_extraction
        self._BW_EXTRACTION.changelog_table = self.get_changelog_table()
        self._MAX_TIMESTAMP_QUERY = f""" --# nosec
                (SELECT COALESCE(MAX(timestamp),
                    {self._BW_EXTRACTION.default_max_timestamp}) as timestamp
                FROM {self._BW_EXTRACTION.act_request_table}
                WHERE odsobject = '{self._BW_EXTRACTION.odsobject}'
                 AND operation = 'A' AND status = '0')
            """  # nosec: B608
        super().__init__(sap_bw_extraction)

    def get_changelog_table(self) -> str:
        """Get the changelog table, given an odsobject.

        When an odsobject is configured, no changelog_table was provided and the
        extraction is not an init, the changelog table name is looked up in the
        SAP BW RSTSODS system table over JDBC. Otherwise, the provided
        changelog_table is used, falling back to "<dbtable>_cl".

        Returns:
            String to use as changelog_table.

        Raises:
            ValueError: if the RSTSODS lookup returns no changelog table.
        """
        if (
            self._BW_EXTRACTION.odsobject is not None
            and self._BW_EXTRACTION.changelog_table is None
            and self._BW_EXTRACTION.extraction_type != JDBCExtractionType.INIT.value
        ):
            # "_" is a single-char wildcard in LIKE, so escape it with "$".
            escaped_odsobject = copy(self._BW_EXTRACTION.odsobject).replace("_", "$_")

            if self._BW_EXTRACTION.sap_bw_schema:
                system_table = f"{self._BW_EXTRACTION.sap_bw_schema}.RSTSODS"
            else:
                system_table = "RSTSODS"

            jdbc_args = {
                "url": self._BW_EXTRACTION.url,
                "table": f""" -- # nosec
                    (SELECT ODSNAME_TECH
                    FROM {system_table}
                    WHERE ODSNAME LIKE '8{escaped_odsobject}$_%'
                     ESCAPE '$' AND USERAPP = 'CHANGELOG' AND VERSION = '000')
                """,  # nosec: B608
                "properties": {
                    "user": self._BW_EXTRACTION.user,
                    "password": self._BW_EXTRACTION.password,
                    "driver": self._BW_EXTRACTION.driver,
                },
            }
            from lakehouse_engine.io.reader_factory import ReaderFactory

            # Fetch the first row only once: each .first() call is a separate
            # Spark action, i.e. a separate JDBC round-trip.
            changelog_row = ReaderFactory.get_data(
                InputSpec(
                    spec_id="changelog_table",
                    data_format=InputFormat.JDBC.value,
                    read_type=ReadType.BATCH.value,
                    jdbc_args=jdbc_args,
                )
            ).first()

            if changelog_row is None:
                raise ValueError(
                    "Not able to derive the changelog table for odsobject "
                    f"'{self._BW_EXTRACTION.odsobject}' from '{system_table}'. "
                    "Please provide the changelog_table option explicitly."
                )

            changelog_table = (
                f'{self._BW_EXTRACTION.sap_bw_schema}."{changelog_row[0]}"'
                if self._BW_EXTRACTION.sap_bw_schema
                else str(changelog_row[0])
            )
        else:
            changelog_table = (
                self._BW_EXTRACTION.changelog_table
                if self._BW_EXTRACTION.changelog_table
                else f"{self._BW_EXTRACTION.dbtable}_cl"
            )
        self._LOGGER.info(f"The changelog table derived is: '{changelog_table}'")

        return changelog_table

    @staticmethod
    def get_odsobject(input_spec_opt: dict) -> str:
        """Get the odsobject based on the provided options.

        The dbtable option may include the database name (e.g. database.table),
        so it is split to keep only the table part. Users may still need to
        specify odsobject explicitly when it differs from the dbtable.

        Args:
            input_spec_opt: options from the input_spec.

        Returns:
            A string with the odsobject.
        """
        return str(
            input_spec_opt["dbtable"].split(".")[1]
            if len(input_spec_opt["dbtable"].split(".")) > 1
            else input_spec_opt["dbtable"]
        )

    def _get_init_query(self) -> Tuple[str, str]:
        """Get a query to do an init load based on a DSO on a SAP BW system.

        Returns:
            A tuple with the extraction query and the predicates query. The
            extraction query is enclosed in parentheses so that Spark treats it
            as a table and supports it in the dbtable option.

        Raises:
            AttributeError: if get_timestamp_from_act_request is set but the DSO
                is not Write Optimised (dbtable differs from changelog_table).
        """
        if self._BW_EXTRACTION.get_timestamp_from_act_request:
            # check if we are dealing with a DSO of type Write Optimised
            if self._BW_EXTRACTION.dbtable == self._BW_EXTRACTION.changelog_table:
                extraction_query = self._get_init_extraction_query_act_req_timestamp()
            else:
                raise AttributeError(
                    "Not able to get the extraction query. The option "
                    "'get_timestamp_from_act_request' is only "
                    "available/useful for DSOs of type Write Optimised."
                )
        else:
            extraction_query = self._get_init_extraction_query()

        predicates_query = f"""
        (SELECT DISTINCT({self._BW_EXTRACTION.partition_column})
        FROM {self._BW_EXTRACTION.dbtable} t)
        """  # nosec: B608

        return extraction_query, predicates_query

    def _get_init_extraction_query(self) -> str:
        """Get extraction query based on given/current timestamp.

        Returns:
            A query to submit to SAP BW for the initial data extraction.
        """
        changelog_tech_cols = (
            f"""'0' AS request,
                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15, 0))
                 AS actrequest_timestamp,
                '0' AS datapakid,
                0 AS partno,
                0 AS record,"""
            if self._BW_EXTRACTION.include_changelog_tech_cols
            else f"CAST({self._BW_EXTRACTION.extraction_timestamp} "
            f"AS DECIMAL(15, 0))"
            f" AS actrequest_timestamp,"
        )

        extraction_query = f"""
                (SELECT t.*,
                    {changelog_tech_cols}
                    CAST({self._BW_EXTRACTION.extraction_timestamp}
                        AS DECIMAL(15, 0)) AS extraction_start_timestamp
                FROM {self._BW_EXTRACTION.dbtable} t
                )"""  # nosec: B608

        return extraction_query

    def _get_init_extraction_query_act_req_timestamp(self) -> str:
        """Get extraction query assuming the init timestamp from act_request table.

        Returns:
            A query to submit to SAP BW for the initial data extraction from
            write optimised DSOs, receiving the actrequest_timestamp from
            the activation requests table.
        """
        extraction_query = f"""
            (SELECT t.*,
                act_req.timestamp as actrequest_timestamp,
                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15, 0))
                 AS extraction_start_timestamp
            FROM {self._BW_EXTRACTION.dbtable} t
            JOIN {self._BW_EXTRACTION.act_request_table} AS act_req ON
                t.request = act_req.{self._BW_EXTRACTION.request_col_name}
            WHERE act_req.odsobject = '{self._BW_EXTRACTION.odsobject}'
                AND operation = 'A' AND status = '0'
            )"""  # nosec: B608

        return extraction_query

    def _get_delta_query(self) -> Tuple[str, str]:
        """Get a delta query for an SAP BW DSO.

        An SAP BW DSO requires a join with a special type of table often called
        activation requests table, in which BW tracks down the timestamps associated
        with the several data loads that were performed into BW. Because the changelog
        table only contains the active request id, and that cannot be sorted by the
        downstream consumers to figure out the latest change, we need to join the
        changelog table with this special table to get the activation requests
        timestamps to then use them to figure out the latest changes in the delta load
        logic afterwards.

        Additionally, we also need to know which was the latest timestamp already loaded
        into the lakehouse bronze layer. The latest timestamp should always be available
        in the bronze dataset itself or in a dataset that tracks down all the actrequest
        timestamps that were already loaded. So we get the max value out of the
        respective actrequest timestamp column in that dataset.

        Returns:
            A tuple with the extraction query and the predicates query. The
            extraction query is enclosed in parentheses so that Spark treats it
            as a table and supports it in the dbtable option.

        Raises:
            ValueError: if no min_timestamp is given and none can be derived from
                the latest timestamp dataset (e.g. it is empty).
        """
        if not self._BW_EXTRACTION.min_timestamp:
            from lakehouse_engine.io.reader_factory import ReaderFactory

            latest_timestamp_data_df = ReaderFactory.get_data(
                InputSpec(
                    spec_id="data_with_latest_timestamp",
                    data_format=self._BW_EXTRACTION.latest_timestamp_data_format,
                    read_type=ReadType.BATCH.value,
                    location=self._BW_EXTRACTION.latest_timestamp_data_location,
                )
            )
            min_timestamp = latest_timestamp_data_df.transform(
                Aggregators.get_max_value(
                    self._BW_EXTRACTION.latest_timestamp_input_col
                )
            ).first()[0]
            # An empty/all-null dataset yields None, which would otherwise be
            # rendered into the SQL as the literal "None".
            if min_timestamp is None:
                raise ValueError(
                    "Not able to derive the min timestamp for the delta extraction "
                    "from column "
                    f"'{self._BW_EXTRACTION.latest_timestamp_input_col}' in "
                    f"'{self._BW_EXTRACTION.latest_timestamp_data_location}'. "
                    "Please provide min_timestamp or perform an init extraction."
                )
        else:
            min_timestamp = self._BW_EXTRACTION.min_timestamp

        max_timestamp = (
            self._BW_EXTRACTION.max_timestamp
            if self._BW_EXTRACTION.max_timestamp
            else self._get_max_timestamp(self._MAX_TIMESTAMP_QUERY)
        )

        if self._BW_EXTRACTION.act_req_join_condition:
            join_condition = f"{self._BW_EXTRACTION.act_req_join_condition}"
        else:
            join_condition = (
                f"changelog_tbl.request = "
                f"act_req.{self._BW_EXTRACTION.request_col_name}"
            )

        base_query = f""" --# nosec
        FROM {self._BW_EXTRACTION.changelog_table} AS changelog_tbl
        JOIN {self._BW_EXTRACTION.act_request_table} AS act_req
            ON {join_condition}
        WHERE act_req.odsobject = '{self._BW_EXTRACTION.odsobject}'
            AND act_req.timestamp > {min_timestamp}
            AND act_req.timestamp <= {max_timestamp}
            AND operation = 'A' AND status = '0')
        """

        main_cols = f"""
            (SELECT changelog_tbl.*,
                act_req.TIMESTAMP AS actrequest_timestamp,
                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15,0))
                 AS extraction_start_timestamp
            """
        # We join the main columns considered for the extraction with
        # extra_cols_act_request that people might want to use, filtering to only
        # add the comma and join the strings, in case extra_cols_act_request is
        # not None or empty.
        extraction_query_cols = ",".join(
            filter(None, [main_cols, self._BW_EXTRACTION.extra_cols_act_request])
        )

        extraction_query = extraction_query_cols + base_query

        predicates_query = f"""
        (SELECT DISTINCT({self._BW_EXTRACTION.partition_column})
        {base_query}
        """

        return extraction_query, predicates_query
@dataclass
class SAPBWExtraction(JDBCExtraction):
    """Configurations available for an Extraction from SAP BW.

    It inherits from JDBCExtraction configurations, so it can use
    and/or overwrite those configurations.

    These configurations cover:
    - latest_timestamp_input_col: the column containing the actrequest timestamp
        in the dataset in latest_timestamp_data_location. Default:
        "actrequest_timestamp".
    - act_request_table: the name of the SAP BW activation requests table.
        Composed of database.table. Default: SAPPHA.RSODSACTREQ.
    - request_col_name: name of the column having the request to join
        with the activation request table. Default: actrequest.
    - act_req_join_condition: the join condition into activation table
        can be changed using this property.
        Default: 'changelog_tbl.request = act_req.request_col_name'.
    - odsobject: name of BW Object, used for joining with the activation request
        table to get the max actrequest_timestamp to consider while filtering
        the changelog table.
    - include_changelog_tech_cols: whether to include the technical columns
        (usually coming from the changelog table) or not. Default: True.
    - extra_cols_act_request: list of columns to be added from act request table.
        It needs to contain the prefix "act_req.". E.g. "act_req.col1
        as column_one, act_req.col2 as column_two".
    - get_timestamp_from_act_request: whether to get init timestamp
        from act request table or assume current/given timestamp.
    - sap_bw_schema: sap bw schema. Default: SAPPHA.
    - max_timestamp_custom_schema: the custom schema to apply on the calculation of
        the max timestamp to consider for the delta extractions.
        Default: timestamp DECIMAL(15,0).
    - default_max_timestamp: the timestamp to use as default, when it is not possible
        to derive one.
    """

    latest_timestamp_input_col: str = "actrequest_timestamp"
    act_request_table: str = "SAPPHA.RSODSACTREQ"
    request_col_name: str = "actrequest"
    act_req_join_condition: Optional[str] = None
    odsobject: Optional[str] = None
    include_changelog_tech_cols: bool = True
    extra_cols_act_request: Optional[str] = None
    get_timestamp_from_act_request: bool = False
    sap_bw_schema: str = "SAPPHA"
    max_timestamp_custom_schema: str = "timestamp DECIMAL(15,0)"
    default_max_timestamp: str = "197000000000000"
Configurations available for an Extraction from SAP BW.
It inherits from JDBCExtraction configurations, so it can use and/or overwrite those configurations.
These configurations cover:
- latest_timestamp_input_col: the column containing the actrequest timestamp in the dataset in latest_timestamp_data_location. Default: "actrequest_timestamp".
- act_request_table: the name of the SAP BW activation requests table. Composed of database.table. Default: SAPPHA.RSODSACTREQ.
- request_col_name: name of the column having the request to join with the activation request table. Default: actrequest.
- act_req_join_condition: the join condition into activation table can be changed using this property. Default: 'changelog_tbl.request = act_req.request_col_name'.
- odsobject: name of BW Object, used for joining with the activation request table to get the max actrequest_timestamp to consider while filtering the changelog table.
- include_changelog_tech_cols: whether to include the technical columns (usually coming from the changelog table) or not. Default: True.
- extra_cols_act_request: list of columns to be added from act request table. It needs to contain the prefix "act_req.". E.g. "act_req.col1 as column_one, act_req.col2 as column_two".
- get_timestamp_from_act_request: whether to get init timestamp from act request table or assume current/given timestamp.
- sap_bw_schema: sap bw schema. Default: SAPPHA.
- max_timestamp_custom_schema: the custom schema to apply on the calculation of the max timestamp to consider for the delta extractions. Default: timestamp DECIMAL(15,0).
- default_max_timestamp: the timestamp to use as default, when it is not possible to derive one.
SAPBWExtraction( user: str, password: str, url: str, dbtable: str, calc_upper_bound_schema: Optional[str] = None, changelog_table: Optional[str] = None, partition_column: Optional[str] = None, latest_timestamp_data_location: Optional[str] = None, latest_timestamp_data_format: str = 'delta', extraction_type: str = 'delta', driver: str = 'com.sap.db.jdbc.Driver', num_partitions: Optional[int] = None, lower_bound: Union[int, float, str, NoneType] = None, upper_bound: Union[int, float, str, NoneType] = None, default_upper_bound: str = '1', fetch_size: str = '100000', compress: bool = True, custom_schema: Optional[str] = None, min_timestamp: Optional[str] = None, max_timestamp: Optional[str] = None, generate_predicates: bool = False, predicates: Optional[List] = None, predicates_add_null: bool = True, extraction_timestamp: str = '20241028145833', max_timestamp_custom_schema: str = 'timestamp DECIMAL(15,0)', latest_timestamp_input_col: str = 'actrequest_timestamp', act_request_table: str = 'SAPPHA.RSODSACTREQ', request_col_name: str = 'actrequest', act_req_join_condition: Optional[str] = None, odsobject: Optional[str] = None, include_changelog_tech_cols: bool = True, extra_cols_act_request: Optional[str] = None, get_timestamp_from_act_request: bool = False, sap_bw_schema: str = 'SAPPHA', default_max_timestamp: str = '197000000000000')
Inherited Members
- lakehouse_engine.utils.extraction.jdbc_extraction_utils.JDBCExtraction
- user
- password
- url
- dbtable
- calc_upper_bound_schema
- changelog_table
- partition_column
- latest_timestamp_data_location
- latest_timestamp_data_format
- extraction_type
- driver
- num_partitions
- lower_bound
- upper_bound
- default_upper_bound
- fetch_size
- compress
- custom_schema
- min_timestamp
- max_timestamp
- generate_predicates
- predicates
- predicates_add_null
- extraction_timestamp
class
SAPBWExtractionUtils(lakehouse_engine.utils.extraction.jdbc_extraction_utils.JDBCExtractionUtils):
class SAPBWExtractionUtils(JDBCExtractionUtils):
    """Utils for managing data extraction from SAP BW systems over JDBC."""

    def __init__(self, sap_bw_extraction: SAPBWExtraction):
        """Construct SAPBWExtractionUtils.

        The changelog table is resolved here (possibly via a JDBC lookup) and
        written back into the given configuration object.

        Args:
            sap_bw_extraction: SAP BW Extraction configurations.
        """
        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
        self._BW_EXTRACTION = sap_bw_extraction
        self._BW_EXTRACTION.changelog_table = self.get_changelog_table()
        self._MAX_TIMESTAMP_QUERY = f""" --# nosec
                (SELECT COALESCE(MAX(timestamp),
                    {self._BW_EXTRACTION.default_max_timestamp}) as timestamp
                FROM {self._BW_EXTRACTION.act_request_table}
                WHERE odsobject = '{self._BW_EXTRACTION.odsobject}'
                 AND operation = 'A' AND status = '0')
            """  # nosec: B608
        super().__init__(sap_bw_extraction)

    def get_changelog_table(self) -> str:
        """Get the changelog table, given an odsobject.

        When an odsobject is configured, no changelog_table was provided and the
        extraction is not an init, the changelog table name is looked up in the
        SAP BW RSTSODS system table over JDBC. Otherwise, the provided
        changelog_table is used, falling back to "<dbtable>_cl".

        Returns:
            String to use as changelog_table.

        Raises:
            ValueError: if the RSTSODS lookup returns no changelog table.
        """
        if (
            self._BW_EXTRACTION.odsobject is not None
            and self._BW_EXTRACTION.changelog_table is None
            and self._BW_EXTRACTION.extraction_type != JDBCExtractionType.INIT.value
        ):
            # "_" is a single-char wildcard in LIKE, so escape it with "$".
            escaped_odsobject = copy(self._BW_EXTRACTION.odsobject).replace("_", "$_")

            if self._BW_EXTRACTION.sap_bw_schema:
                system_table = f"{self._BW_EXTRACTION.sap_bw_schema}.RSTSODS"
            else:
                system_table = "RSTSODS"

            jdbc_args = {
                "url": self._BW_EXTRACTION.url,
                "table": f""" -- # nosec
                    (SELECT ODSNAME_TECH
                    FROM {system_table}
                    WHERE ODSNAME LIKE '8{escaped_odsobject}$_%'
                     ESCAPE '$' AND USERAPP = 'CHANGELOG' AND VERSION = '000')
                """,  # nosec: B608
                "properties": {
                    "user": self._BW_EXTRACTION.user,
                    "password": self._BW_EXTRACTION.password,
                    "driver": self._BW_EXTRACTION.driver,
                },
            }
            from lakehouse_engine.io.reader_factory import ReaderFactory

            # Fetch the first row only once: each .first() call is a separate
            # Spark action, i.e. a separate JDBC round-trip.
            changelog_row = ReaderFactory.get_data(
                InputSpec(
                    spec_id="changelog_table",
                    data_format=InputFormat.JDBC.value,
                    read_type=ReadType.BATCH.value,
                    jdbc_args=jdbc_args,
                )
            ).first()

            if changelog_row is None:
                raise ValueError(
                    "Not able to derive the changelog table for odsobject "
                    f"'{self._BW_EXTRACTION.odsobject}' from '{system_table}'. "
                    "Please provide the changelog_table option explicitly."
                )

            changelog_table = (
                f'{self._BW_EXTRACTION.sap_bw_schema}."{changelog_row[0]}"'
                if self._BW_EXTRACTION.sap_bw_schema
                else str(changelog_row[0])
            )
        else:
            changelog_table = (
                self._BW_EXTRACTION.changelog_table
                if self._BW_EXTRACTION.changelog_table
                else f"{self._BW_EXTRACTION.dbtable}_cl"
            )
        self._LOGGER.info(f"The changelog table derived is: '{changelog_table}'")

        return changelog_table

    @staticmethod
    def get_odsobject(input_spec_opt: dict) -> str:
        """Get the odsobject based on the provided options.

        The dbtable option may include the database name (e.g. database.table),
        so it is split to keep only the table part. Users may still need to
        specify odsobject explicitly when it differs from the dbtable.

        Args:
            input_spec_opt: options from the input_spec.

        Returns:
            A string with the odsobject.
        """
        return str(
            input_spec_opt["dbtable"].split(".")[1]
            if len(input_spec_opt["dbtable"].split(".")) > 1
            else input_spec_opt["dbtable"]
        )

    def _get_init_query(self) -> Tuple[str, str]:
        """Get a query to do an init load based on a DSO on a SAP BW system.

        Returns:
            A tuple with the extraction query and the predicates query. The
            extraction query is enclosed in parentheses so that Spark treats it
            as a table and supports it in the dbtable option.

        Raises:
            AttributeError: if get_timestamp_from_act_request is set but the DSO
                is not Write Optimised (dbtable differs from changelog_table).
        """
        if self._BW_EXTRACTION.get_timestamp_from_act_request:
            # check if we are dealing with a DSO of type Write Optimised
            if self._BW_EXTRACTION.dbtable == self._BW_EXTRACTION.changelog_table:
                extraction_query = self._get_init_extraction_query_act_req_timestamp()
            else:
                raise AttributeError(
                    "Not able to get the extraction query. The option "
                    "'get_timestamp_from_act_request' is only "
                    "available/useful for DSOs of type Write Optimised."
                )
        else:
            extraction_query = self._get_init_extraction_query()

        predicates_query = f"""
        (SELECT DISTINCT({self._BW_EXTRACTION.partition_column})
        FROM {self._BW_EXTRACTION.dbtable} t)
        """  # nosec: B608

        return extraction_query, predicates_query

    def _get_init_extraction_query(self) -> str:
        """Get extraction query based on given/current timestamp.

        Returns:
            A query to submit to SAP BW for the initial data extraction.
        """
        changelog_tech_cols = (
            f"""'0' AS request,
                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15, 0))
                 AS actrequest_timestamp,
                '0' AS datapakid,
                0 AS partno,
                0 AS record,"""
            if self._BW_EXTRACTION.include_changelog_tech_cols
            else f"CAST({self._BW_EXTRACTION.extraction_timestamp} "
            f"AS DECIMAL(15, 0))"
            f" AS actrequest_timestamp,"
        )

        extraction_query = f"""
                (SELECT t.*,
                    {changelog_tech_cols}
                    CAST({self._BW_EXTRACTION.extraction_timestamp}
                        AS DECIMAL(15, 0)) AS extraction_start_timestamp
                FROM {self._BW_EXTRACTION.dbtable} t
                )"""  # nosec: B608

        return extraction_query

    def _get_init_extraction_query_act_req_timestamp(self) -> str:
        """Get extraction query assuming the init timestamp from act_request table.

        Returns:
            A query to submit to SAP BW for the initial data extraction from
            write optimised DSOs, receiving the actrequest_timestamp from
            the activation requests table.
        """
        extraction_query = f"""
            (SELECT t.*,
                act_req.timestamp as actrequest_timestamp,
                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15, 0))
                 AS extraction_start_timestamp
            FROM {self._BW_EXTRACTION.dbtable} t
            JOIN {self._BW_EXTRACTION.act_request_table} AS act_req ON
                t.request = act_req.{self._BW_EXTRACTION.request_col_name}
            WHERE act_req.odsobject = '{self._BW_EXTRACTION.odsobject}'
                AND operation = 'A' AND status = '0'
            )"""  # nosec: B608

        return extraction_query

    def _get_delta_query(self) -> Tuple[str, str]:
        """Get a delta query for an SAP BW DSO.

        An SAP BW DSO requires a join with a special type of table often called
        activation requests table, in which BW tracks down the timestamps associated
        with the several data loads that were performed into BW. Because the changelog
        table only contains the active request id, and that cannot be sorted by the
        downstream consumers to figure out the latest change, we need to join the
        changelog table with this special table to get the activation requests
        timestamps to then use them to figure out the latest changes in the delta load
        logic afterwards.

        Additionally, we also need to know which was the latest timestamp already loaded
        into the lakehouse bronze layer. The latest timestamp should always be available
        in the bronze dataset itself or in a dataset that tracks down all the actrequest
        timestamps that were already loaded. So we get the max value out of the
        respective actrequest timestamp column in that dataset.

        Returns:
            A tuple with the extraction query and the predicates query. The
            extraction query is enclosed in parentheses so that Spark treats it
            as a table and supports it in the dbtable option.

        Raises:
            ValueError: if no min_timestamp is given and none can be derived from
                the latest timestamp dataset (e.g. it is empty).
        """
        if not self._BW_EXTRACTION.min_timestamp:
            from lakehouse_engine.io.reader_factory import ReaderFactory

            latest_timestamp_data_df = ReaderFactory.get_data(
                InputSpec(
                    spec_id="data_with_latest_timestamp",
                    data_format=self._BW_EXTRACTION.latest_timestamp_data_format,
                    read_type=ReadType.BATCH.value,
                    location=self._BW_EXTRACTION.latest_timestamp_data_location,
                )
            )
            min_timestamp = latest_timestamp_data_df.transform(
                Aggregators.get_max_value(
                    self._BW_EXTRACTION.latest_timestamp_input_col
                )
            ).first()[0]
            # An empty/all-null dataset yields None, which would otherwise be
            # rendered into the SQL as the literal "None".
            if min_timestamp is None:
                raise ValueError(
                    "Not able to derive the min timestamp for the delta extraction "
                    "from column "
                    f"'{self._BW_EXTRACTION.latest_timestamp_input_col}' in "
                    f"'{self._BW_EXTRACTION.latest_timestamp_data_location}'. "
                    "Please provide min_timestamp or perform an init extraction."
                )
        else:
            min_timestamp = self._BW_EXTRACTION.min_timestamp

        max_timestamp = (
            self._BW_EXTRACTION.max_timestamp
            if self._BW_EXTRACTION.max_timestamp
            else self._get_max_timestamp(self._MAX_TIMESTAMP_QUERY)
        )

        if self._BW_EXTRACTION.act_req_join_condition:
            join_condition = f"{self._BW_EXTRACTION.act_req_join_condition}"
        else:
            join_condition = (
                f"changelog_tbl.request = "
                f"act_req.{self._BW_EXTRACTION.request_col_name}"
            )

        base_query = f""" --# nosec
        FROM {self._BW_EXTRACTION.changelog_table} AS changelog_tbl
        JOIN {self._BW_EXTRACTION.act_request_table} AS act_req
            ON {join_condition}
        WHERE act_req.odsobject = '{self._BW_EXTRACTION.odsobject}'
            AND act_req.timestamp > {min_timestamp}
            AND act_req.timestamp <= {max_timestamp}
            AND operation = 'A' AND status = '0')
        """

        main_cols = f"""
            (SELECT changelog_tbl.*,
                act_req.TIMESTAMP AS actrequest_timestamp,
                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15,0))
                 AS extraction_start_timestamp
            """
        # We join the main columns considered for the extraction with
        # extra_cols_act_request that people might want to use, filtering to only
        # add the comma and join the strings, in case extra_cols_act_request is
        # not None or empty.
        extraction_query_cols = ",".join(
            filter(None, [main_cols, self._BW_EXTRACTION.extra_cols_act_request])
        )

        extraction_query = extraction_query_cols + base_query

        predicates_query = f"""
        (SELECT DISTINCT({self._BW_EXTRACTION.partition_column})
        {base_query}
        """

        return extraction_query, predicates_query
Utils for managing data extraction from SAP BW systems over JDBC.
SAPBWExtractionUtils( sap_bw_extraction: SAPBWExtraction)
def __init__(self, sap_bw_extraction: SAPBWExtraction):
    """Construct SAPBWExtractionUtils.

    Resolves the changelog table (possibly through a JDBC lookup), stores it
    back into the configuration object and prepares the query used to
    compute the max activation timestamp.

    Args:
        sap_bw_extraction: SAP BW Extraction configurations.
    """
    self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
    self._BW_EXTRACTION = sap_bw_extraction
    # Must run after _LOGGER and _BW_EXTRACTION are set: the lookup reads both.
    resolved_changelog = self.get_changelog_table()
    self._BW_EXTRACTION.changelog_table = resolved_changelog

    cfg = self._BW_EXTRACTION
    self._MAX_TIMESTAMP_QUERY = f""" --# nosec
            (SELECT COALESCE(MAX(timestamp),
                {cfg.default_max_timestamp}) as timestamp
            FROM {cfg.act_request_table}
            WHERE odsobject = '{cfg.odsobject}'
             AND operation = 'A' AND status = '0')
        """  # nosec: B608
    super().__init__(sap_bw_extraction)
Construct SAPBWExtractionUtils.
Arguments:
- sap_bw_extraction: SAP BW Extraction configurations.
def
get_changelog_table(self) -> str:
def get_changelog_table(self) -> str:
    """Get the changelog table, given an odsobject.

    When an odsobject is configured, no changelog_table was provided and the
    extraction is not an init, the changelog table name is looked up in the
    SAP BW RSTSODS system table over JDBC. Otherwise, the provided
    changelog_table is used, falling back to "<dbtable>_cl".

    Returns:
        String to use as changelog_table.

    Raises:
        ValueError: if the RSTSODS lookup returns no changelog table.
    """
    if (
        self._BW_EXTRACTION.odsobject is not None
        and self._BW_EXTRACTION.changelog_table is None
        and self._BW_EXTRACTION.extraction_type != JDBCExtractionType.INIT.value
    ):
        # "_" is a single-char wildcard in LIKE, so escape it with "$".
        escaped_odsobject = copy(self._BW_EXTRACTION.odsobject).replace("_", "$_")

        if self._BW_EXTRACTION.sap_bw_schema:
            system_table = f"{self._BW_EXTRACTION.sap_bw_schema}.RSTSODS"
        else:
            system_table = "RSTSODS"

        jdbc_args = {
            "url": self._BW_EXTRACTION.url,
            "table": f""" -- # nosec
                (SELECT ODSNAME_TECH
                FROM {system_table}
                WHERE ODSNAME LIKE '8{escaped_odsobject}$_%'
                 ESCAPE '$' AND USERAPP = 'CHANGELOG' AND VERSION = '000')
            """,  # nosec: B608
            "properties": {
                "user": self._BW_EXTRACTION.user,
                "password": self._BW_EXTRACTION.password,
                "driver": self._BW_EXTRACTION.driver,
            },
        }
        from lakehouse_engine.io.reader_factory import ReaderFactory

        # Fetch the first row only once: each .first() call is a separate
        # Spark action, i.e. a separate JDBC round-trip.
        changelog_row = ReaderFactory.get_data(
            InputSpec(
                spec_id="changelog_table",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
            )
        ).first()

        if changelog_row is None:
            raise ValueError(
                "Not able to derive the changelog table for odsobject "
                f"'{self._BW_EXTRACTION.odsobject}' from '{system_table}'. "
                "Please provide the changelog_table option explicitly."
            )

        changelog_table = (
            f'{self._BW_EXTRACTION.sap_bw_schema}."{changelog_row[0]}"'
            if self._BW_EXTRACTION.sap_bw_schema
            else str(changelog_row[0])
        )
    else:
        changelog_table = (
            self._BW_EXTRACTION.changelog_table
            if self._BW_EXTRACTION.changelog_table
            else f"{self._BW_EXTRACTION.dbtable}_cl"
        )
    self._LOGGER.info(f"The changelog table derived is: '{changelog_table}'")

    return changelog_table
Get the changelog table, given an odsobject.
Returns:
String to use as changelog_table.
@staticmethod
def
get_odsobject(input_spec_opt: dict) -> str:
@staticmethod
def get_odsobject(input_spec_opt: dict) -> str:
    """Derive the odsobject name from the dbtable option.

    The dbtable option may carry a database prefix (e.g. "database.table"),
    in which case the second dot-separated part is returned; otherwise the
    dbtable value itself is returned. When the odsobject differs from the
    dbtable, it needs to be specified explicitly instead.

    Args:
        input_spec_opt: options from the input_spec; must contain "dbtable".

    Returns:
        A string with the odsobject.
    """
    name_parts = input_spec_opt["dbtable"].split(".")
    if len(name_parts) > 1:
        return str(name_parts[1])
    return str(input_spec_opt["dbtable"])
Get the odsobject based on the provided options.
The dbtable option may include the database name (e.g. database.table), so it is split to keep only the table part. Users may still need to specify odsobject explicitly when it differs from the dbtable.
Arguments:
- input_spec_opt: options from the input_spec.
Returns:
A string with the odsobject.