lakehouse_engine.utils.extraction.sap_b4_extraction_utils

Utilities module for SAP B4 extraction processes.

  1"""Utilities module for SAP B4 extraction processes."""
  2
  3import re
  4from dataclasses import dataclass
  5from enum import Enum
  6from logging import Logger
  7from typing import Any, Optional, Tuple
  8
  9from lakehouse_engine.core.definitions import InputSpec, ReadType
 10from lakehouse_engine.transformers.aggregators import Aggregators
 11from lakehouse_engine.utils.extraction.jdbc_extraction_utils import (
 12    JDBCExtraction,
 13    JDBCExtractionUtils,
 14)
 15from lakehouse_engine.utils.logging_handler import LoggingHandler
 16
 17
class ADSOTypes(Enum):
    """Standardise the types of ADSOs we can have for Extractions from SAP B4."""

    AQ: str = "AQ"
    CL: str = "CL"
    SUPPORTED_TYPES: list = [AQ, CL]

@dataclass
class SAPB4Extraction(JDBCExtraction):
    """Configurations available for an Extraction from SAP B4.

    It inherits from JDBCExtraction configurations, so it can use
    and/or overwrite those configurations.

    These configurations cover:
    - latest_timestamp_input_col: the column containing the request timestamps
        in the dataset in latest_timestamp_data_location. Default: REQTSN.
    - request_status_tbl: the name of the SAP B4 table having information
        about the extraction requests. Composed of database.table.
        Default: SAPHANADB.RSPMREQUEST.
    - request_col_name: name of the column having the request timestamp to join
        with the request status table. Default: REQUEST_TSN.
    - data_target: the data target to extract from. Used in the join operation
        with the request status table.
    - act_req_join_condition: the join condition with the activation table;
        it can be changed using this property.
        Default: 'tbl.reqtsn = req.request_col_name'.
    - include_changelog_tech_cols: whether to include the technical columns
        (usually coming from the changelog table) or not.
    - extra_cols_req_status_tbl: columns to be added from the request status
        table. They need to contain the prefix "req.". E.g. "req.col1 as
        column_one, req.col2 as column_two".
    - request_status_tbl_filter: filter to use for filtering the request status table,
        influencing the calculation of the max timestamps and the delta extractions.
    - adso_type: the type of ADSO that you are extracting from. Can be "AQ" or "CL".
    - max_timestamp_custom_schema: the custom schema to apply on the calculation of
        the max timestamp to consider for the delta extractions.
        Default: timestamp DECIMAL(23,0).
    - default_max_timestamp: the timestamp to use as default, when it is not possible
        to derive one.
    - custom_schema: specify custom_schema for particular columns of the
        returned dataframe in the init/delta extraction of the source table.
    """

    latest_timestamp_input_col: str = "REQTSN"
    request_status_tbl: str = "SAPHANADB.RSPMREQUEST"
    request_col_name: str = "REQUEST_TSN"
    data_target: Optional[str] = None
    act_req_join_condition: Optional[str] = None
    include_changelog_tech_cols: Optional[bool] = None
    extra_cols_req_status_tbl: Optional[str] = None
    request_status_tbl_filter: Optional[str] = None
    adso_type: Optional[str] = None
    max_timestamp_custom_schema: str = "timestamp DECIMAL(23,0)"
    default_max_timestamp: str = "1970000000000000000000"
    custom_schema: str = "REQTSN DECIMAL(23,0)"

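# Illustrative values for the more free-form options above (hypothetical
# column names; only the option formats come from the docstring):
#
#   extra_cols_req_status_tbl="req.REQUEST_STATUS as req_status"  # "req." prefix
#   act_req_join_condition="tbl.reqtsn = req.REQUEST_TSN"  # overrides the default
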
class SAPB4ExtractionUtils(JDBCExtractionUtils):
    """Utils for managing data extraction from SAP B4."""

    def __init__(self, sap_b4_extraction: SAPB4Extraction):
        """Construct SAPB4ExtractionUtils.

        Args:
            sap_b4_extraction: SAP B4 Extraction configurations.
        """
        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
        self._B4_EXTRACTION = sap_b4_extraction
        self._B4_EXTRACTION.request_status_tbl_filter = (
            self._get_req_status_tbl_filter()
        )
        self._MAX_TIMESTAMP_QUERY = f""" --# nosec
                (SELECT COALESCE(MAX({self._B4_EXTRACTION.request_col_name}),
                    {self._B4_EXTRACTION.default_max_timestamp}) as timestamp
                FROM {self._B4_EXTRACTION.request_status_tbl}
                WHERE {self._B4_EXTRACTION.request_status_tbl_filter})
            """  # nosec: B608
        super().__init__(sap_b4_extraction)

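    # Illustrative rendering of _MAX_TIMESTAMP_QUERY (assumption: default
    # configuration values and an AQ-type ADSO). With the defaults above it
    # resolves to roughly:
    #
    #   (SELECT COALESCE(MAX(REQUEST_TSN), 1970000000000000000000) as timestamp
    #   FROM SAPHANADB.RSPMREQUEST
    #   WHERE STORAGE = 'AQ' AND REQUEST_IS_IN_PROCESS = 'N' AND
    #       LAST_OPERATION_TYPE IN ('C', 'U') AND REQUEST_STATUS IN ('GG', 'GR')
    #       AND UPPER(DATATARGET) = UPPER('<data_target>'))
    #
    # i.e. the latest successfully processed request timestamp, or the default
    # when no request matches the filter.
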
    @staticmethod
    def get_data_target(input_spec_opt: dict) -> str:
        """Get the data_target from the data_target option or derive it.

        By definition, the data_target is the same for the table and the
        changelog table: it is the substring of dbtable after the last "/",
        with its first and last characters removed. E.g. for a dbtable
        /BIC/abtable12, the data_target would be btable1.

        Args:
            input_spec_opt: options from the input_spec.

        Returns:
            A string with the data_target.
        """
        exclude_chars = """["'\\\\]"""
        data_target: str = input_spec_opt.get(
            "data_target",
            re.sub(exclude_chars, "", input_spec_opt["dbtable"]).split("/")[-1][1:-1],
        )

        return data_target

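    # Illustrative example (hypothetical input spec options, not part of the
    # module):
    #
    #   SAPB4ExtractionUtils.get_data_target({"dbtable": "/BIC/abtable12"})
    #   # -> "btable1": "abtable12" is the text after the last "/", and its
    #   # first and last characters are then dropped.
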
    def _get_init_query(self) -> Tuple[str, str]:
        """Get a query to do an init load based on an ADSO on an SAP B4 system.

        Returns:
            A query to submit to SAP B4 for the initial data extraction. The query
            is enclosed in parentheses so that Spark treats it as a table and
            supports it in the dbtable option.
        """
        extraction_query = self._get_init_extraction_query()

        predicates_query = f"""
        (SELECT DISTINCT({self._B4_EXTRACTION.partition_column})
        FROM {self._B4_EXTRACTION.dbtable} t)
        """  # nosec: B608

        return extraction_query, predicates_query

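    # Illustrative rendering of the predicates query (assumptions:
    # partition_column="RECORD", dbtable="SAPHANADB./BIC/azsales1272"):
    #
    #   (SELECT DISTINCT(RECORD)
    #   FROM SAPHANADB./BIC/azsales1272 t)
    #
    # Its distinct values are used to build the JDBC read predicates when
    # generate_predicates is enabled.
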
    def _get_init_extraction_query(self) -> str:
        """Get the init extraction query based on current timestamp.

        Returns:
            A query to submit to SAP B4 for the initial data extraction.
        """
        changelog_tech_cols = (
            f"""{self._B4_EXTRACTION.extraction_timestamp}000000000 AS reqtsn,
                '0' AS datapakid,
                0 AS record,"""
            if self._B4_EXTRACTION.include_changelog_tech_cols
            else ""
        )

        extraction_query = f"""
                (SELECT t.*, {changelog_tech_cols}
                    CAST({self._B4_EXTRACTION.extraction_timestamp}
                        AS DECIMAL(15,0)) AS extraction_start_timestamp
                FROM {self._B4_EXTRACTION.dbtable} t
                )"""  # nosec: B608

        return extraction_query

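    # Illustrative rendering (assumptions: include_changelog_tech_cols=True,
    # extraction_timestamp="20240101120000", dbtable="SAPHANADB./BIC/azsales1272"):
    #
    #   (SELECT t.*, 20240101120000000000000 AS reqtsn,
    #       '0' AS datapakid,
    #       0 AS record,
    #       CAST(20240101120000 AS DECIMAL(15,0)) AS extraction_start_timestamp
    #   FROM SAPHANADB./BIC/azsales1272 t)
    #
    # The synthesised reqtsn/datapakid/record columns mimic the changelog's
    # technical columns, so init and delta loads produce one consistent schema.
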
    def _get_delta_query(self) -> Tuple[str, str]:
        """Get a delta query for an SAP B4 ADSO.

        An SAP B4 ADSO requires a join with a special type of table often called
        the requests status table (RSPMREQUEST), in which B4 tracks the
        timestamps, status and metrics associated with the several data loads
        that were performed into B4. Depending on the type of ADSO (AQ or CL),
        the join condition and the ADSO/table to extract from will differ.
        For AQ types, there is only the active table, from which we extract both
        inits and deltas; this is also the table joined with RSPMREQUEST to
        derive the next portion of the data to extract.
        For CL types, we have an active table/ADSO from which we extract the
        init, and a changelog table from which we extract the delta portions of
        data. Depending on whether it is an init or a delta, one table or the
        other is joined with RSPMREQUEST.

        The logic in this function ensures that we read from the source table
        only the data that has arrived between the maximum timestamp available
        in our target destination and the max timestamp of the extractions
        performed and registered in the RSPMREQUEST table, which follow the
        filtering criteria.

        Returns:
            A query to submit to SAP B4 for the delta data extraction. The query
            is enclosed in parentheses so that Spark treats it as a table and
            supports it in the dbtable option.
        """
        if not self._B4_EXTRACTION.min_timestamp:
            from lakehouse_engine.io.reader_factory import ReaderFactory

            latest_timestamp_data_df = ReaderFactory.get_data(
                InputSpec(
                    spec_id="data_with_latest_timestamp",
                    data_format=self._B4_EXTRACTION.latest_timestamp_data_format,
                    read_type=ReadType.BATCH.value,
                    location=self._B4_EXTRACTION.latest_timestamp_data_location,
                )
            )
            min_timestamp = latest_timestamp_data_df.transform(
                Aggregators.get_max_value(
                    self._B4_EXTRACTION.latest_timestamp_input_col
                )
            ).first()[0]
        else:
            min_timestamp = self._B4_EXTRACTION.min_timestamp

        max_timestamp = (
            self._B4_EXTRACTION.max_timestamp
            if self._B4_EXTRACTION.max_timestamp
            else self._get_max_timestamp(self._MAX_TIMESTAMP_QUERY)
        )

        if self._B4_EXTRACTION.act_req_join_condition:
            join_condition = f"{self._B4_EXTRACTION.act_req_join_condition}"
        else:
            join_condition = f"tbl.reqtsn = req.{self._B4_EXTRACTION.request_col_name}"

        base_query = f""" --# nosec
        FROM {self._B4_EXTRACTION.changelog_table} AS tbl
        JOIN {self._B4_EXTRACTION.request_status_tbl} AS req
            ON {join_condition}
        WHERE {self._B4_EXTRACTION.request_status_tbl_filter}
            AND req.{self._B4_EXTRACTION.request_col_name} > {min_timestamp}
            AND req.{self._B4_EXTRACTION.request_col_name} <= {max_timestamp})
        """

        main_cols = f"""
            (SELECT tbl.*,
                CAST({self._B4_EXTRACTION.extraction_timestamp} AS DECIMAL(15,0))
                    AS extraction_start_timestamp
            """

        # We join the main columns considered for the extraction with
        # extra_cols_req_status_tbl that people might want to use, filtering so
        # that the comma is only added and the strings joined when
        # extra_cols_req_status_tbl is not None or empty.
        extraction_query_cols = ",".join(
            filter(None, [main_cols, self._B4_EXTRACTION.extra_cols_req_status_tbl])
        )

        extraction_query = extraction_query_cols + base_query

        predicates_query = f"""
        (SELECT DISTINCT({self._B4_EXTRACTION.partition_column})
        {base_query}
        """

        return extraction_query, predicates_query

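    # Illustrative rendering of the delta extraction query (assumptions:
    # CL-type ADSO, changelog_table="SAPHANADB./BIC/azsales1272", default
    # column names, min_timestamp=20240101000000000000000 and
    # max_timestamp=20240201000000000000000):
    #
    #   (SELECT tbl.*,
    #       CAST(<extraction_timestamp> AS DECIMAL(15,0)) AS extraction_start_timestamp
    #   FROM SAPHANADB./BIC/azsales1272 AS tbl
    #   JOIN SAPHANADB.RSPMREQUEST AS req
    #       ON tbl.reqtsn = req.REQUEST_TSN
    #   WHERE <request_status_tbl_filter>
    #       AND req.REQUEST_TSN > 20240101000000000000000
    #       AND req.REQUEST_TSN <= 20240201000000000000000)
    #
    # Only requests in the (min_timestamp, max_timestamp] window are read,
    # i.e. the data loaded into B4 since the last extraction.
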
    def _get_req_status_tbl_filter(self) -> Any:
        """Get the filter to apply on the request status table.

        Returns the user-provided filter or a default filter based on adso_type.
        """
        if self._B4_EXTRACTION.request_status_tbl_filter:
            return self._B4_EXTRACTION.request_status_tbl_filter
        else:
            if self._B4_EXTRACTION.adso_type == ADSOTypes.AQ.value:
                return f"""
                    STORAGE = 'AQ' AND REQUEST_IS_IN_PROCESS = 'N' AND
                    LAST_OPERATION_TYPE IN ('C', 'U') AND REQUEST_STATUS IN ('GG', 'GR')
                    AND UPPER(DATATARGET) = UPPER('{self._B4_EXTRACTION.data_target}')
                """
            elif self._B4_EXTRACTION.adso_type == ADSOTypes.CL.value:
                return f"""
                    STORAGE = 'AT' AND REQUEST_IS_IN_PROCESS = 'N' AND
                    LAST_OPERATION_TYPE IN ('C', 'U') AND REQUEST_STATUS IN ('GG')
                    AND UPPER(DATATARGET) = UPPER('{self._B4_EXTRACTION.data_target}')
                """
            else:
                raise NotImplementedError(
                    "The requested ADSO Type is not fully implemented and/or tested. "
                    f"Supported ADSO Types: {ADSOTypes.SUPPORTED_TYPES}"
                )
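
A minimal end-to-end sketch, using only names defined or imported in this module (connection values and timestamps below are placeholders, not defaults):

options = {"dbtable": "SAPHANADB./BIC/azsales1272"}
data_target = SAPB4ExtractionUtils.get_data_target(options)  # -> "zsales127"

b4_utils = SAPB4ExtractionUtils(
    SAPB4Extraction(
        user="hana_user",  # placeholder credentials
        password="hana_password",
        url="jdbc:sap://b4-host:30015",  # placeholder JDBC URL
        dbtable=options["dbtable"],
        adso_type=ADSOTypes.AQ.value,
        data_target=data_target,
        # Providing min/max timestamps explicitly skips both the read of
        # latest_timestamp_data_location and the max timestamp query.
        min_timestamp="20240101000000000000000",
        max_timestamp="20240201000000000000000",
    )
)

The inherited JDBCExtractionUtils then drives the extraction, submitting the init/delta queries built above through the Spark JDBC dbtable option.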