lakehouse_engine.utils.extraction.sap_bw_extraction_utils

Utilities module for SAP BW extraction processes.

  1"""Utilities module for SAP BW extraction processes."""
  2
  3from copy import copy
  4from dataclasses import dataclass
  5from logging import Logger
  6from typing import Optional, Tuple
  7
  8from lakehouse_engine.core.definitions import InputFormat, InputSpec, ReadType
  9from lakehouse_engine.transformers.aggregators import Aggregators
 10from lakehouse_engine.utils.extraction.jdbc_extraction_utils import (
 11    JDBCExtraction,
 12    JDBCExtractionType,
 13    JDBCExtractionUtils,
 14)
 15from lakehouse_engine.utils.logging_handler import LoggingHandler
 16
 17
 18@dataclass
 19class SAPBWExtraction(JDBCExtraction):
 20    """Configurations available for an Extraction from SAP BW.
 21
 22    It inherits from SAPBWExtraction configurations, so it can use
 23    and/or overwrite those configurations.
 24
 25    These configurations cover:
 26    - latest_timestamp_input_col: the column containing the actrequest timestamp
 27        in the dataset in latest_timestamp_data_location. Default:
 28        "actrequest_timestamp".
 29    - act_request_table: the name of the SAP BW activation requests table.
 30        Composed of database.table. Default: SAPPHA.RSODSACTREQ.
 31    - request_col_name: name of the column having the request to join
 32        with the activation request table. Default: actrequest.
 33    - act_req_join_condition: the join condition into activation table
 34        can be changed using this property.
 35        Default: 'changelog_tbl.request = act_req.request_col_name'.
 36    - odsobject: name of BW Object, used for joining with the activation request
 37        table to get the max actrequest_timestamp to consider while filtering
 38        the changelog table.
 39    - include_changelog_tech_cols: whether to include the technical columns
 40        (usually coming from the changelog) table or not. Default: True.
 41    - extra_cols_act_request: list of columns to be added from act request table.
 42        It needs to contain the prefix "act_req.". E.g. "act_req.col1
 43        as column_one, act_req.col2 as column_two".
 44    - get_timestamp_from_act_request: whether to get init timestamp
 45        from act request table or assume current/given timestamp.
 46    - sap_bw_schema: sap bw schema. Default: SAPPHA.
 47    - max_timestamp_custom_schema: the custom schema to apply on the calculation of
 48        the max timestamp to consider for the delta extractions.
 49        Default: timestamp DECIMAL(23,0).
 50    - default_max_timestamp: the timestamp to use as default, when it is not possible
 51        to derive one.
 52    """
 53
 54    latest_timestamp_input_col: str = "actrequest_timestamp"
 55    act_request_table: str = "SAPPHA.RSODSACTREQ"
 56    request_col_name: str = "actrequest"
 57    act_req_join_condition: Optional[str] = None
 58    odsobject: Optional[str] = None
 59    include_changelog_tech_cols: bool = True
 60    extra_cols_act_request: Optional[str] = None
 61    get_timestamp_from_act_request: bool = False
 62    sap_bw_schema: str = "SAPPHA"
 63    max_timestamp_custom_schema: str = "timestamp DECIMAL(15,0)"
 64    default_max_timestamp: str = "197000000000000"
 65
 66
 67class SAPBWExtractionUtils(JDBCExtractionUtils):
 68    """Utils for managing data extraction from particularly relevant JDBC sources."""
 69
 70    def __init__(self, sap_bw_extraction: SAPBWExtraction):
 71        """Construct SAPBWExtractionUtils.
 72
 73        Args:
 74            sap_bw_extraction: SAP BW Extraction configurations.
 75        """
 76        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
 77        self._BW_EXTRACTION = sap_bw_extraction
 78        self._BW_EXTRACTION.changelog_table = self.get_changelog_table()
 79        self._MAX_TIMESTAMP_QUERY = f""" --# nosec
 80                (SELECT COALESCE(MAX(timestamp),
 81                    {self._BW_EXTRACTION.default_max_timestamp}) as timestamp
 82                FROM {self._BW_EXTRACTION.act_request_table}
 83                WHERE odsobject = '{self._BW_EXTRACTION.odsobject}'
 84                 AND operation = 'A' AND status = '0')
 85            """  # nosec: B608
 86        super().__init__(sap_bw_extraction)
 87
 88    def get_changelog_table(self) -> str:
 89        """Get the changelog table, given an odsobject.
 90
 91        Returns:
 92             String to use as changelog_table.
 93        """
 94        if (
 95            self._BW_EXTRACTION.odsobject is not None
 96            and self._BW_EXTRACTION.changelog_table is None
 97            and self._BW_EXTRACTION.extraction_type != JDBCExtractionType.INIT.value
 98        ):
 99            escaped_odsobject = copy(self._BW_EXTRACTION.odsobject).replace("_", "$_")
