lakehouse_engine.utils.extraction.sap_b4_extraction_utils
Utilities module for SAP B4 extraction processes.
"""Utilities module for SAP B4 extraction processes."""

import re
from dataclasses import dataclass
from enum import Enum
from logging import Logger
from typing import Optional, Tuple

from lakehouse_engine.core.definitions import InputSpec, ReadType
from lakehouse_engine.transformers.aggregators import Aggregators
from lakehouse_engine.utils.extraction.jdbc_extraction_utils import (
    JDBCExtraction,
    JDBCExtractionUtils,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler


class ADSOTypes(Enum):
    """Standardise the types of ADSOs we can have for Extractions from SAP B4."""

    AQ: str = "AQ"
    CL: str = "CL"
    # Declared inside the Enum body, so it is itself an Enum member: use
    # ``ADSOTypes.SUPPORTED_TYPES.value`` to obtain the plain list ["AQ", "CL"].
    SUPPORTED_TYPES: list = [AQ, CL]


@dataclass
class SAPB4Extraction(JDBCExtraction):
    """Configurations available for an Extraction from SAP B4.

    It inherits from JDBCExtraction configurations, so it can use
    and/or overwrite those configurations.

    These configurations cover:
        - latest_timestamp_input_col: the column containing the request timestamps
            in the dataset in latest_timestamp_data_location. Default: REQTSN.
        - request_status_tbl: the name of the SAP B4 table having information
            about the extraction requests. Composed of database.table.
            Default: SAPHANADB.RSPMREQUEST.
        - request_col_name: name of the column having the request timestamp to join
            with the request status table. Default: REQUEST_TSN.
        - data_target: the data target to extract from. Used when filtering the
            request status table, which is joined with the table being extracted.
        - act_req_join_condition: the join condition into activation table
            can be changed using this property.
            Default: 'tbl.reqtsn = req.request_col_name'.
        - include_changelog_tech_cols: whether to include the technical columns
            (usually coming from the changelog table) or not.
        - extra_cols_req_status_tbl: columns to be added from request status table.
            It needs to contain the prefix "req.". E.g. "req.col1 as column_one,
            req.col2 as column_two".
        - request_status_tbl_filter: filter to use for filtering the request status
            table, influencing the calculation of the max timestamps and the delta
            extractions.
        - adso_type: the type of ADSO that you are extracting from. Can be "AQ" or
            "CL".
        - max_timestamp_custom_schema: the custom schema to apply on the calculation
            of the max timestamp to consider for the delta extractions.
            Default: timestamp DECIMAL(23,0).
        - default_max_timestamp: the timestamp to use as default, when it is not
            possible to derive one. Default: 1970000000000000000000.
        - custom_schema: specify custom_schema for particular columns of the
            returned dataframe in the init/delta extraction of the source table.
            Default: REQTSN DECIMAL(23,0).
    """

    latest_timestamp_input_col: str = "REQTSN"
    request_status_tbl: str = "SAPHANADB.RSPMREQUEST"
    request_col_name: str = "REQUEST_TSN"
    data_target: Optional[str] = None
    act_req_join_condition: Optional[str] = None
    include_changelog_tech_cols: Optional[bool] = None
    extra_cols_req_status_tbl: Optional[str] = None
    request_status_tbl_filter: Optional[str] = None
    adso_type: Optional[str] = None
    max_timestamp_custom_schema: str = "timestamp DECIMAL(23,0)"
    default_max_timestamp: str = "1970000000000000000000"
    custom_schema: str = "REQTSN DECIMAL(23,0)"


class SAPB4ExtractionUtils(JDBCExtractionUtils):
    """Utils for managing data extraction from SAP B4."""

    def __init__(self, sap_b4_extraction: SAPB4Extraction):
        """Construct SAPB4ExtractionUtils.

        The request status table filter is resolved here (the user-provided one,
        or a default derived from adso_type) and written back into the
        configuration, because the max timestamp query built below and the delta
        query both embed it.

        Args:
            sap_b4_extraction: SAP B4 Extraction configurations.

        Raises:
            NotImplementedError: if no request_status_tbl_filter is provided and
                adso_type is neither "AQ" nor "CL".
        """
        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
        self._B4_EXTRACTION = sap_b4_extraction
        self._B4_EXTRACTION.request_status_tbl_filter = (
            self._get_req_status_tbl_filter()
        )
        self._MAX_TIMESTAMP_QUERY = f""" --# nosec
                (SELECT COALESCE(MAX({self._B4_EXTRACTION.request_col_name}),
                    {self._B4_EXTRACTION.default_max_timestamp}) as timestamp
                FROM {self._B4_EXTRACTION.request_status_tbl}
                WHERE {self._B4_EXTRACTION.request_status_tbl_filter})
            """  # nosec: B608
        super().__init__(sap_b4_extraction)

    @staticmethod
    def get_data_target(input_spec_opt: dict) -> str:
        """Get the data_target from the data_target option or derive it.

        By definition data_target is the same for the table and changelog table and
        is the same string ignoring everything before / and the first and last
        character after /. E.g. for a dbtable /BIC/abtable12, the data_target
        would be btable1.

        Args:
            input_spec_opt: options from the input_spec. "dbtable" is only
                required when "data_target" is not present.

        Returns:
            A string with the data_target.
        """
        # Check for the explicit option first: dict.get would evaluate the
        # fallback eagerly and fail with KeyError when "dbtable" is absent.
        if "data_target" in input_spec_opt:
            return input_spec_opt["data_target"]

        # Strip double quotes, single quotes and backslashes from the table name.
        exclude_chars = r"""["'\\]"""
        table_name = re.sub(exclude_chars, "", input_spec_opt["dbtable"])
        return table_name.split("/")[-1][1:-1]

    def _get_init_query(self) -> Tuple[str, str]:
        """Get a query to do an init load based on an ADSO on a SAP B4 system.

        Returns:
            A tuple with the query to submit to SAP B4 for the initial data
            extraction and the query used to compute the predicates (distinct
            values of partition_column in dbtable). Both queries are enclosed in
            parentheses so that Spark treats them as a table and supports them in
            the dbtable option.
        """
        extraction_query = self._get_init_extraction_query()

        predicates_query = f"""
        (SELECT DISTINCT({self._B4_EXTRACTION.partition_column})
        FROM {self._B4_EXTRACTION.dbtable} t)
        """  # nosec: B608

        return extraction_query, predicates_query

    def _get_init_extraction_query(self) -> str:
        """Get the init extraction query based on current timestamp.

        When include_changelog_tech_cols is set, the technical columns reqtsn,
        datapakid and record are added with constant values derived from the
        extraction timestamp.

        Returns:
            A query to submit to SAP B4 for the initial data extraction.
        """
        changelog_tech_cols = (
            f"""{self._B4_EXTRACTION.extraction_timestamp}000000000 AS reqtsn,
                '0' AS datapakid,
                0 AS record,"""
            if self._B4_EXTRACTION.include_changelog_tech_cols
            else ""
        )

        extraction_query = f"""
                (SELECT t.*, {changelog_tech_cols}
                    CAST({self._B4_EXTRACTION.extraction_timestamp}
                        AS DECIMAL(15,0)) AS extraction_start_timestamp
                FROM {self._B4_EXTRACTION.dbtable} t
                )"""  # nosec: B608

        return extraction_query

    def _get_delta_query(self) -> Tuple[str, str]:
        """Get a delta query for an SAP B4 ADSO.

        An SAP B4 ADSO requires a join with a special type of table often called
        requests status table (RSPMREQUEST), in which B4 tracks down the timestamps,
        status and metrics associated with the several data loads that were performed
        into B4. Depending on the type of ADSO (AQ or CL) the join condition and also
        the ADSO/table to consider to extract from will be different.
        For AQ types, there is only the active table, from which we extract both inits
        and deltas and this is also the table used to join with RSPMREQUEST to derive
        the next portion of the data to extract.
        For the CL types, we have an active table/adso from which we extract the init
        and one changelog table from which we extract the delta portions of data.
        Depending on whether it is an init or a delta, one table or the other is
        used to join with RSPMREQUEST.

        The logic on this function basically ensures that we are reading from the source
        table considering the data that has arrived between the maximum timestamp that
        is available in our target destination and the max timestamp of the extractions
        performed and registered in the RSPMREQUEST table, which follow the filtering
        criteria.

        Returns:
            A tuple with the query to submit to SAP B4 for the delta data
            extraction and the query used to compute the predicates (distinct
            values of partition_column). Both queries are enclosed in parentheses
            so that Spark treats them as a table and supports them in the dbtable
            option.
        """
        if not self._B4_EXTRACTION.min_timestamp:
            # Imported lazily (presumably to avoid a circular import).
            from lakehouse_engine.io.reader_factory import ReaderFactory

            latest_timestamp_data_df = ReaderFactory.get_data(
                InputSpec(
                    spec_id="data_with_latest_timestamp",
                    data_format=self._B4_EXTRACTION.latest_timestamp_data_format,
                    read_type=ReadType.BATCH.value,
                    location=self._B4_EXTRACTION.latest_timestamp_data_location,
                )
            )
            min_timestamp = latest_timestamp_data_df.transform(
                Aggregators.get_max_value(
                    self._B4_EXTRACTION.latest_timestamp_input_col
                )
            ).first()[0]
        else:
            min_timestamp = self._B4_EXTRACTION.min_timestamp

        max_timestamp = (
            self._B4_EXTRACTION.max_timestamp
            if self._B4_EXTRACTION.max_timestamp
            else self._get_max_timestamp(self._MAX_TIMESTAMP_QUERY)
        )

        if self._B4_EXTRACTION.act_req_join_condition:
            join_condition = f"{self._B4_EXTRACTION.act_req_join_condition}"
        else:
            join_condition = f"tbl.reqtsn = req.{self._B4_EXTRACTION.request_col_name}"

        base_query = f""" --# nosec
        FROM {self._B4_EXTRACTION.changelog_table} AS tbl
        JOIN {self._B4_EXTRACTION.request_status_tbl} AS req
            ON {join_condition}
        WHERE {self._B4_EXTRACTION.request_status_tbl_filter}
            AND req.{self._B4_EXTRACTION.request_col_name} > {min_timestamp}
            AND req.{self._B4_EXTRACTION.request_col_name} <= {max_timestamp})
        """

        main_cols = f"""
            (SELECT tbl.*,
                CAST({self._B4_EXTRACTION.extraction_timestamp} AS DECIMAL(15,0))
                    AS extraction_start_timestamp
            """

        # Append the optional extra_cols_req_status_tbl to the main columns; the
        # filter drops it when it is None or empty so no dangling comma is added.
        extraction_query_cols = ",".join(
            filter(None, [main_cols, self._B4_EXTRACTION.extra_cols_req_status_tbl])
        )

        extraction_query = extraction_query_cols + base_query

        predicates_query = f"""
        (SELECT DISTINCT({self._B4_EXTRACTION.partition_column})
        {base_query}
        """

        return extraction_query, predicates_query

    def _get_req_status_tbl_filter(self) -> str:
        """Get the filter to apply on the request status table.

        Returns:
            The user-provided request_status_tbl_filter, if any. Otherwise, a
            default filter for the configured adso_type ("AQ" or "CL") restricted
            to the configured data_target.

        Raises:
            NotImplementedError: if no filter is provided and adso_type is neither
                "AQ" nor "CL".
        """
        if self._B4_EXTRACTION.request_status_tbl_filter:
            return self._B4_EXTRACTION.request_status_tbl_filter

        if self._B4_EXTRACTION.adso_type == ADSOTypes.AQ.value:
            return f"""
                STORAGE = 'AQ' AND REQUEST_IS_IN_PROCESS = 'N' AND
                LAST_OPERATION_TYPE IN ('C', 'U') AND REQUEST_STATUS IN ('GG', 'GR')
                AND UPPER(DATATARGET) = UPPER('{self._B4_EXTRACTION.data_target}')
            """
        if self._B4_EXTRACTION.adso_type == ADSOTypes.CL.value:
            return f"""
                STORAGE = 'AT' AND REQUEST_IS_IN_PROCESS = 'N' AND
                LAST_OPERATION_TYPE IN ('C', 'U') AND REQUEST_STATUS IN ('GG')
                AND UPPER(DATATARGET) = UPPER('{self._B4_EXTRACTION.data_target}')
            """

        # SUPPORTED_TYPES is an Enum member, so use .value to show the actual list.
        raise NotImplementedError(
            f"The requested ADSO Type ({self._B4_EXTRACTION.adso_type}) is not fully "
            "implemented and/or tested. "
            f"Supported ADSO Types: {ADSOTypes.SUPPORTED_TYPES.value}"
        )
class
ADSOTypes(enum.Enum):
class ADSOTypes(Enum):
    """Enumerate the SAP B4 ADSO types handled by the extraction utilities.

    ``AQ`` and ``CL`` are the concrete ADSO types. ``SUPPORTED_TYPES`` is also a
    member of this Enum; its value is the list ``["AQ", "CL"]``.
    """

    AQ: str = "AQ"
    CL: str = "CL"
    # Spelled with literals: inside the Enum body the names AQ/CL still refer to
    # the raw strings, so this yields exactly the same member value.
    SUPPORTED_TYPES: list = ["AQ", "CL"]
Standardise the types of ADSOs we can have for Extractions from SAP B4.
Inherited Members
- enum.Enum
- name
- value
@dataclass
class SAPB4Extraction(JDBCExtraction):
    """Configurations available for an Extraction from SAP B4.

    It inherits from JDBCExtraction configurations, so it can use
    and/or overwrite those configurations.

    These configurations cover:
        - latest_timestamp_input_col: the column containing the request timestamps
            in the dataset in latest_timestamp_data_location. Default: REQTSN.
        - request_status_tbl: the name of the SAP B4 table having information
            about the extraction requests. Composed of database.table.
            Default: SAPHANADB.RSPMREQUEST.
        - request_col_name: name of the column having the request timestamp to join
            with the request status table. Default: REQUEST_TSN.
        - data_target: the data target to extract from. Used when filtering the
            request status table, which is joined with the table being extracted.
        - act_req_join_condition: the join condition into activation table
            can be changed using this property.
            Default: 'tbl.reqtsn = req.request_col_name'.
        - include_changelog_tech_cols: whether to include the technical columns
            (usually coming from the changelog table) or not.
        - extra_cols_req_status_tbl: columns to be added from request status table.
            It needs to contain the prefix "req.". E.g. "req.col1 as column_one,
            req.col2 as column_two".
        - request_status_tbl_filter: filter to use for filtering the request status
            table, influencing the calculation of the max timestamps and the delta
            extractions.
        - adso_type: the type of ADSO that you are extracting from. Can be "AQ" or
            "CL".
        - max_timestamp_custom_schema: the custom schema to apply on the calculation
            of the max timestamp to consider for the delta extractions.
            Default: timestamp DECIMAL(23,0).
        - default_max_timestamp: the timestamp to use as default, when it is not
            possible to derive one. Default: 1970000000000000000000.
        - custom_schema: specify custom_schema for particular columns of the
            returned dataframe in the init/delta extraction of the source table.
            Default: REQTSN DECIMAL(23,0).
    """

    latest_timestamp_input_col: str = "REQTSN"
    request_status_tbl: str = "SAPHANADB.RSPMREQUEST"
    request_col_name: str = "REQUEST_TSN"
    data_target: Optional[str] = None
    act_req_join_condition: Optional[str] = None
    include_changelog_tech_cols: Optional[bool] = None
    extra_cols_req_status_tbl: Optional[str] = None
    request_status_tbl_filter: Optional[str] = None
    adso_type: Optional[str] = None
    max_timestamp_custom_schema: str = "timestamp DECIMAL(23,0)"
    default_max_timestamp: str = "1970000000000000000000"
    custom_schema: str = "REQTSN DECIMAL(23,0)"
Configurations available for an Extraction from SAP B4.
It inherits from JDBCExtraction configurations, so it can use and/or overwrite those configurations.
These configurations cover:
- latest_timestamp_input_col: the column containing the request timestamps in the dataset in latest_timestamp_data_location. Default: REQTSN.
- request_status_tbl: the name of the SAP B4 table having information about the extraction requests. Composed of database.table. Default: SAPHANADB.RSPMREQUEST.
- request_col_name: name of the column having the request timestamp to join with the request status table. Default: REQUEST_TSN.
- data_target: the data target to extract from. Used when filtering the request status table, which is joined with the table being extracted.
- act_req_join_condition: the join condition into activation table can be changed using this property. Default: 'tbl.reqtsn = req.request_col_name'.
- include_changelog_tech_cols: whether to include the technical columns (usually coming from the changelog table) or not.
- extra_cols_req_status_tbl: columns to be added from request status table. It needs to contain the prefix "req.". E.g. "req.col1 as column_one, req.col2 as column_two".
- request_status_tbl_filter: filter to use for filtering the request status table, influencing the calculation of the max timestamps and the delta extractions.
- adso_type: the type of ADSO that you are extracting from. Can be "AQ" or "CL".
- max_timestamp_custom_schema: the custom schema to apply on the calculation of the max timestamp to consider for the delta extractions. Default: timestamp DECIMAL(23,0).
- default_max_timestamp: the timestamp to use as default, when it is not possible to derive one.
- custom_schema: specify custom_schema for particular columns of the returned dataframe in the init/delta extraction of the source table.
SAPB4Extraction( user: str, password: str, url: str, dbtable: str, calc_upper_bound_schema: Optional[str] = None, changelog_table: Optional[str] = None, partition_column: Optional[str] = None, latest_timestamp_data_location: Optional[str] = None, latest_timestamp_data_format: str = 'delta', extraction_type: str = 'delta', driver: str = 'com.sap.db.jdbc.Driver', num_partitions: Optional[int] = None, lower_bound: Union[int, float, str, NoneType] = None, upper_bound: Union[int, float, str, NoneType] = None, default_upper_bound: str = '1', fetch_size: str = '100000', compress: bool = True, custom_schema: str = 'REQTSN DECIMAL(23,0)', min_timestamp: Optional[str] = None, max_timestamp: Optional[str] = None, generate_predicates: bool = False, predicates: Optional[List] = None, predicates_add_null: bool = True, extraction_timestamp: str = '20241028145833', max_timestamp_custom_schema: str = 'timestamp DECIMAL(23,0)', latest_timestamp_input_col: str = 'REQTSN', request_status_tbl: str = 'SAPHANADB.RSPMREQUEST', request_col_name: str = 'REQUEST_TSN', data_target: Optional[str] = None, act_req_join_condition: Optional[str] = None, include_changelog_tech_cols: Optional[bool] = None, extra_cols_req_status_tbl: Optional[str] = None, request_status_tbl_filter: Optional[str] = None, adso_type: Optional[str] = None, default_max_timestamp: str = '1970000000000000000000')
Inherited Members
- lakehouse_engine.utils.extraction.jdbc_extraction_utils.JDBCExtraction
- user
- password
- url
- dbtable
- calc_upper_bound_schema
- changelog_table
- partition_column
- latest_timestamp_data_location
- latest_timestamp_data_format
- extraction_type
- driver
- num_partitions
- lower_bound
- upper_bound
- default_upper_bound
- fetch_size
- compress
- min_timestamp
- max_timestamp
- generate_predicates
- predicates
- predicates_add_null
- extraction_timestamp
class
SAPB4ExtractionUtils(lakehouse_engine.utils.extraction.jdbc_extraction_utils.JDBCExtractionUtils):
class SAPB4ExtractionUtils(JDBCExtractionUtils):
    """Utils for managing data extraction from SAP B4."""

    def __init__(self, sap_b4_extraction: SAPB4Extraction):
        """Construct SAPB4ExtractionUtils.

        The request status table filter is resolved here (the user-provided one,
        or a default derived from adso_type) and written back into the
        configuration, because the max timestamp query built below and the delta
        query both embed it.

        Args:
            sap_b4_extraction: SAP B4 Extraction configurations.

        Raises:
            NotImplementedError: if no request_status_tbl_filter is provided and
                adso_type is neither "AQ" nor "CL".
        """
        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
        self._B4_EXTRACTION = sap_b4_extraction
        self._B4_EXTRACTION.request_status_tbl_filter = (
            self._get_req_status_tbl_filter()
        )
        self._MAX_TIMESTAMP_QUERY = f""" --# nosec
                (SELECT COALESCE(MAX({self._B4_EXTRACTION.request_col_name}),
                    {self._B4_EXTRACTION.default_max_timestamp}) as timestamp
                FROM {self._B4_EXTRACTION.request_status_tbl}
                WHERE {self._B4_EXTRACTION.request_status_tbl_filter})
            """  # nosec: B608
        super().__init__(sap_b4_extraction)

    @staticmethod
    def get_data_target(input_spec_opt: dict) -> str:
        """Get the data_target from the data_target option or derive it.

        By definition data_target is the same for the table and changelog table and
        is the same string ignoring everything before / and the first and last
        character after /. E.g. for a dbtable /BIC/abtable12, the data_target
        would be btable1.

        Args:
            input_spec_opt: options from the input_spec. "dbtable" is only
                required when "data_target" is not present.

        Returns:
            A string with the data_target.
        """
        # Check for the explicit option first: dict.get would evaluate the
        # fallback eagerly and fail with KeyError when "dbtable" is absent.
        if "data_target" in input_spec_opt:
            return input_spec_opt["data_target"]

        # Strip double quotes, single quotes and backslashes from the table name.
        exclude_chars = r"""["'\\]"""
        table_name = re.sub(exclude_chars, "", input_spec_opt["dbtable"])
        return table_name.split("/")[-1][1:-1]

    def _get_init_query(self) -> Tuple[str, str]:
        """Get a query to do an init load based on an ADSO on a SAP B4 system.

        Returns:
            A tuple with the query to submit to SAP B4 for the initial data
            extraction and the query used to compute the predicates (distinct
            values of partition_column in dbtable). Both queries are enclosed in
            parentheses so that Spark treats them as a table and supports them in
            the dbtable option.
        """
        extraction_query = self._get_init_extraction_query()

        predicates_query = f"""
        (SELECT DISTINCT({self._B4_EXTRACTION.partition_column})
        FROM {self._B4_EXTRACTION.dbtable} t)
        """  # nosec: B608

        return extraction_query, predicates_query

    def _get_init_extraction_query(self) -> str:
        """Get the init extraction query based on current timestamp.

        When include_changelog_tech_cols is set, the technical columns reqtsn,
        datapakid and record are added with constant values derived from the
        extraction timestamp.

        Returns:
            A query to submit to SAP B4 for the initial data extraction.
        """
        changelog_tech_cols = (
            f"""{self._B4_EXTRACTION.extraction_timestamp}000000000 AS reqtsn,
                '0' AS datapakid,
                0 AS record,"""
            if self._B4_EXTRACTION.include_changelog_tech_cols
            else ""
        )

        extraction_query = f"""
                (SELECT t.*, {changelog_tech_cols}
                    CAST({self._B4_EXTRACTION.extraction_timestamp}
                        AS DECIMAL(15,0)) AS extraction_start_timestamp
                FROM {self._B4_EXTRACTION.dbtable} t
                )"""  # nosec: B608

        return extraction_query

    def _get_delta_query(self) -> Tuple[str, str]:
        """Get a delta query for an SAP B4 ADSO.

        An SAP B4 ADSO requires a join with a special type of table often called
        requests status table (RSPMREQUEST), in which B4 tracks down the timestamps,
        status and metrics associated with the several data loads that were performed
        into B4. Depending on the type of ADSO (AQ or CL) the join condition and also
        the ADSO/table to consider to extract from will be different.
        For AQ types, there is only the active table, from which we extract both inits
        and deltas and this is also the table used to join with RSPMREQUEST to derive
        the next portion of the data to extract.
        For the CL types, we have an active table/adso from which we extract the init
        and one changelog table from which we extract the delta portions of data.
        Depending on whether it is an init or a delta, one table or the other is
        used to join with RSPMREQUEST.

        The logic on this function basically ensures that we are reading from the source
        table considering the data that has arrived between the maximum timestamp that
        is available in our target destination and the max timestamp of the extractions
        performed and registered in the RSPMREQUEST table, which follow the filtering
        criteria.

        Returns:
            A tuple with the query to submit to SAP B4 for the delta data
            extraction and the query used to compute the predicates (distinct
            values of partition_column). Both queries are enclosed in parentheses
            so that Spark treats them as a table and supports them in the dbtable
            option.
        """
        if not self._B4_EXTRACTION.min_timestamp:
            # Imported lazily (presumably to avoid a circular import).
            from lakehouse_engine.io.reader_factory import ReaderFactory

            latest_timestamp_data_df = ReaderFactory.get_data(
                InputSpec(
                    spec_id="data_with_latest_timestamp",
                    data_format=self._B4_EXTRACTION.latest_timestamp_data_format,
                    read_type=ReadType.BATCH.value,
                    location=self._B4_EXTRACTION.latest_timestamp_data_location,
                )
            )
            min_timestamp = latest_timestamp_data_df.transform(
                Aggregators.get_max_value(
                    self._B4_EXTRACTION.latest_timestamp_input_col
                )
            ).first()[0]
        else:
            min_timestamp = self._B4_EXTRACTION.min_timestamp

        max_timestamp = (
            self._B4_EXTRACTION.max_timestamp
            if self._B4_EXTRACTION.max_timestamp
            else self._get_max_timestamp(self._MAX_TIMESTAMP_QUERY)
        )

        if self._B4_EXTRACTION.act_req_join_condition:
            join_condition = f"{self._B4_EXTRACTION.act_req_join_condition}"
        else:
            join_condition = f"tbl.reqtsn = req.{self._B4_EXTRACTION.request_col_name}"

        base_query = f""" --# nosec
        FROM {self._B4_EXTRACTION.changelog_table} AS tbl
        JOIN {self._B4_EXTRACTION.request_status_tbl} AS req
            ON {join_condition}
        WHERE {self._B4_EXTRACTION.request_status_tbl_filter}
            AND req.{self._B4_EXTRACTION.request_col_name} > {min_timestamp}
            AND req.{self._B4_EXTRACTION.request_col_name} <= {max_timestamp})
        """

        main_cols = f"""
            (SELECT tbl.*,
                CAST({self._B4_EXTRACTION.extraction_timestamp} AS DECIMAL(15,0))
                    AS extraction_start_timestamp
            """

        # Append the optional extra_cols_req_status_tbl to the main columns; the
        # filter drops it when it is None or empty so no dangling comma is added.
        extraction_query_cols = ",".join(
            filter(None, [main_cols, self._B4_EXTRACTION.extra_cols_req_status_tbl])
        )

        extraction_query = extraction_query_cols + base_query

        predicates_query = f"""
        (SELECT DISTINCT({self._B4_EXTRACTION.partition_column})
        {base_query}
        """

        return extraction_query, predicates_query

    def _get_req_status_tbl_filter(self) -> str:
        """Get the filter to apply on the request status table.

        Returns:
            The user-provided request_status_tbl_filter, if any. Otherwise, a
            default filter for the configured adso_type ("AQ" or "CL") restricted
            to the configured data_target.

        Raises:
            NotImplementedError: if no filter is provided and adso_type is neither
                "AQ" nor "CL".
        """
        if self._B4_EXTRACTION.request_status_tbl_filter:
            return self._B4_EXTRACTION.request_status_tbl_filter

        if self._B4_EXTRACTION.adso_type == ADSOTypes.AQ.value:
            return f"""
                STORAGE = 'AQ' AND REQUEST_IS_IN_PROCESS = 'N' AND
                LAST_OPERATION_TYPE IN ('C', 'U') AND REQUEST_STATUS IN ('GG', 'GR')
                AND UPPER(DATATARGET) = UPPER('{self._B4_EXTRACTION.data_target}')
            """
        if self._B4_EXTRACTION.adso_type == ADSOTypes.CL.value:
            return f"""
                STORAGE = 'AT' AND REQUEST_IS_IN_PROCESS = 'N' AND
                LAST_OPERATION_TYPE IN ('C', 'U') AND REQUEST_STATUS IN ('GG')
                AND UPPER(DATATARGET) = UPPER('{self._B4_EXTRACTION.data_target}')
            """

        # SUPPORTED_TYPES is an Enum member, so use .value to show the actual list.
        raise NotImplementedError(
            f"The requested ADSO Type ({self._B4_EXTRACTION.adso_type}) is not fully "
            "implemented and/or tested. "
            f"Supported ADSO Types: {ADSOTypes.SUPPORTED_TYPES.value}"
        )
Utils for managing data extraction from SAP B4.
SAPB4ExtractionUtils( sap_b4_extraction: SAPB4Extraction)
def __init__(self, sap_b4_extraction: SAPB4Extraction):
    """Build the SAP B4 extraction utils from the given configuration.

    Args:
        sap_b4_extraction: SAP B4 Extraction configurations. Its
            request_status_tbl_filter attribute is overwritten in place with the
            filter resolved by _get_req_status_tbl_filter.
    """
    self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
    self._B4_EXTRACTION = sap_b4_extraction
    config = self._B4_EXTRACTION
    # Resolve the filter before building the query below, which embeds it.
    config.request_status_tbl_filter = self._get_req_status_tbl_filter()
    self._MAX_TIMESTAMP_QUERY = f""" --# nosec
            (SELECT COALESCE(MAX({config.request_col_name}),
                {config.default_max_timestamp}) as timestamp
            FROM {config.request_status_tbl}
            WHERE {config.request_status_tbl_filter})
        """  # nosec: B608
    super().__init__(sap_b4_extraction)
Construct SAPB4ExtractionUtils.
Arguments:
- sap_b4_extraction: SAP B4 Extraction configurations.
@staticmethod
def
get_data_target(input_spec_opt: dict) -> str:
@staticmethod
def get_data_target(input_spec_opt: dict) -> str:
    """Get the data_target from the data_target option or derive it.

    By definition data_target is the same for the table and changelog table and
    is the same string ignoring everything before / and the first and last
    character after /. E.g. for a dbtable /BIC/abtable12, the data_target
    would be btable1.

    Args:
        input_spec_opt: options from the input_spec. "dbtable" is only required
            when "data_target" is not present.

    Returns:
        A string with the data_target.
    """
    # Check for the explicit option first: dict.get would evaluate the fallback
    # eagerly and fail with KeyError when "dbtable" is absent.
    if "data_target" in input_spec_opt:
        return input_spec_opt["data_target"]

    # Strip double quotes, single quotes and backslashes from the table name.
    exclude_chars = r"""["'\\]"""
    table_name = re.sub(exclude_chars, "", input_spec_opt["dbtable"])
    return table_name.split("/")[-1][1:-1]
Get the data_target from the data_target option or derive it.
By definition data_target is the same for the table and changelog table and is the same string ignoring everything before / and the first and last character after /. E.g. for a dbtable /BIC/abtable12, the data_target would be btable1.
Arguments:
- input_spec_opt: options from the input_spec.
Returns:
A string with the data_target.