lakehouse_engine.utils.extraction.jdbc_extraction_utils

Utilities module for JDBC extraction processes.

  1"""Utilities module for JDBC extraction processes."""
  2
from abc import abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from logging import Logger
from typing import Any, Dict, List, Optional, Tuple, Union

from lakehouse_engine.core.definitions import InputFormat, InputSpec, ReadType
from lakehouse_engine.utils.logging_handler import LoggingHandler
 12
 13
 14class JDBCExtractionType(Enum):
 15    """Standardize the types of extractions we can have from a JDBC source."""
 16
 17    INIT = "init"
 18    DELTA = "delta"
 19
 20
 21@dataclass
 22class JDBCExtraction(object):
 23    """Configurations available for an Extraction from a JDBC source.
 24
 25    These configurations cover:
 26    - user: username to connect to JDBC source.
 27    - password: password to connect to JDBC source (always use secrets,
 28        don't use text passwords in your code).
 29    - url: url to connect to JDBC source.
 30    - dbtable: `database.table` to extract data from.
 31    - calc_upper_bound_schema: custom schema used for the upper bound calculation.
 32    - changelog_table: table of type changelog from which to extract data,
 33        when the extraction type is delta.
 34    - partition_column: column used to split the extraction.
 35    - latest_timestamp_data_location: data location (e.g., s3) containing the data
 36        to get the latest timestamp already loaded into bronze.
 37    - latest_timestamp_data_format: the format of the dataset in
 38        latest_timestamp_data_location. Default: delta.
 39    - extraction_type: type of extraction (delta or init). Default: "delta".
 40    - driver: JDBC driver name. Default: "com.sap.db.jdbc.Driver".
 41    - num_partitions: number of Spark partitions to split the extraction.
 42    - lower_bound: lower bound to decide the partition stride.
 43    - upper_bound: upper bound to decide the partition stride. If
 44        calculate_upper_bound is True, then upperBound will be
 45        derived by our upper bound optimizer, using the partition column.
 46    - default_upper_bound: the value to use as default upper bound in case
 47        the result of the upper bound calculation is None. Default: "1".
 48    - fetch_size: how many rows to fetch per round trip. Default: "100000".
 49    - compress: enable network compression. Default: True.
 50    - custom_schema: specify custom_schema for particular columns of the
 51        returned dataframe in the init/delta extraction of the source table.
 52    - min_timestamp: min timestamp to consider to filter the changelog data.
 53        Default: None and automatically derived from the location provided.
 54        In case this one is provided it has precedence and the calculation
 55        is not done.
 56    - max_timestamp: max timestamp to consider to filter the changelog data.
 57        Default: None and automatically derived from the table having information
 58        about the extraction requests, their timestamps and their status.
 59        In case this one is provided it has precedence and the calculation
 60        is not done.
 61    - generate_predicates: whether to generate predicates automatically or not.
 62        Default: False.
 63    - predicates: list containing all values to partition (if generate_predicates
 64        is used, the manual values provided are ignored). Default: None.
 65    - predicates_add_null: whether to consider null on predicates list.
 66        Default: True.
 67    - extraction_timestamp: the timestamp of the extraction. Default: current time
 68        following the format "%Y%m%d%H%M%S".
 69    - max_timestamp_custom_schema: custom schema used on the max_timestamp derivation
 70        from the table holding the extraction requests information.
 71    """
 72
 73    user: str
 74    password: str
 75    url: str
 76    dbtable: str
 77    calc_upper_bound_schema: Optional[str] = None
 78    changelog_table: Optional[str] = None
 79    partition_column: Optional[str] = None
 80    latest_timestamp_data_location: Optional[str] = None
 81    latest_timestamp_data_format: str = InputFormat.DELTAFILES.value
 82    extraction_type: str = JDBCExtractionType.DELTA.value
 83    driver: str = "com.sap.db.jdbc.Driver"
 84    num_partitions: Optional[int] = None
 85    lower_bound: Optional[Union[int, float, str]] = None
 86    upper_bound: Optional[Union[int, float, str]] = None
 87    default_upper_bound: str = "1"
 88    fetch_size: str = "100000"
 89    compress: bool = True
 90    custom_schema: Optional[str] = None
 91    min_timestamp: Optional[str] = None
 92    max_timestamp: Optional[str] = None
 93    generate_predicates: bool = False
 94    predicates: Optional[List] = None
 95    predicates_add_null: bool = True
 96    extraction_timestamp: str = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
 97    max_timestamp_custom_schema: Optional[str] = None
 98
 99
class JDBCExtractionUtils(object):
    """Utils for managing data extraction from particularly relevant JDBC sources.

    The class builds the Spark JDBC reader arguments/options out of a JDBC
    extraction configuration object. The queries themselves are provided by
    `_get_init_query` and `_get_delta_query`, which are expected to be overridden.
    """

    def __init__(self, jdbc_extraction: Any):
        """Construct JDBCExtractionUtils.

        Args:
            jdbc_extraction: JDBC Extraction configurations. Can be of type:
                JDBCExtraction, SAPB4Extraction or SAPBWExtraction.
        """
        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
        self._JDBC_EXTRACTION = jdbc_extraction

    @staticmethod
    def get_additional_spark_options(
        input_spec: InputSpec, options: dict, ignore_options: Optional[List] = None
    ) -> dict:
        """Helper to get additional Spark Options initially passed.

        If people provide additional Spark options, not covered by the util function
        arguments (get_spark_jdbc_options), we need to consider them.
        An option of the input specification is returned when its key is not in
        `options`, not in `ignore_options`, not a variable name of
        get_spark_jdbc_options, and its value is not None.

        Args:
            input_spec: the input specification.
            options: dict with Spark options.
            ignore_options: list of options to be ignored by the process.
                Spark read has two different approaches to parallelize the
                reading process: one uses lower/upper bounds, the other uses
                predicates. They can't be used at the same time, so you must
                choose one of them. When choosing predicates you can't pass
                lower and upper bound, number of partitions or partition
                column, otherwise Spark interprets the read as partitioned by
                bounds and expects all of those variables to be filled.
                To avoid hardcoding all the predicates in the acon, predicates
                can be generated automatically for init or delta loads based on
                the partition column. As the partition column can't then be
                passed on to the reader options, the options to leave out have
                to be listed here.

        Returns:
             a dict with the options that were initially provided in the input
             specification, but were not used in the util
             (get_spark_jdbc_options).
        """
        # NOTE: co_varnames holds the parameters AND the local variable names of
        # get_spark_jdbc_options, so renaming/adding locals there changes which
        # option keys are excluded here.
        func_args = JDBCExtractionUtils.get_spark_jdbc_options.__code__.co_varnames

        # A set gives constant time membership checks while filtering.
        excluded = set(ignore_options or []) | set(options) | set(func_args)

        # An input specification without options has nothing additional to offer.
        provided_options = input_spec.options or {}

        return {
            key: value
            for key, value in provided_options.items()
            if key not in excluded and value is not None
        }

    def _get_connection_jdbc_args(self) -> Dict[str, Any]:
        """Build the jdbc args (url and connection properties) shared by all reads.

        Returns:
            A new dict with the url and the user/password/driver properties.
        """
        return {
            "url": self._JDBC_EXTRACTION.url,
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }

    def get_predicates(self, predicates_query: str) -> List:
        """Get the predicates list, based on a predicates query.

        Args:
            predicates_query: query to use as the basis to get the distinct values for
                a specified column, based on which predicates are generated.

        Returns:
            List containing the predicates to use to split the extraction from
            JDBC sources.
        """
        jdbc_args = self._get_connection_jdbc_args()
        jdbc_args["table"] = predicates_query

        from lakehouse_engine.io.reader_factory import ReaderFactory

        predicates_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_predicates",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
            )
        )

        partition_column = self._JDBC_EXTRACTION.partition_column
        predicates_list = []
        for row in predicates_df.collect():
            value = row[0]
            if value is None:
                # A NULL can't be matched with '=' (it would become the literal
                # 'None'); NULLs are covered by the IS NULL predicate below.
                continue
            # Double the single quotes so the value stays a valid SQL string literal.
            escaped_value = str(value).replace("'", "''")
            predicates_list.append(f"{partition_column}='{escaped_value}'")

        if self._JDBC_EXTRACTION.predicates_add_null:
            predicates_list.append(f"{partition_column} IS NULL")
        self._LOGGER.info(
            f"The following predicate list was generated: {predicates_list}"
        )

        return predicates_list

    def get_spark_jdbc_options(self) -> Tuple[dict, dict]:
        """Get the Spark options to extract data from a JDBC source.

        Returns:
            A tuple with the options args dictionary first and the Spark jdbc
            args dictionary (including the query to submit) second.
        """
        # NOTE: get_additional_spark_options inspects the variable names of this
        # method, so keep the set of local names stable.
        options_args: Dict[str, Any] = {
            "fetchSize": self._JDBC_EXTRACTION.fetch_size,
            "compress": self._JDBC_EXTRACTION.compress,
        }

        jdbc_args = self._get_connection_jdbc_args()

        if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.DELTA.value:
            jdbc_args["table"], predicates_query = self._get_delta_query()
        else:
            jdbc_args["table"], predicates_query = self._get_init_query()

        if self._JDBC_EXTRACTION.custom_schema:
            options_args["customSchema"] = self._JDBC_EXTRACTION.custom_schema

        # Predicates and bound based partitioning are mutually exclusive:
        # generated predicates win over manual ones, and the partition options
        # are only added when no predicates are used at all.
        if self._JDBC_EXTRACTION.generate_predicates:
            jdbc_args["predicates"] = self.get_predicates(predicates_query)
        elif self._JDBC_EXTRACTION.predicates:
            jdbc_args["predicates"] = self._JDBC_EXTRACTION.predicates
        else:
            options_args = self._get_extraction_partition_opts(
                options_args,
            )

        return options_args, jdbc_args

    def get_spark_jdbc_optimal_upper_bound(self) -> Any:
        """Get an optimal upperBound to properly split a Spark JDBC extraction.

        The upper bound is the max value of the partition column (or the default
        upper bound when that max is NULL), read from dbtable for init
        extractions and from the changelog table otherwise.

        Returns:
             Either an int, date or timestamp to serve as upperBound Spark JDBC option.

        Raises:
            AttributeError: if no upper bound could be derived.
        """
        options = {}
        if self._JDBC_EXTRACTION.calc_upper_bound_schema:
            options["customSchema"] = self._JDBC_EXTRACTION.calc_upper_bound_schema

        table = (
            self._JDBC_EXTRACTION.dbtable
            if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.INIT.value
            else self._JDBC_EXTRACTION.changelog_table
        )
        jdbc_args = self._get_connection_jdbc_args()
        jdbc_args["table"] = (
            f"(SELECT COALESCE(MAX({self._JDBC_EXTRACTION.partition_column}), "
            f"{self._JDBC_EXTRACTION.default_upper_bound}) "
            f"upper_bound FROM {table})"  # nosec: B608
        )

        from lakehouse_engine.io.reader_factory import ReaderFactory

        upper_bound_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_optimal_upper_bound",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
                options=options,
            )
        )
        # Guard against an empty result so the failure is reported through the
        # AttributeError below instead of an opaque TypeError.
        first_row = upper_bound_df.first()
        upper_bound = first_row[0] if first_row is not None else None

        if upper_bound is None:
            raise AttributeError(
                f"Not able to calculate upper bound from "
                f"'{table}' using "
                f"the column '{self._JDBC_EXTRACTION.partition_column}'"
            )

        # Report the table that was actually queried (dbtable or changelog table).
        self._LOGGER.info(
            f"Upper Bound '{upper_bound}' derived from "
            f"'{table}' using the column "
            f"'{self._JDBC_EXTRACTION.partition_column}'"
        )
        return upper_bound

    def _get_extraction_partition_opts(
        self,
        options_args: dict,
    ) -> dict:
        """Get an options dict with custom extraction partition options.

        Args:
            options_args: spark jdbc reader options. The dict is updated in place.

        Returns:
            The same options dict, enriched with the partition options
            (numPartitions, upperBound, lowerBound, partitionColumn) that are set
            in the extraction configuration.
        """
        extraction = self._JDBC_EXTRACTION

        if extraction.num_partitions:
            options_args["numPartitions"] = extraction.num_partitions

        # Bounds are checked against None/"" explicitly, because 0 is a
        # legitimate bound and must not be dropped by a truthiness check.
        bounds = (
            ("upperBound", extraction.upper_bound),
            ("lowerBound", extraction.lower_bound),
        )
        for option_name, bound in bounds:
            if bound is not None and bound != "":
                options_args[option_name] = bound

        if extraction.partition_column:
            options_args["partitionColumn"] = extraction.partition_column

        return options_args

    def _get_max_timestamp(self, max_timestamp_query: str) -> str:
        """Get the max timestamp, based on the provided query.

        Args:
            max_timestamp_query: the query used to derive the max timestamp.

        Returns:
            A string having the max timestamp.
        """
        # Only pass a custom schema when one is configured, consistently with
        # the upper bound calculation.
        options = {}
        if self._JDBC_EXTRACTION.max_timestamp_custom_schema:
            options["customSchema"] = self._JDBC_EXTRACTION.max_timestamp_custom_schema

        jdbc_args = self._get_connection_jdbc_args()
        jdbc_args["table"] = max_timestamp_query

        from lakehouse_engine.io.reader_factory import ReaderFactory

        max_timestamp_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_max_timestamp",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
                options=options,
            )
        )
        max_timestamp = max_timestamp_df.first()[0]
        self._LOGGER.info(
            f"Max timestamp {max_timestamp} derived from query: {max_timestamp_query}"
        )

        return str(max_timestamp)

    # NOTE: this class does not use ABCMeta, so @abstractmethod is not enforced at
    # instantiation time; the two methods below return None unless overridden.
    @abstractmethod
    def _get_delta_query(self) -> Tuple[str, str]:
        """Get a query to extract delta (partially) from a source.

        Returns:
            A tuple with the query used as the jdbc table and the predicates query.
        """
        pass

    @abstractmethod
    def _get_init_query(self) -> Tuple[str, str]:
        """Get a query to extract init (fully) from a source.

        Returns:
            A tuple with the query used as the jdbc table and the predicates query.
        """
        pass
class JDBCExtractionType(enum.Enum):
class JDBCExtractionType(Enum):
    """Enumerate the extraction modes supported for a JDBC source."""

    # Full extraction of the source.
    INIT = "init"
    # Partial (incremental) extraction of the source.
    DELTA = "delta"

Standardize the types of extractions we can have from a JDBC source.

INIT = <JDBCExtractionType.INIT: 'init'>
DELTA = <JDBCExtractionType.DELTA: 'delta'>
Inherited Members
enum.Enum
name
value
@dataclass
class JDBCExtraction:
@dataclass
class JDBCExtraction(object):
    """Configurations available for an Extraction from a JDBC source.

    These configurations cover:
    - user: username to connect to JDBC source.
    - password: password to connect to JDBC source (always use secrets,
        don't use text passwords in your code).
    - url: url to connect to JDBC source.
    - dbtable: `database.table` to extract data from.
    - calc_upper_bound_schema: custom schema used for the upper bound calculation.
    - changelog_table: table of type changelog from which to extract data,
        when the extraction type is delta.
    - partition_column: column used to split the extraction.
    - latest_timestamp_data_location: data location (e.g., s3) containing the data
        to get the latest timestamp already loaded into bronze.
    - latest_timestamp_data_format: the format of the dataset in
        latest_timestamp_data_location. Default: delta.
    - extraction_type: type of extraction (delta or init). Default: "delta".
    - driver: JDBC driver name. Default: "com.sap.db.jdbc.Driver".
    - num_partitions: number of Spark partitions to split the extraction.
    - lower_bound: lower bound to decide the partition stride.
    - upper_bound: upper bound to decide the partition stride. If
        calculate_upper_bound is True, then upperBound will be
        derived by our upper bound optimizer, using the partition column.
    - default_upper_bound: the value to use as default upper bound in case
        the result of the upper bound calculation is None. Default: "1".
    - fetch_size: how many rows to fetch per round trip. Default: "100000".
    - compress: enable network compression. Default: True.
    - custom_schema: specify custom_schema for particular columns of the
        returned dataframe in the init/delta extraction of the source table.
    - min_timestamp: min timestamp to consider to filter the changelog data.
        Default: None and automatically derived from the location provided.
        In case this one is provided it has precedence and the calculation
        is not done.
    - max_timestamp: max timestamp to consider to filter the changelog data.
        Default: None and automatically derived from the table having information
        about the extraction requests, their timestamps and their status.
        In case this one is provided it has precedence and the calculation
        is not done.
    - generate_predicates: whether to generate predicates automatically or not.
        Default: False.
    - predicates: list containing all values to partition (if generate_predicates
        is used, the manual values provided are ignored). Default: None.
    - predicates_add_null: whether to consider null on predicates list.
        Default: True.
    - extraction_timestamp: the timestamp of the extraction. Default: current UTC
        time at the moment the configuration object is created, following the
        format "%Y%m%d%H%M%S".
    - max_timestamp_custom_schema: custom schema used on the max_timestamp derivation
        from the table holding the extraction requests information.
    """

    user: str
    password: str
    url: str
    dbtable: str
    calc_upper_bound_schema: Optional[str] = None
    changelog_table: Optional[str] = None
    partition_column: Optional[str] = None
    latest_timestamp_data_location: Optional[str] = None
    latest_timestamp_data_format: str = InputFormat.DELTAFILES.value
    extraction_type: str = JDBCExtractionType.DELTA.value
    driver: str = "com.sap.db.jdbc.Driver"
    num_partitions: Optional[int] = None
    lower_bound: Optional[Union[int, float, str]] = None
    upper_bound: Optional[Union[int, float, str]] = None
    default_upper_bound: str = "1"
    fetch_size: str = "100000"
    compress: bool = True
    custom_schema: Optional[str] = None
    min_timestamp: Optional[str] = None
    max_timestamp: Optional[str] = None
    generate_predicates: bool = False
    predicates: Optional[List] = None
    predicates_add_null: bool = True
    # Use a factory so the timestamp is computed for each instance. A plain default
    # expression is evaluated only once, when the class is defined (module import),
    # which freezes the same timestamp for every extraction created afterwards in
    # the same process.
    extraction_timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    )
    max_timestamp_custom_schema: Optional[str] = None

Configurations available for an Extraction from a JDBC source.

These configurations cover:

  • user: username to connect to JDBC source.
  • password: password to connect to JDBC source (always use secrets, don't use text passwords in your code).
  • url: url to connect to JDBC source.
  • dbtable: database.table to extract data from.
  • calc_upper_bound_schema: custom schema used for the upper bound calculation.
  • changelog_table: table of type changelog from which to extract data, when the extraction type is delta.
  • partition_column: column used to split the extraction.
  • latest_timestamp_data_location: data location (e.g., s3) containing the data to get the latest timestamp already loaded into bronze.
  • latest_timestamp_data_format: the format of the dataset in latest_timestamp_data_location. Default: delta.
  • extraction_type: type of extraction (delta or init). Default: "delta".
  • driver: JDBC driver name. Default: "com.sap.db.jdbc.Driver".
  • num_partitions: number of Spark partitions to split the extraction.
  • lower_bound: lower bound to decide the partition stride.
  • upper_bound: upper bound to decide the partition stride. If calculate_upper_bound is True, then upperBound will be derived by our upper bound optimizer, using the partition column.
  • default_upper_bound: the value to use as default upper bound in case the result of the upper bound calculation is None. Default: "1".
  • fetch_size: how many rows to fetch per round trip. Default: "100000".
  • compress: enable network compression. Default: True.
  • custom_schema: specify custom_schema for particular columns of the returned dataframe in the init/delta extraction of the source table.
  • min_timestamp: min timestamp to consider to filter the changelog data. Default: None and automatically derived from the location provided. In case this one is provided it has precedence and the calculation is not done.
  • max_timestamp: max timestamp to consider to filter the changelog data. Default: None and automatically derived from the table having information about the extraction requests, their timestamps and their status. In case this one is provided it has precedence and the calculation is not done.
  • generate_predicates: whether to generate predicates automatically or not. Default: False.
  • predicates: list containing all values to partition (if generate_predicates is used, the manual values provided are ignored). Default: None.
  • predicates_add_null: whether to consider null on predicates list. Default: True.
  • extraction_timestamp: the timestamp of the extraction. Default: current time following the format "%Y%m%d%H%M%S".
  • max_timestamp_custom_schema: custom schema used on the max_timestamp derivation from the table holding the extraction requests information.
JDBCExtraction( user: str, password: str, url: str, dbtable: str, calc_upper_bound_schema: Optional[str] = None, changelog_table: Optional[str] = None, partition_column: Optional[str] = None, latest_timestamp_data_location: Optional[str] = None, latest_timestamp_data_format: str = 'delta', extraction_type: str = 'delta', driver: str = 'com.sap.db.jdbc.Driver', num_partitions: Optional[int] = None, lower_bound: Union[int, float, str, NoneType] = None, upper_bound: Union[int, float, str, NoneType] = None, default_upper_bound: str = '1', fetch_size: str = '100000', compress: bool = True, custom_schema: Optional[str] = None, min_timestamp: Optional[str] = None, max_timestamp: Optional[str] = None, generate_predicates: bool = False, predicates: Optional[List] = None, predicates_add_null: bool = True, extraction_timestamp: str = '20241028145833', max_timestamp_custom_schema: Optional[str] = None)
user: str
password: str
url: str
dbtable: str
calc_upper_bound_schema: Optional[str] = None
changelog_table: Optional[str] = None
partition_column: Optional[str] = None
latest_timestamp_data_location: Optional[str] = None
latest_timestamp_data_format: str = 'delta'
extraction_type: str = 'delta'
driver: str = 'com.sap.db.jdbc.Driver'
num_partitions: Optional[int] = None
lower_bound: Union[int, float, str, NoneType] = None
upper_bound: Union[int, float, str, NoneType] = None
default_upper_bound: str = '1'
fetch_size: str = '100000'
compress: bool = True
custom_schema: Optional[str] = None
min_timestamp: Optional[str] = None
max_timestamp: Optional[str] = None
generate_predicates: bool = False
predicates: Optional[List] = None
predicates_add_null: bool = True
extraction_timestamp: str = '20241028145833'
max_timestamp_custom_schema: Optional[str] = None
class JDBCExtractionUtils:
class JDBCExtractionUtils(object):
    """Utils for managing data extraction from particularly relevant JDBC sources.

    The class turns a JDBC extraction configuration object into the arguments
    and options handed to the Spark JDBC reader, and runs small helper queries
    (predicates, upper bound, max timestamp) through ReaderFactory.

    `_get_delta_query` and `_get_init_query` are meant to be provided by
    subclasses. As this class does not use an ABC metaclass, the
    `@abstractmethod` markers are informative only and are not enforced.
    """

    def __init__(self, jdbc_extraction: Any):
        """Construct JDBCExtractionUtils.

        Args:
            jdbc_extraction: JDBC Extraction configurations. Can be of type:
                JDBCExtraction, SAPB4Extraction or SAPBWExtraction.
        """
        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
        self._JDBC_EXTRACTION = jdbc_extraction

    @staticmethod
    def get_additional_spark_options(
        input_spec: InputSpec, options: dict, ignore_options: Optional[List] = None
    ) -> dict:
        """Helper to get additional Spark Options initially passed.

        If people provide additional Spark options, not covered by the options
        already produced by the utils (get_spark_jdbc_options), we need to
        consider them. This helper selects, from the options of the input
        specification, the ones that are not yet considered and whose value is
        not None.

        Args:
            input_spec: the input specification.
            options: dict with the Spark options already produced by the utils.
                It is only read, never modified.
            ignore_options: list of options to be ignored by the process.
                Spark read has two different approaches to parallelize the
                reading process: one uses upper/lower bound, the other uses
                predicates. They can't be used at the same time, so one of
                them must be chosen. When choosing predicates, lower bound,
                upper bound, number of partitions and partition column can't
                be passed, otherwise Spark interprets the read as partitioned
                by upper and lower bound and expects all of them to be filled.
                Since predicates can be generated automatically from the
                partition column, that column (and the related options) has to
                be ignored at the end of the process.

        Returns:
            A new dict holding only the additional options, i.e., the options
            of the input specification that are not in `options`, not in
            `ignore_options`, not named like an argument or local variable of
            get_spark_jdbc_options, and whose value is not None. The entries of
            `options` are NOT included; merging is up to the caller.
        """
        # co_varnames holds the argument names AND the local variable names of
        # get_spark_jdbc_options, so both are excluded here.
        func_args = JDBCExtractionUtils.get_spark_jdbc_options.__code__.co_varnames

        excluded = set(ignore_options or [])
        excluded.update(options)
        excluded.update(func_args)

        # An input specification without options simply has nothing to add.
        provided_options = input_spec.options or {}

        return {
            key: value
            for key, value in provided_options.items()
            if key not in excluded and value is not None
        }

    def get_predicates(self, predicates_query: str) -> List:
        """Get the predicates list, based on a predicates query.

        Args:
            predicates_query: query to use as the basis to get the distinct values for
                a specified column, based on which predicates are generated.

        Returns:
            List containing the predicates to use to split the extraction from
            JDBC sources. One equality predicate is generated per non-null value
            found in the first column of the query result, plus an `IS NULL`
            predicate when `predicates_add_null` is set.
        """
        jdbc_args = {
            "url": self._JDBC_EXTRACTION.url,
            "table": predicates_query,
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }
        from lakehouse_engine.io.reader_factory import ReaderFactory

        predicates_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_predicates",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
            )
        )

        partition_column = self._JDBC_EXTRACTION.partition_column
        predicates_list = []
        for row in predicates_df.collect():
            value = row[0]
            if value is None:
                # A NULL value can't be matched with "=" (it would render as the
                # literal 'None'); nulls are only covered by the IS NULL predicate.
                continue
            # Double single quotes so that the value stays a valid SQL literal.
            escaped_value = str(value).replace("'", "''")
            predicates_list.append(f"{partition_column}='{escaped_value}'")

        if self._JDBC_EXTRACTION.predicates_add_null:
            predicates_list.append(f"{partition_column} IS NULL")
        self._LOGGER.info(
            f"The following predicate list was generated: {predicates_list}"
        )

        return predicates_list

    def get_spark_jdbc_options(self) -> Tuple[dict, dict]:
        """Get the Spark options to extract data from a JDBC source.

        Returns:
            A tuple `(options_args, jdbc_args)`: the reader options dictionary
            and the Spark jdbc args dictionary, the latter including the query
            to submit and, when applicable, the predicates.
        """
        # NOTE: the local variable names of this method are read through
        # __code__.co_varnames by get_additional_spark_options, so they are
        # part of its behaviour and must not be renamed lightly.
        options_args: Dict[str, Any] = {
            "fetchSize": self._JDBC_EXTRACTION.fetch_size,
            "compress": self._JDBC_EXTRACTION.compress,
        }

        jdbc_args = {
            "url": self._JDBC_EXTRACTION.url,
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }

        if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.DELTA.value:
            jdbc_args["table"], predicates_query = self._get_delta_query()
        else:
            jdbc_args["table"], predicates_query = self._get_init_query()

        if self._JDBC_EXTRACTION.custom_schema:
            options_args["customSchema"] = self._JDBC_EXTRACTION.custom_schema

        # Predicates and bound-based partitioning are mutually exclusive:
        # generated predicates win over provided ones, and the partition
        # options are only set when no predicates are used at all.
        if self._JDBC_EXTRACTION.generate_predicates:
            jdbc_args["predicates"] = self.get_predicates(predicates_query)
        elif self._JDBC_EXTRACTION.predicates:
            jdbc_args["predicates"] = self._JDBC_EXTRACTION.predicates
        else:
            options_args = self._get_extraction_partition_opts(
                options_args,
            )

        return options_args, jdbc_args

    def get_spark_jdbc_optimal_upper_bound(self) -> Any:
        """Get an optimal upperBound to properly split a Spark JDBC extraction.

        The upper bound is the maximum value of the partition column, falling
        back to `default_upper_bound` when that maximum is null. It is read
        from `dbtable` for init extractions and from `changelog_table`
        otherwise.

        Returns:
             Either an int, date or timestamp to serve as upperBound Spark JDBC option.

        Raises:
            AttributeError: if the query returns a null upper bound.
        """
        options: Dict[str, Any] = {}
        if self._JDBC_EXTRACTION.calc_upper_bound_schema:
            options["customSchema"] = self._JDBC_EXTRACTION.calc_upper_bound_schema

        table = (
            self._JDBC_EXTRACTION.dbtable
            if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.INIT.value
            else self._JDBC_EXTRACTION.changelog_table
        )
        jdbc_args = {
            "url": self._JDBC_EXTRACTION.url,
            "table": f"(SELECT COALESCE(MAX({self._JDBC_EXTRACTION.partition_column}), "
            f"{self._JDBC_EXTRACTION.default_upper_bound}) "
            f"upper_bound FROM {table})",  # nosec: B608
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }

        from lakehouse_engine.io.reader_factory import ReaderFactory

        upper_bound_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_optimal_upper_bound",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
                options=options,
            )
        )
        upper_bound = upper_bound_df.first()[0]

        if upper_bound is None:
            # Report the table that was actually queried (it is the changelog
            # table for delta extractions, not dbtable).
            raise AttributeError(
                f"Not able to calculate upper bound from "
                f"'{table}' using "
                f"the column '{self._JDBC_EXTRACTION.partition_column}'"
            )

        self._LOGGER.info(
            f"Upper Bound '{upper_bound}' derived from "
            f"'{table}' using the column "
            f"'{self._JDBC_EXTRACTION.partition_column}'"
        )
        return upper_bound

    def _get_extraction_partition_opts(
        self,
        options_args: dict,
    ) -> dict:
        """Get an options dict with custom extraction partition options.

        Args:
            options_args: spark jdbc reader options. The dict is updated in
                place and also returned.

        Returns:
            The same dict, with numPartitions, upperBound, lowerBound and
            partitionColumn added for every corresponding value configured in
            the extraction.
        """
        extraction = self._JDBC_EXTRACTION

        if extraction.num_partitions:
            options_args["numPartitions"] = extraction.num_partitions
        # Bounds are checked against None/empty string explicitly, because 0 is
        # a legitimate bound and must not be dropped by a truthiness test.
        if extraction.upper_bound is not None and extraction.upper_bound != "":
            options_args["upperBound"] = extraction.upper_bound
        if extraction.lower_bound is not None and extraction.lower_bound != "":
            options_args["lowerBound"] = extraction.lower_bound
        if extraction.partition_column:
            options_args["partitionColumn"] = extraction.partition_column

        return options_args

    def _get_max_timestamp(self, max_timestamp_query: str) -> str:
        """Get the max timestamp, based on the provided query.

        Args:
            max_timestamp_query: the query used to derive the max timestamp.

        Returns:
            A string having the max timestamp (first column of the first row
            returned by the query, converted with `str`).
        """
        jdbc_args = {
            "url": self._JDBC_EXTRACTION.url,
            "table": max_timestamp_query,
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }
        # Only pass customSchema when one is configured, consistently with
        # get_spark_jdbc_optimal_upper_bound, instead of sending a None option.
        options: Dict[str, Any] = {}
        if self._JDBC_EXTRACTION.max_timestamp_custom_schema:
            options["customSchema"] = self._JDBC_EXTRACTION.max_timestamp_custom_schema

        from lakehouse_engine.io.reader_factory import ReaderFactory

        max_timestamp_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_max_timestamp",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
                options=options,
            )
        )
        max_timestamp = max_timestamp_df.first()[0]
        self._LOGGER.info(
            f"Max timestamp {max_timestamp} derived from query: {max_timestamp_query}"
        )

        return str(max_timestamp)

    @abstractmethod
    def _get_delta_query(self) -> Tuple[str, str]:
        """Get a query to extract delta (partially) from a source.

        Returns:
            A tuple whose first element is used as the JDBC "table" argument and
            whose second element is used as the predicates query.
        """
        pass

    @abstractmethod
    def _get_init_query(self) -> Tuple[str, str]:
        """Get a query to extract init (fully) from a source.

        Returns:
            A tuple whose first element is used as the JDBC "table" argument and
            whose second element is used as the predicates query.
        """
        pass

Utils for managing data extraction from particularly relevant JDBC sources.

JDBCExtractionUtils(jdbc_extraction: Any)
104    def __init__(self, jdbc_extraction: Any):
105        """Construct JDBCExtractionUtils.
106
107        Args:
108            jdbc_extraction: JDBC Extraction configurations. Can be of type:
109                JDBCExtraction, SAPB4Extraction or SAPBWExtraction.
110        """
111        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
112        self._JDBC_EXTRACTION = jdbc_extraction

Construct JDBCExtractionUtils.

Arguments:
  • jdbc_extraction: JDBC Extraction configurations. Can be of type: JDBCExtraction, SAPB4Extraction or SAPBWExtraction.
@staticmethod
def get_additional_spark_options( input_spec: lakehouse_engine.core.definitions.InputSpec, options: dict, ignore_options: List = None) -> dict:
114    @staticmethod
115    def get_additional_spark_options(
116        input_spec: InputSpec, options: dict, ignore_options: List = None
117    ) -> dict:
118        """Helper to get additional Spark Options initially passed.
119
120        If people provide additional Spark options, not covered by the util function
121        arguments (get_spark_jdbc_options), we need to consider them.
122        Thus, we update the options retrieved by the utils, by checking if there is
123        any Spark option initially provided that is not yet considered in the retrieved
124        options or function arguments and if the value for the key is not None.
125        If these conditions are filled, we add the options and return the complete dict.
126
127        Args:
128            input_spec: the input specification.
129            options: dict with Spark options.
130            ignore_options: list of options to be ignored by the process.
131                Spark read has two different approaches to parallelize
132                reading process, one of them is using upper/lower bound,
133                another one is using predicates, those process can't be
134                executed at the same time, you must choose one of them.
135                By choosing predicates you can't pass lower and upper bound,
136                also can't pass number of partitions and partition column
137                otherwise spark will interpret the execution partitioned by
138                upper and lower bound and will expect to fill all variables.
139                To avoid fill all predicates hardcoded at the acon, there is
140                a feature that automatically generates all predicates for init
141                or delta load based on input partition column, but at the end
142                of the process, partition column can't be passed to the options,
143                because we are choosing predicates execution, that is why to
144                generate predicates we need to pass some options to ignore.
145
146        Returns:
147             a dict with all the options passed as argument, plus the options that
148             were initially provided, but were not used in the util
149             (get_spark_jdbc_options).
150        """
151        func_args = JDBCExtractionUtils.get_spark_jdbc_options.__code__.co_varnames
152
153        if ignore_options is None:
154            ignore_options = []
155        ignore_options = ignore_options + list(options.keys()) + list(func_args)
156
157        return {
158            key: value
159            for key, value in input_spec.options.items()
160            if key not in ignore_options and value is not None
161        }

Helper to get additional Spark Options initially passed.

If people provide additional Spark options, not covered by the util function arguments (get_spark_jdbc_options), we need to consider them. Thus, we check whether there is any Spark option initially provided that is not yet considered in the retrieved options or function arguments and whose value is not None. The options that meet these conditions are the ones returned.

Arguments:
  • input_spec: the input specification.
  • options: dict with Spark options.
  • ignore_options: list of options to be ignored by the process. Spark read has two different approaches to parallelize the reading process: one uses upper/lower bound, the other uses predicates. These approaches can't be used at the same time, so you must choose one of them. By choosing predicates you can't pass lower and upper bound, nor the number of partitions and the partition column, otherwise Spark will interpret the execution as partitioned by upper and lower bound and will expect all of those variables to be filled. To avoid hardcoding all the predicates in the acon, there is a feature that automatically generates all predicates for init or delta load based on the input partition column. However, at the end of the process, the partition column can't be passed to the options, because we are choosing the predicates execution; that is why, to generate predicates, we need to pass some options to ignore.
Returns:

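a dict with the options that were initially provided in the input specification, but were not used in the util (get_spark_jdbc_options), are not among the options passed as argument or the ignored options, and have a value that is not None. The options passed as argument are not part of the returned dict.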

def get_predicates(self, predicates_query: str) -> List:
163    def get_predicates(self, predicates_query: str) -> List:
164        """Get the predicates list, based on a predicates query.
165
166        Args:
167            predicates_query: query to use as the basis to get the distinct values for
168                a specified column, based on which predicates are generated.
169
170        Returns:
171            List containing the predicates to use to split the extraction from
172            JDBC sources.
173        """
174        jdbc_args = {
175            "url": self._JDBC_EXTRACTION.url,
176            "table": predicates_query,
177            "properties": {
178                "user": self._JDBC_EXTRACTION.user,
179                "password": self._JDBC_EXTRACTION.password,
180                "driver": self._JDBC_EXTRACTION.driver,
181            },
182        }
183        from lakehouse_engine.io.reader_factory import ReaderFactory
184
185        predicates_df = ReaderFactory.get_data(
186            InputSpec(
187                spec_id="get_predicates",
188                data_format=InputFormat.JDBC.value,
189                read_type=ReadType.BATCH.value,
190                jdbc_args=jdbc_args,
191            )
192        )
193
194        predicates_list = [
195            f"{self._JDBC_EXTRACTION.partition_column}='{row[0]}'"
196            for row in predicates_df.collect()
197        ]
198
199        if self._JDBC_EXTRACTION.predicates_add_null:
200            predicates_list.append(f"{self._JDBC_EXTRACTION.partition_column} IS NULL")
201        self._LOGGER.info(
202            f"The following predicate list was generated: {predicates_list}"
203        )
204
205        return predicates_list

Get the predicates list, based on a predicates query.

Arguments:
  • predicates_query: query to use as the basis to get the distinct values for a specified column, based on which predicates are generated.
Returns:

List containing the predicates to use to split the extraction from JDBC sources.

def get_spark_jdbc_options(self) -> Tuple[dict, dict]:
207    def get_spark_jdbc_options(self) -> Tuple[dict, dict]:
208        """Get the Spark options to extract data from a JDBC source.
209
210        Returns:
211            The Spark jdbc args dictionary, including the query to submit
212            and also options args dictionary.
213        """
214        options_args: Dict[str, Any] = {
215            "fetchSize": self._JDBC_EXTRACTION.fetch_size,
216            "compress": self._JDBC_EXTRACTION.compress,
217        }
218
219        jdbc_args = {
220            "url": self._JDBC_EXTRACTION.url,
221            "properties": {
222                "user": self._JDBC_EXTRACTION.user,
223                "password": self._JDBC_EXTRACTION.password,
224                "driver": self._JDBC_EXTRACTION.driver,
225            },
226        }
227
228        if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.DELTA.value:
229            jdbc_args["table"], predicates_query = self._get_delta_query()
230        else:
231            jdbc_args["table"], predicates_query = self._get_init_query()
232
233        if self._JDBC_EXTRACTION.custom_schema:
234            options_args["customSchema"] = self._JDBC_EXTRACTION.custom_schema
235
236        if self._JDBC_EXTRACTION.generate_predicates:
237            jdbc_args["predicates"] = self.get_predicates(predicates_query)
238        else:
239            if self._JDBC_EXTRACTION.predicates:
240                jdbc_args["predicates"] = self._JDBC_EXTRACTION.predicates
241            else:
242                options_args = self._get_extraction_partition_opts(
243                    options_args,
244                )
245
246        return options_args, jdbc_args

Get the Spark options to extract data from a JDBC source.

Returns:

A tuple with the options args dictionary and the Spark jdbc args dictionary, in this order; the jdbc args dictionary includes the query to submit.

def get_spark_jdbc_optimal_upper_bound(self) -> Any:
248    def get_spark_jdbc_optimal_upper_bound(self) -> Any:
249        """Get an optimal upperBound to properly split a Spark JDBC extraction.
250
251        Returns:
252             Either an int, date or timestamp to serve as upperBound Spark JDBC option.
253        """
254        options = {}
255        if self._JDBC_EXTRACTION.calc_upper_bound_schema:
256            options["customSchema"] = self._JDBC_EXTRACTION.calc_upper_bound_schema
257
258        table = (
259            self._JDBC_EXTRACTION.dbtable
260            if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.INIT.value
261            else self._JDBC_EXTRACTION.changelog_table
262        )
263        jdbc_args = {
264            "url": self._JDBC_EXTRACTION.url,
265            "table": f"(SELECT COALESCE(MAX({self._JDBC_EXTRACTION.partition_column}), "
266            f"{self._JDBC_EXTRACTION.default_upper_bound}) "
267            f"upper_bound FROM {table})",  # nosec: B608
268            "properties": {
269                "user": self._JDBC_EXTRACTION.user,
270                "password": self._JDBC_EXTRACTION.password,
271                "driver": self._JDBC_EXTRACTION.driver,
272            },
273        }
274
275        from lakehouse_engine.io.reader_factory import ReaderFactory
276
277        upper_bound_df = ReaderFactory.get_data(
278            InputSpec(
279                spec_id="get_optimal_upper_bound",
280                data_format=InputFormat.JDBC.value,
281                read_type=ReadType.BATCH.value,
282                jdbc_args=jdbc_args,
283                options=options,
284            )
285        )
286        upper_bound = upper_bound_df.first()[0]
287
288        if upper_bound is not None:
289            self._LOGGER.info(
290                f"Upper Bound '{upper_bound}' derived from "
291                f"'{self._JDBC_EXTRACTION.dbtable}' using the column "
292                f"'{self._JDBC_EXTRACTION.partition_column}'"
293            )
294            return upper_bound
295        else:
296            raise AttributeError(
297                f"Not able to calculate upper bound from "
298                f"'{self._JDBC_EXTRACTION.dbtable}' using "
299                f"the column '{self._JDBC_EXTRACTION.partition_column}'"
300            )

Get an optimal upperBound to properly split a Spark JDBC extraction.

Returns:

Either an int, date or timestamp to serve as upperBound Spark JDBC option.