100
101            if self._BW_EXTRACTION.sap_bw_schema:
102                system_table = f"{self._BW_EXTRACTION.sap_bw_schema}.RSTSODS"
103            else:
104                system_table = "RSTSODS"
105
106            jdbc_args = {
107                "url": self._BW_EXTRACTION.url,
108                "table": f""" -- # nosec
109                    (SELECT ODSNAME_TECH
110                    FROM {system_table}
111                    WHERE ODSNAME LIKE '8{escaped_odsobject}$_%'
112                        ESCAPE '$' AND USERAPP = 'CHANGELOG' AND VERSION = '000')
113                """,  # nosec: B608
114                "properties": {
115                    "user": self._BW_EXTRACTION.user,
116                    "password": self._BW_EXTRACTION.password,
117                    "driver": self._BW_EXTRACTION.driver,
118                },
119            }
120            from lakehouse_engine.io.reader_factory import ReaderFactory
121
122            changelog_df = ReaderFactory.get_data(
123                InputSpec(
124                    spec_id="changelog_table",
125                    data_format=InputFormat.JDBC.value,
126                    read_type=ReadType.BATCH.value,
127                    jdbc_args=jdbc_args,
128                )
129            )
130            changelog_table = (
131                f'{self._BW_EXTRACTION.sap_bw_schema}."{changelog_df.first()[0]}"'
132                if self._BW_EXTRACTION.sap_bw_schema
133                else str(changelog_df.first()[0])
134            )
135        else:
136            changelog_table = (
137                self._BW_EXTRACTION.changelog_table
138                if self._BW_EXTRACTION.changelog_table
139                else f"{self._BW_EXTRACTION.dbtable}_cl"
140            )
141        self._LOGGER.info(f"The changelog table derived is: '{changelog_table}'")
142
143        return changelog_table
144
145    @staticmethod
146    def get_odsobject(input_spec_opt: dict) -> str:
147        """Get the odsobject based on the provided options.
148
149        With the table name we may also get the db name, so we need to split.
150        Moreover, there might be the need for people to specify odsobject if
151        it is different from the dbtable.
152
153        Args:
154            input_spec_opt: options from the input_spec.
155
156        Returns:
157            A string with the odsobject.
158        """
159        return str(
160            input_spec_opt["dbtable"].split(".")[1]
161            if len(input_spec_opt["dbtable"].split(".")) > 1
162            else input_spec_opt["dbtable"]
163        )
164
165    def _get_init_query(self) -> Tuple[str, str]:
166        """Get a query to do an init load based on a DSO on a SAP BW system.
167
168        Returns:
169            A query to submit to SAP BW for the initial data extraction. The query
170            is enclosed in parentheses so that Spark treats it as a table and supports
171            it in the dbtable option.
172        """
173        if self._BW_EXTRACTION.get_timestamp_from_act_request:
174            # check if we are dealing with a DSO of type Write Optimised
175            if self._BW_EXTRACTION.dbtable == self._BW_EXTRACTION.changelog_table:
176                extraction_query = self._get_init_extraction_query_act_req_timestamp()
177            else:
178                raise AttributeError(
179                    "Not able to get the extraction query. The option "
180                    "'get_timestamp_from_act_request' is only "
181                    "available/useful for DSOs of type Write Optimised."
182                )
183        else:
184            extraction_query = self._get_init_extraction_query()
185
186        predicates_query = f"""
187        (SELECT DISTINCT({self._BW_EXTRACTION.partition_column})
188        FROM {self._BW_EXTRACTION.dbtable} t)
189        """  # nosec: B608
190
191        return extraction_query, predicates_query
192
193    def _get_init_extraction_query(self) -> str:
194        """Get extraction query based on given/current timestamp.
195
196        Returns:
197            A query to submit to SAP BW for the initial data extraction.
198        """
199        changelog_tech_cols = (
200            f"""'0' AS request,
201                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15, 0))
202                 AS actrequest_timestamp,
203                '0' AS datapakid,
204                0 AS partno,
205                0 AS record,"""
206            if self._BW_EXTRACTION.include_changelog_tech_cols
207            else f"CAST({self._BW_EXTRACTION.extraction_timestamp} "
208            f"AS DECIMAL(15, 0))"
209            f" AS actrequest_timestamp,"
210        )
211
212        extraction_query = f"""
213                (SELECT t.*,
214                    {changelog_tech_cols}
215                    CAST({self._BW_EXTRACTION.extraction_timestamp}
216                        AS DECIMAL(15, 0)) AS extraction_start_timestamp
217                FROM {self._BW_EXTRACTION.dbtable} t
218                )"""  # nosec: B608
219
220        return extraction_query
221
222    def _get_init_extraction_query_act_req_timestamp(self) -> str:
223        """Get extraction query assuming the init timestamp from act_request table.
224
225        Returns:
226            A query to submit to SAP BW for the initial data extraction from
227            write optimised DSOs, receiving the actrequest_timestamp from
228            the activation requests table.
229        """
230        extraction_query = f"""
231            (SELECT t.*,
232                act_req.timestamp as actrequest_timestamp,
233                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15, 0))
234                 AS extraction_start_timestamp
235            FROM {self._BW_EXTRACTION.dbtable} t
236            JOIN {self._BW_EXTRACTION.act_request_table} AS act_req ON
237                t.request = act_req.{self._BW_EXTRACTION.request_col_name}
238            WHERE act_req.odsobject = '{self._BW_EXTRACTION.odsobject}'
239                AND operation = 'A' AND status = '0'
240            )"""  # nosec: B608
241
242        return extraction_query
243
244    def _get_delta_query(self) -> Tuple[str, str]:
245        """Get a delta query for an SAP BW DSO.
246
247        An SAP BW DSO requires a join with a special type of table often called
248        activation requests table, in which BW tracks down the timestamps associated
249        with the several data loads that were performed into BW. Because the changelog
250        table only contains the active request id, and that cannot be sorted by the
251        downstream consumers to figure out the latest change, we need to join the
252        changelog table with this special table to get the activation requests
253        timestamps to then use them to figure out the latest changes in the delta load
254        logic afterwards.
255
256        Additionally, we also need to know which was the latest timestamp already loaded
257        into the lakehouse bronze layer. The latest timestamp should always be available
258        in the bronze dataset itself or in a dataset that tracks down all the actrequest
259        timestamps that were already loaded. So we get the max value out of the
260        respective actrequest timestamp column in that dataset.
261
262        Returns:
263            A query to submit to SAP BW for the delta data extraction. The query
264            is enclosed in parentheses so that Spark treats it as a table and supports
265            it in the dbtable option.
266        """
267        if not self._BW_EXTRACTION.min_timestamp:
268            from lakehouse_engine.io.reader_factory import ReaderFactory
269
270            latest_timestamp_data_df = ReaderFactory.get_data(
271                InputSpec(
272                    spec_id="data_with_latest_timestamp",
273                    data_format=self._BW_EXTRACTION.latest_timestamp_data_format,
274                    read_type=ReadType.BATCH.value,
275                    location=self._BW_EXTRACTION.latest_timestamp_data_location,
276                )
277            )
278            min_timestamp = latest_timestamp_data_df.transform(
279                Aggregators.get_max_value(
280                    self._BW_EXTRACTION.latest_timestamp_input_col
281                )
282            ).first()[0]
283        else:
284            min_timestamp = self._BW_EXTRACTION.min_timestamp
285
286        max_timestamp = (
287            self._BW_EXTRACTION.max_timestamp
288            if self._BW_EXTRACTION.max_timestamp
289            else self._get_max_timestamp(self._MAX_TIMESTAMP_QUERY)
290        )
291
292        if self._BW_EXTRACTION.act_req_join_condition:
293            join_condition = f"{self._BW_EXTRACTION.act_req_join_condition}"
294        else:
295            join_condition = (
296                f"changelog_tbl.request = "
297                f"act_req.{self._BW_EXTRACTION.request_col_name}"
298            )
299
300        base_query = f""" --# nosec
301        FROM {self._BW_EXTRACTION.changelog_table} AS changelog_tbl
302        JOIN {self._BW_EXTRACTION.act_request_table} AS act_req
303            ON {join_condition}
304        WHERE act_req.odsobject = '{self._BW_EXTRACTION.odsobject}'
305            AND act_req.timestamp > {min_timestamp}
306            AND act_req.timestamp <= {max_timestamp}
307            AND operation = 'A' AND status = '0')
308        """
309
310        main_cols = f"""
311            (SELECT changelog_tbl.*,
312                act_req.TIMESTAMP AS actrequest_timestamp,
313                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15,0))
314                    AS extraction_start_timestamp
315            """
316        # We join the main columns considered for the extraction with
317        # extra_cols_act_request that people might want to use, filtering to only
318        # add the comma and join the strings, in case extra_cols_act_request is
319        # not None or empty.
320        extraction_query_cols = ",".join(
321            filter(None, [main_cols, self._BW_EXTRACTION.extra_cols_act_request])
322        )
323
324        extraction_query = extraction_query_cols + base_query
325
326        predicates_query = f"""
327        (SELECT DISTINCT({self._BW_EXTRACTION.partition_column})
328        {base_query}
329        """
330
331        return extraction_query, predicates_query
@dataclass
class SAPBWExtraction(lakehouse_engine.utils.extraction.jdbc_extraction_utils.JDBCExtraction):
@dataclass
class SAPBWExtraction(JDBCExtraction):
    """Configurations available for an Extraction from SAP BW.

    It inherits from JDBCExtraction configurations, so it can use
    and/or overwrite those configurations.

    These configurations cover:
    - latest_timestamp_input_col: the column containing the actrequest timestamp
        in the dataset in latest_timestamp_data_location. Default:
        "actrequest_timestamp".
    - act_request_table: the name of the SAP BW activation requests table.
        Composed of database.table. Default: SAPPHA.RSODSACTREQ.
    - request_col_name: name of the column having the request to join
        with the activation request table. Default: actrequest.
    - act_req_join_condition: the join condition into activation table
        can be changed using this property.
        Default: 'changelog_tbl.request = act_req.request_col_name'.
    - odsobject: name of BW Object, used for joining with the activation request
        table to get the max actrequest_timestamp to consider while filtering
        the changelog table.
    - include_changelog_tech_cols: whether to include the technical columns
        (usually coming from the changelog) table or not. Default: True.
    - extra_cols_act_request: list of columns to be added from act request table.
        It needs to contain the prefix "act_req.". E.g. "act_req.col1
        as column_one, act_req.col2 as column_two".
    - get_timestamp_from_act_request: whether to get init timestamp
        from act request table or assume current/given timestamp.
        Default: False.
    - sap_bw_schema: sap bw schema. Default: SAPPHA.
    - max_timestamp_custom_schema: the custom schema to apply on the calculation of
        the max timestamp to consider for the delta extractions.
        Default: timestamp DECIMAL(15,0).
    - default_max_timestamp: the timestamp to use as default, when it is not possible
        to derive one. Default: "197000000000000".
    """

    latest_timestamp_input_col: str = "actrequest_timestamp"
    act_request_table: str = "SAPPHA.RSODSACTREQ"
    request_col_name: str = "actrequest"
    act_req_join_condition: Optional[str] = None
    odsobject: Optional[str] = None
    include_changelog_tech_cols: bool = True
    extra_cols_act_request: Optional[str] = None
    get_timestamp_from_act_request: bool = False
    sap_bw_schema: str = "SAPPHA"
    max_timestamp_custom_schema: str = "timestamp DECIMAL(15,0)"
    default_max_timestamp: str = "197000000000000"

Configurations available for an Extraction from SAP BW.

It inherits from JDBCExtraction configurations, so it can use and/or overwrite those configurations.

These configurations cover:

  • latest_timestamp_input_col: the column containing the actrequest timestamp in the dataset in latest_timestamp_data_location. Default: "actrequest_timestamp".
  • act_request_table: the name of the SAP BW activation requests table. Composed of database.table. Default: SAPPHA.RSODSACTREQ.
  • request_col_name: name of the column having the request to join with the activation request table. Default: actrequest.
  • act_req_join_condition: the join condition into activation table can be changed using this property. Default: 'changelog_tbl.request = act_req.request_col_name'.
  • odsobject: name of BW Object, used for joining with the activation request table to get the max actrequest_timestamp to consider while filtering the changelog table.
  • include_changelog_tech_cols: whether to include the technical columns (usually coming from the changelog) table or not. Default: True.
  • extra_cols_act_request: list of columns to be added from act request table. It needs to contain the prefix "act_req.". E.g. "act_req.col1 as column_one, act_req.col2 as column_two".
  • get_timestamp_from_act_request: whether to get init timestamp from act request table or assume current/given timestamp.
  • sap_bw_schema: sap bw schema. Default: SAPPHA.
  • max_timestamp_custom_schema: the custom schema to apply on the calculation of the max timestamp to consider for the delta extractions. Default: timestamp DECIMAL(15,0).
  • default_max_timestamp: the timestamp to use as default, when it is not possible to derive one.
SAPBWExtraction( user: str, password: str, url: str, dbtable: str, calc_upper_bound_schema: Optional[str] = None, changelog_table: Optional[str] = None, partition_column: Optional[str] = None, latest_timestamp_data_location: Optional[str] = None, latest_timestamp_data_format: str = 'delta', extraction_type: str = 'delta', driver: str = 'com.sap.db.jdbc.Driver', num_partitions: Optional[int] = None, lower_bound: Union[int, float, str, NoneType] = None, upper_bound: Union[int, float, str, NoneType] = None, default_upper_bound: str = '1', fetch_size: str = '100000', compress: bool = True, custom_schema: Optional[str] = None, min_timestamp: Optional[str] = None, max_timestamp: Optional[str] = None, generate_predicates: bool = False, predicates: Optional[List] = None, predicates_add_null: bool = True, extraction_timestamp: str = '20241028145833', max_timestamp_custom_schema: str = 'timestamp DECIMAL(15,0)', latest_timestamp_input_col: str = 'actrequest_timestamp', act_request_table: str = 'SAPPHA.RSODSACTREQ', request_col_name: str = 'actrequest', act_req_join_condition: Optional[str] = None, odsobject: Optional[str] = None, include_changelog_tech_cols: bool = True, extra_cols_act_request: Optional[str] = None, get_timestamp_from_act_request: bool = False, sap_bw_schema: str = 'SAPPHA', default_max_timestamp: str = '197000000000000')
latest_timestamp_input_col: str = 'actrequest_timestamp'
act_request_table: str = 'SAPPHA.RSODSACTREQ'
request_col_name: str = 'actrequest'
act_req_join_condition: Optional[str] = None
odsobject: Optional[str] = None
include_changelog_tech_cols: bool = True
extra_cols_act_request: Optional[str] = None
get_timestamp_from_act_request: bool = False
sap_bw_schema: str = 'SAPPHA'
max_timestamp_custom_schema: str = 'timestamp DECIMAL(15,0)'
default_max_timestamp: str = '197000000000000'
 68class SAPBWExtractionUtils(JDBCExtractionUtils):
 69    """Utils for managing data extraction from particularly relevant JDBC sources."""
 70
 71    def __init__(self, sap_bw_extraction: SAPBWExtraction):
 72        """Construct SAPBWExtractionUtils.
 73
 74        Args:
 75            sap_bw_extraction: SAP BW Extraction configurations.
 76        """
 77        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
 78        self._BW_EXTRACTION = sap_bw_extraction
 79        self._BW_EXTRACTION.changelog_table = self.get_changelog_table()
 80        self._MAX_TIMESTAMP_QUERY = f""" --# nosec
 81                (SELECT COALESCE(MAX(timestamp),
 82                    {self._BW_EXTRACTION.default_max_timestamp}) as timestamp
 83                FROM {self._BW_EXTRACTION.act_request_table}
 84                WHERE odsobject = '{self._BW_EXTRACTION.odsobject}'
 85                 AND operation = 'A' AND status = '0')
 86            """  # nosec: B608
 87        super().__init__(sap_bw_extraction)
 88
 89    def get_changelog_table(self) -> str:
 90        """Get the changelog table, given an odsobject.
 91
 92        Returns:
 93             String to use as changelog_table.
 94        """
 95        if (
 96            self._BW_EXTRACTION.odsobject is not None
 97            and self._BW_EXTRACTION.changelog_table is None
 98            and self._BW_EXTRACTION.extraction_type != JDBCExtractionType.INIT.value
 99        ):
100            escaped_odsobject = copy(self._BW_EXTRACTION.odsobject).replace("_", "$_")
101
102            if self._BW_EXTRACTION.sap_bw_schema:
103                system_table = f"{self._BW_EXTRACTION.sap_bw_schema}.RSTSODS"
104            else:
105                system_table = "RSTSODS"
106
107            jdbc_args = {
108                "url": self._BW_EXTRACTION.url,
109                "table": f""" -- # nosec
110                    (SELECT ODSNAME_TECH
111                    FROM {system_table}
112                    WHERE ODSNAME LIKE '8{escaped_odsobject}$_%'
113                        ESCAPE '$' AND USERAPP = 'CHANGELOG' AND VERSION = '000')
114                """,  # nosec: B608
115                "properties": {
116                    "user": self._BW_EXTRACTION.user,
117                    "password": self._BW_EXTRACTION.password,
118                    "driver": self._BW_EXTRACTION.driver,
119                },
120            }
121            from lakehouse_engine.io.reader_factory import ReaderFactory
122
123            changelog_df = ReaderFactory.get_data(
124                InputSpec(
125                    spec_id="changelog_table",
126                    data_format=InputFormat.JDBC.value,
127                    read_type=ReadType.BATCH.value,
128                    jdbc_args=jdbc_args,
129                )
130            )
131            changelog_table = (
132                f'{self._BW_EXTRACTION.sap_bw_schema}."{changelog_df.first()[0]}"'
133                if self._BW_EXTRACTION.sap_bw_schema
134                else str(changelog_df.first()[0])
135            )
136        else:
137            changelog_table = (
138                self._BW_EXTRACTION.changelog_table
139                if self._BW_EXTRACTION.changelog_table
140                else f"{self._BW_EXTRACTION.dbtable}_cl"
141            )
142        self._LOGGER.info(f"The changelog table derived is: '{changelog_table}'")
143
144        return changelog_table
145
146    @staticmethod
147    def get_odsobject(input_spec_opt: dict) -> str:
148        """Get the odsobject based on the provided options.
149
150        With the table name we may also get the db name, so we need to split.
151        Moreover, there might be the need for people to specify odsobject if
152        it is different from the dbtable.
153
154        Args:
155            input_spec_opt: options from the input_spec.
156
157        Returns:
158            A string with the odsobject.
159        """
160        return str(
161            input_spec_opt["dbtable"].split(".")[1]
162            if len(input_spec_opt["dbtable"].split(".")) > 1
163            else input_spec_opt["dbtable"]
164        )
165
166    def _get_init_query(self) -> Tuple[str, str]:
167        """Get a query to do an init load based on a DSO on a SAP BW system.
168
169        Returns:
170            A query to submit to SAP BW for the initial data extraction. The query
171            is enclosed in parentheses so that Spark treats it as a table and supports
172            it in the dbtable option.
173        """
174        if self._BW_EXTRACTION.get_timestamp_from_act_request:
175            # check if we are dealing with a DSO of type Write Optimised
176            if self._BW_EXTRACTION.dbtable == self._BW_EXTRACTION.changelog_table:
177                extraction_query = self._get_init_extraction_query_act_req_timestamp()
178            else:
179                raise AttributeError(
180                    "Not able to get the extraction query. The option "
181                    "'get_timestamp_from_act_request' is only "
182                    "available/useful for DSOs of type Write Optimised."
183                )
184        else:
185            extraction_query = self._get_init_extraction_query()
186
187        predicates_query = f"""
188        (SELECT DISTINCT({self._BW_EXTRACTION.partition_column})
189        FROM {self._BW_EXTRACTION.dbtable} t)
190        """  # nosec: B608
191
192        return extraction_query, predicates_query
193
194    def _get_init_extraction_query(self) -> str:
195        """Get extraction query based on given/current timestamp.
196
197        Returns:
198            A query to submit to SAP BW for the initial data extraction.
199        """
200        changelog_tech_cols = (
201            f"""'0' AS request,
202                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15, 0))
203                 AS actrequest_timestamp,
204                '0' AS datapakid,
205                0 AS partno,
206                0 AS record,"""
207            if self._BW_EXTRACTION.include_changelog_tech_cols
208            else f"CAST({self._BW_EXTRACTION.extraction_timestamp} "
209            f"AS DECIMAL(15, 0))"
210            f" AS actrequest_timestamp,"
211        )
212
213        extraction_query = f"""
214                (SELECT t.*,
215                    {changelog_tech_cols}
216                    CAST({self._BW_EXTRACTION.extraction_timestamp}
217                        AS DECIMAL(15, 0)) AS extraction_start_timestamp
218                FROM {self._BW_EXTRACTION.dbtable} t
219                )"""  # nosec: B608
220
221        return extraction_query
222
223    def _get_init_extraction_query_act_req_timestamp(self) -> str:
224        """Get extraction query assuming the init timestamp from act_request table.
225
226        Returns:
227            A query to submit to SAP BW for the initial data extraction from
228            write optimised DSOs, receiving the actrequest_timestamp from
229            the activation requests table.
230        """
231        extraction_query = f"""
232            (SELECT t.*,
233                act_req.timestamp as actrequest_timestamp,
234                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15, 0))
235                 AS extraction_start_timestamp
236            FROM {self._BW_EXTRACTION.dbtable} t
237            JOIN {self._BW_EXTRACTION.act_request_table} AS act_req ON
238                t.request = act_req.{self._BW_EXTRACTION.request_col_name}
239            WHERE act_req.odsobject = '{self._BW_EXTRACTION.odsobject}'
240                AND operation = 'A' AND status = '0'
241            )"""  # nosec: B608
242
243        return extraction_query
244
245    def _get_delta_query(self) -> Tuple[str, str]:
246        """Get a delta query for an SAP BW DSO.
247
248        An SAP BW DSO requires a join with a special type of table often called
249        activation requests table, in which BW tracks down the timestamps associated
250        with the several data loads that were performed into BW. Because the changelog
251        table only contains the active request id, and that cannot be sorted by the
252        downstream consumers to figure out the latest change, we need to join the
253        changelog table with this special table to get the activation requests
254        timestamps to then use them to figure out the latest changes in the delta load
255        logic afterwards.
256
257        Additionally, we also need to know which was the latest timestamp already loaded
258        into the lakehouse bronze layer. The latest timestamp should always be available
259        in the bronze dataset itself or in a dataset that tracks down all the actrequest
260        timestamps that were already loaded. So we get the max value out of the
261        respective actrequest timestamp column in that dataset.
262
263        Returns:
264            A query to submit to SAP BW for the delta data extraction. The query
265            is enclosed in parentheses so that Spark treats it as a table and supports
266            it in the dbtable option.
267        """
268        if not self._BW_EXTRACTION.min_timestamp:
269            from lakehouse_engine.io.reader_factory import ReaderFactory
270
271            latest_timestamp_data_df = ReaderFactory.get_data(
272                InputSpec(
273                    spec_id="data_with_latest_timestamp",
274                    data_format=self._BW_EXTRACTION.latest_timestamp_data_format,
275                    read_type=ReadType.BATCH.value,
276                    location=self._BW_EXTRACTION.latest_timestamp_data_location,
277                )
278            )
279            min_timestamp = latest_timestamp_data_df.transform(
280                Aggregators.get_max_value(
281                    self._BW_EXTRACTION.latest_timestamp_input_col
282                )
283            ).first()[0]
284        else:
285            min_timestamp = self._BW_EXTRACTION.min_timestamp
286
287        max_timestamp = (
288            self._BW_EXTRACTION.max_timestamp
289            if self._BW_EXTRACTION.max_timestamp
290            else self._get_max_timestamp(self._MAX_TIMESTAMP_QUERY)
291        )
292
293        if self._BW_EXTRACTION.act_req_join_condition:
294            join_condition = f"{self._BW_EXTRACTION.act_req_join_condition}"
295        else:
296            join_condition = (
297                f"changelog_tbl.request = "
298                f"act_req.{self._BW_EXTRACTION.request_col_name}"
299            )
300
301        base_query = f""" --# nosec
302        FROM {self._BW_EXTRACTION.changelog_table} AS changelog_tbl
303        JOIN {self._BW_EXTRACTION.act_request_table} AS act_req
304            ON {join_condition}
305        WHERE act_req.odsobject = '{self._BW_EXTRACTION.odsobject}'
306            AND act_req.timestamp > {min_timestamp}
307            AND act_req.timestamp <= {max_timestamp}
308            AND operation = 'A' AND status = '0')
309        """
310
311        main_cols = f"""
312            (SELECT changelog_tbl.*,
313                act_req.TIMESTAMP AS actrequest_timestamp,
314                CAST({self._BW_EXTRACTION.extraction_timestamp} AS DECIMAL(15,0))
315                    AS extraction_start_timestamp
316            """
317        # We join the main columns considered for the extraction with
318        # extra_cols_act_request that people might want to use, filtering to only
319        # add the comma and join the strings, in case extra_cols_act_request is
320        # not None or empty.
321        extraction_query_cols = ",".join(
322            filter(None, [main_cols, self._BW_EXTRACTION.extra_cols_act_request])
323        )
324
325        extraction_query = extraction_query_cols + base_query
326
327        predicates_query = f"""
328        (SELECT DISTINCT({self._BW_EXTRACTION.partition_column})
329        {base_query}
330        """
331
332        return extraction_query, predicates_query

Utils for managing data extraction from particularly relevant JDBC sources.

SAPBWExtractionUtils( sap_bw_extraction: SAPBWExtraction)
71    def __init__(self, sap_bw_extraction: SAPBWExtraction):
72        """Construct SAPBWExtractionUtils.
73
74        Args:
75            sap_bw_extraction: SAP BW Extraction configurations.
76        """
77        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
78        self._BW_EXTRACTION = sap_bw_extraction
79        self._BW_EXTRACTION.changelog_table = self.get_changelog_table()
80        self._MAX_TIMESTAMP_QUERY = f""" --# nosec
81                (SELECT COALESCE(MAX(timestamp),
82                    {self._BW_EXTRACTION.default_max_timestamp}) as timestamp
83                FROM {self._BW_EXTRACTION.act_request_table}
84                WHERE odsobject = '{self._BW_EXTRACTION.odsobject}'
85                 AND operation = 'A' AND status = '0')
86            """  # nosec: B608
87        super().__init__(sap_bw_extraction)

Construct SAPBWExtractionUtils.

Arguments:
  • sap_bw_extraction: SAP BW Extraction configurations.
def get_changelog_table(self) -> str:
 89    def get_changelog_table(self) -> str:
 90        """Get the changelog table, given an odsobject.
 91
 92        Returns:
 93             String to use as changelog_table.
 94        """
 95        if (
 96            self._BW_EXTRACTION.odsobject is not None
 97            and self._BW_EXTRACTION.changelog_table is None
 98            and self._BW_EXTRACTION.extraction_type != JDBCExtractionType.INIT.value
 99        ):
100            escaped_odsobject = copy(self._BW_EXTRACTION.odsobject).replace("_", "$_")
101
102            if self._BW_EXTRACTION.sap_bw_schema:
103                system_table = f"{self._BW_EXTRACTION.sap_bw_schema}.RSTSODS"
104            else:
105                system_table = "RSTSODS"
106
107            jdbc_args = {
108                "url": self._BW_EXTRACTION.url,
109                "table": f""" -- # nosec
110                    (SELECT ODSNAME_TECH
111                    FROM {system_table}
112                    WHERE ODSNAME LIKE '8{escaped_odsobject}$_%'
113                        ESCAPE '$' AND USERAPP = 'CHANGELOG' AND VERSION = '000')
114                """,  # nosec: B608
115                "properties": {
116                    "user": self._BW_EXTRACTION.user,
117                    "password": self._BW_EXTRACTION.password,
118                    "driver": self._BW_EXTRACTION.driver,
119                },
120            }
121            from lakehouse_engine.io.reader_factory import ReaderFactory
122
123            changelog_df = ReaderFactory.get_data(
124                InputSpec(
125                    spec_id="changelog_table",
126                    data_format=InputFormat.JDBC.value,
127                    read_type=ReadType.BATCH.value,
128                    jdbc_args=jdbc_args,
129                )
130            )
131            changelog_table = (
132                f'{self._BW_EXTRACTION.sap_bw_schema}."{changelog_df.first()[0]}"'
133                if self._BW_EXTRACTION.sap_bw_schema
134                else str(changelog_df.first()[0])
135            )
136        else:
137            changelog_table = (
138                self._BW_EXTRACTION.changelog_table
139                if self._BW_EXTRACTION.changelog_table
140                else f"{self._BW_EXTRACTION.dbtable}_cl"
141            )
142        self._LOGGER.info(f"The changelog table derived is: '{changelog_table}'")
143
144        return changelog_table

Get the changelog table, given an odsobject.

Returns:

String to use as changelog_table.

@staticmethod
def get_odsobject(input_spec_opt: dict) -> str:
146    @staticmethod
147    def get_odsobject(input_spec_opt: dict) -> str:
148        """Get the odsobject based on the provided options.
149
150        With the table name we may also get the db name, so we need to split.
151        Moreover, there might be the need for people to specify odsobject if
152        it is different from the dbtable.
153
154        Args:
155            input_spec_opt: options from the input_spec.
156
157        Returns:
158            A string with the odsobject.
159        """
160        return str(
161            input_spec_opt["dbtable"].split(".")[1]
162            if len(input_spec_opt["dbtable"].split(".")) > 1
163            else input_spec_opt["dbtable"]
164        )

Get the odsobject based on the provided options.

The table name may be qualified with the database name, so it has to be split to isolate the table part. Moreover, people may need to specify the odsobject explicitly if it is different from the dbtable.

Arguments:
  • input_spec_opt: options from the input_spec.
Returns:

A string with the odsobject.