lakehouse_engine.utils.extraction.jdbc_extraction_utils
Utilities module for JDBC extraction processes.
1"""Utilities module for JDBC extraction processes.""" 2 3from abc import abstractmethod 4from dataclasses import dataclass 5from datetime import datetime, timezone 6from enum import Enum 7from logging import Logger 8from typing import Any, Dict, List, Optional, Tuple, Union 9 10from lakehouse_engine.core.definitions import InputFormat, InputSpec, ReadType 11from lakehouse_engine.utils.logging_handler import LoggingHandler 12 13 14class JDBCExtractionType(Enum): 15 """Standardize the types of extractions we can have from a JDBC source.""" 16 17 INIT = "init" 18 DELTA = "delta" 19 20 21@dataclass 22class JDBCExtraction(object): 23 """Configurations available for an Extraction from a JDBC source. 24 25 These configurations cover: 26 - user: username to connect to JDBC source. 27 - password: password to connect to JDBC source (always use secrets, 28 don't use text passwords in your code). 29 - url: url to connect to JDBC source. 30 - dbtable: `database.table` to extract data from. 31 - calc_upper_bound_schema: custom schema used for the upper bound calculation. 32 - changelog_table: table of type changelog from which to extract data, 33 when the extraction type is delta. 34 - partition_column: column used to split the extraction. 35 - latest_timestamp_data_location: data location (e.g., s3) containing the data 36 to get the latest timestamp already loaded into bronze. 37 - latest_timestamp_data_format: the format of the dataset in 38 latest_timestamp_data_location. Default: delta. 39 - extraction_type: type of extraction (delta or init). Default: "delta". 40 - driver: JDBC driver name. Default: "com.sap.db.jdbc.Driver". 41 - num_partitions: number of Spark partitions to split the extraction. 42 - lower_bound: lower bound to decide the partition stride. 43 - upper_bound: upper bound to decide the partition stride. If 44 calculate_upper_bound is True, then upperBound will be 45 derived by our upper bound optimizer, using the partition column. 46 - default_upper_bound: the value to use as default upper bound in case 47 the result of the upper bound calculation is None. Default: "1". 48 - fetch_size: how many rows to fetch per round trip. Default: "100000". 49 - compress: enable network compression. Default: True. 50 - custom_schema: specify custom_schema for particular columns of the 51 returned dataframe in the init/delta extraction of the source table. 52 - min_timestamp: min timestamp to consider to filter the changelog data. 53 Default: None and automatically derived from the location provided. 54 In case this one is provided it has precedence and the calculation 55 is not done. 56 - max_timestamp: max timestamp to consider to filter the changelog data. 57 Default: None and automatically derived from the table having information 58 about the extraction requests, their timestamps and their status. 59 In case this one is provided it has precedence and the calculation 60 is not done. 61 - generate_predicates: whether to generate predicates automatically or not. 62 Default: False. 63 - predicates: list containing all values to partition (if generate_predicates 64 is used, the manual values provided are ignored). Default: None. 65 - predicates_add_null: whether to consider null on predicates list. 66 Default: True. 67 - extraction_timestamp: the timestamp of the extraction. Default: current time 68 following the format "%Y%m%d%H%M%S". 69 - max_timestamp_custom_schema: custom schema used on the max_timestamp derivation 70 from the table holding the extraction requests information. 
71 """ 72 73 user: str 74 password: str 75 url: str 76 dbtable: str 77 calc_upper_bound_schema: Optional[str] = None 78 changelog_table: Optional[str] = None 79 partition_column: Optional[str] = None 80 latest_timestamp_data_location: Optional[str] = None 81 latest_timestamp_data_format: str = InputFormat.DELTAFILES.value 82 extraction_type: str = JDBCExtractionType.DELTA.value 83 driver: str = "com.sap.db.jdbc.Driver" 84 num_partitions: Optional[int] = None 85 lower_bound: Optional[Union[int, float, str]] = None 86 upper_bound: Optional[Union[int, float, str]] = None 87 default_upper_bound: str = "1" 88 fetch_size: str = "100000" 89 compress: bool = True 90 custom_schema: Optional[str] = None 91 min_timestamp: Optional[str] = None 92 max_timestamp: Optional[str] = None 93 generate_predicates: bool = False 94 predicates: Optional[List] = None 95 predicates_add_null: bool = True 96 extraction_timestamp: str = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") 97 max_timestamp_custom_schema: Optional[str] = None 98 99 100class JDBCExtractionUtils(object): 101 """Utils for managing data extraction from particularly relevant JDBC sources.""" 102 103 def __init__(self, jdbc_extraction: Any): 104 """Construct JDBCExtractionUtils. 105 106 Args: 107 jdbc_extraction: JDBC Extraction configurations. Can be of type: 108 JDBCExtraction, SAPB4Extraction or SAPBWExtraction. 109 """ 110 self._LOGGER: Logger = LoggingHandler(__name__).get_logger() 111 self._JDBC_EXTRACTION = jdbc_extraction 112 113 @staticmethod 114 def get_additional_spark_options( 115 input_spec: InputSpec, options: dict, ignore_options: List = None 116 ) -> dict: 117 """Helper to get additional Spark Options initially passed. 118 119 If people provide additional Spark options, not covered by the util function 120 arguments (get_spark_jdbc_options), we need to consider them. 121 Thus, we update the options retrieved by the utils, by checking if there is 122 any Spark option initially provided that is not yet considered in the retrieved 123 options or function arguments and if the value for the key is not None. 124 If these conditions are filled, we add the options and return the complete dict. 125 126 Args: 127 input_spec: the input specification. 128 options: dict with Spark options. 129 ignore_options: list of options to be ignored by the process. 130 Spark read has two different approaches to parallelize 131 reading process, one of them is using upper/lower bound, 132 another one is using predicates, those process can't be 133 executed at the same time, you must choose one of them. 134 By choosing predicates you can't pass lower and upper bound, 135 also can't pass number of partitions and partition column 136 otherwise spark will interpret the execution partitioned by 137 upper and lower bound and will expect to fill all variables. 138 To avoid fill all predicates hardcoded at the acon, there is 139 a feature that automatically generates all predicates for init 140 or delta load based on input partition column, but at the end 141 of the process, partition column can't be passed to the options, 142 because we are choosing predicates execution, that is why to 143 generate predicates we need to pass some options to ignore. 144 145 Returns: 146 a dict with all the options passed as argument, plus the options that 147 were initially provided, but were not used in the util 148 (get_spark_jdbc_options). 
149 """ 150 func_args = JDBCExtractionUtils.get_spark_jdbc_options.__code__.co_varnames 151 152 if ignore_options is None: 153 ignore_options = [] 154 ignore_options = ignore_options + list(options.keys()) + list(func_args) 155 156 return { 157 key: value 158 for key, value in input_spec.options.items() 159 if key not in ignore_options and value is not None 160 } 161 162 def get_predicates(self, predicates_query: str) -> List: 163 """Get the predicates list, based on a predicates query. 164 165 Args: 166 predicates_query: query to use as the basis to get the distinct values for 167 a specified column, based on which predicates are generated. 168 169 Returns: 170 List containing the predicates to use to split the extraction from 171 JDBC sources. 172 """ 173 jdbc_args = { 174 "url": self._JDBC_EXTRACTION.url, 175 "table": predicates_query, 176 "properties": { 177 "user": self._JDBC_EXTRACTION.user, 178 "password": self._JDBC_EXTRACTION.password, 179 "driver": self._JDBC_EXTRACTION.driver, 180 }, 181 } 182 from lakehouse_engine.io.reader_factory import ReaderFactory 183 184 predicates_df = ReaderFactory.get_data( 185 InputSpec( 186 spec_id="get_predicates", 187 data_format=InputFormat.JDBC.value, 188 read_type=ReadType.BATCH.value, 189 jdbc_args=jdbc_args, 190 ) 191 ) 192 193 predicates_list = [ 194 f"{self._JDBC_EXTRACTION.partition_column}='{row[0]}'" 195 for row in predicates_df.collect() 196 ] 197 198 if self._JDBC_EXTRACTION.predicates_add_null: 199 predicates_list.append(f"{self._JDBC_EXTRACTION.partition_column} IS NULL") 200 self._LOGGER.info( 201 f"The following predicate list was generated: {predicates_list}" 202 ) 203 204 return predicates_list 205 206 def get_spark_jdbc_options(self) -> Tuple[dict, dict]: 207 """Get the Spark options to extract data from a JDBC source. 208 209 Returns: 210 The Spark jdbc args dictionary, including the query to submit 211 and also options args dictionary. 212 """ 213 options_args: Dict[str, Any] = { 214 "fetchSize": self._JDBC_EXTRACTION.fetch_size, 215 "compress": self._JDBC_EXTRACTION.compress, 216 } 217 218 jdbc_args = { 219 "url": self._JDBC_EXTRACTION.url, 220 "properties": { 221 "user": self._JDBC_EXTRACTION.user, 222 "password": self._JDBC_EXTRACTION.password, 223 "driver": self._JDBC_EXTRACTION.driver, 224 }, 225 } 226 227 if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.DELTA.value: 228 jdbc_args["table"], predicates_query = self._get_delta_query() 229 else: 230 jdbc_args["table"], predicates_query = self._get_init_query() 231 232 if self._JDBC_EXTRACTION.custom_schema: 233 options_args["customSchema"] = self._JDBC_EXTRACTION.custom_schema 234 235 if self._JDBC_EXTRACTION.generate_predicates: 236 jdbc_args["predicates"] = self.get_predicates(predicates_query) 237 else: 238 if self._JDBC_EXTRACTION.predicates: 239 jdbc_args["predicates"] = self._JDBC_EXTRACTION.predicates 240 else: 241 options_args = self._get_extraction_partition_opts( 242 options_args, 243 ) 244 245 return options_args, jdbc_args 246 247 def get_spark_jdbc_optimal_upper_bound(self) -> Any: 248 """Get an optimal upperBound to properly split a Spark JDBC extraction. 249 250 Returns: 251 Either an int, date or timestamp to serve as upperBound Spark JDBC option. 
252 """ 253 options = {} 254 if self._JDBC_EXTRACTION.calc_upper_bound_schema: 255 options["customSchema"] = self._JDBC_EXTRACTION.calc_upper_bound_schema 256 257 table = ( 258 self._JDBC_EXTRACTION.dbtable 259 if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.INIT.value 260 else self._JDBC_EXTRACTION.changelog_table 261 ) 262 jdbc_args = { 263 "url": self._JDBC_EXTRACTION.url, 264 "table": f"(SELECT COALESCE(MAX({self._JDBC_EXTRACTION.partition_column}), " 265 f"{self._JDBC_EXTRACTION.default_upper_bound}) " 266 f"upper_bound FROM {table})", # nosec: B608 267 "properties": { 268 "user": self._JDBC_EXTRACTION.user, 269 "password": self._JDBC_EXTRACTION.password, 270 "driver": self._JDBC_EXTRACTION.driver, 271 }, 272 } 273 274 from lakehouse_engine.io.reader_factory import ReaderFactory 275 276 upper_bound_df = ReaderFactory.get_data( 277 InputSpec( 278 spec_id="get_optimal_upper_bound", 279 data_format=InputFormat.JDBC.value, 280 read_type=ReadType.BATCH.value, 281 jdbc_args=jdbc_args, 282 options=options, 283 ) 284 ) 285 upper_bound = upper_bound_df.first()[0] 286 287 if upper_bound is not None: 288 self._LOGGER.info( 289 f"Upper Bound '{upper_bound}' derived from " 290 f"'{self._JDBC_EXTRACTION.dbtable}' using the column " 291 f"'{self._JDBC_EXTRACTION.partition_column}'" 292 ) 293 return upper_bound 294 else: 295 raise AttributeError( 296 f"Not able to calculate upper bound from " 297 f"'{self._JDBC_EXTRACTION.dbtable}' using " 298 f"the column '{self._JDBC_EXTRACTION.partition_column}'" 299 ) 300 301 def _get_extraction_partition_opts( 302 self, 303 options_args: dict, 304 ) -> dict: 305 """Get an options dict with custom extraction partition options. 306 307 Args: 308 options_args: spark jdbc reader options. 309 """ 310 if self._JDBC_EXTRACTION.num_partitions: 311 options_args["numPartitions"] = self._JDBC_EXTRACTION.num_partitions 312 if self._JDBC_EXTRACTION.upper_bound: 313 options_args["upperBound"] = self._JDBC_EXTRACTION.upper_bound 314 if self._JDBC_EXTRACTION.lower_bound: 315 options_args["lowerBound"] = self._JDBC_EXTRACTION.lower_bound 316 if self._JDBC_EXTRACTION.partition_column: 317 options_args["partitionColumn"] = self._JDBC_EXTRACTION.partition_column 318 319 return options_args 320 321 def _get_max_timestamp(self, max_timestamp_query: str) -> str: 322 """Get the max timestamp, based on the provided query. 323 324 Args: 325 max_timestamp_query: the query used to derive the max timestamp. 326 327 Returns: 328 A string having the max timestamp. 
329 """ 330 jdbc_args = { 331 "url": self._JDBC_EXTRACTION.url, 332 "table": max_timestamp_query, 333 "properties": { 334 "user": self._JDBC_EXTRACTION.user, 335 "password": self._JDBC_EXTRACTION.password, 336 "driver": self._JDBC_EXTRACTION.driver, 337 }, 338 } 339 from lakehouse_engine.io.reader_factory import ReaderFactory 340 341 max_timestamp_df = ReaderFactory.get_data( 342 InputSpec( 343 spec_id="get_max_timestamp", 344 data_format=InputFormat.JDBC.value, 345 read_type=ReadType.BATCH.value, 346 jdbc_args=jdbc_args, 347 options={ 348 "customSchema": self._JDBC_EXTRACTION.max_timestamp_custom_schema 349 }, 350 ) 351 ) 352 max_timestamp = max_timestamp_df.first()[0] 353 self._LOGGER.info( 354 f"Max timestamp {max_timestamp} derived from query: {max_timestamp_query}" 355 ) 356 357 return str(max_timestamp) 358 359 @abstractmethod 360 def _get_delta_query(self) -> Tuple[str, str]: 361 """Get a query to extract delta (partially) from a source.""" 362 pass 363 364 @abstractmethod 365 def _get_init_query(self) -> Tuple[str, str]: 366 """Get a query to extract init (fully) from a source.""" 367 pass
class JDBCExtractionType(Enum):
    """Standardize the types of extractions we can have from a JDBC source."""

    INIT = "init"
    DELTA = "delta"
Standardize the types of extractions we can have from a JDBC source.
Inherited Members
- enum.Enum
- name
- value
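As a quick illustration (a minimal sketch using only what the enum above defines), the members map directly to the string values expected in the extraction configurations:

from lakehouse_engine.utils.extraction.jdbc_extraction_utils import JDBCExtractionType

# The enum values are the strings used by the extraction configurations.
assert JDBCExtractionType.INIT.value == "init"
assert JDBCExtractionType.DELTA.value == "delta"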
@dataclass
class JDBCExtraction(object):
    """Configurations available for an Extraction from a JDBC source.

    These configurations cover:
    - user: username to connect to JDBC source.
    - password: password to connect to JDBC source (always use secrets,
        don't use text passwords in your code).
    - url: url to connect to JDBC source.
    - dbtable: `database.table` to extract data from.
    - calc_upper_bound_schema: custom schema used for the upper bound calculation.
    - changelog_table: table of type changelog from which to extract data,
        when the extraction type is delta.
    - partition_column: column used to split the extraction.
    - latest_timestamp_data_location: data location (e.g., s3) containing the data
        to get the latest timestamp already loaded into bronze.
    - latest_timestamp_data_format: the format of the dataset in
        latest_timestamp_data_location. Default: delta.
    - extraction_type: type of extraction (delta or init). Default: "delta".
    - driver: JDBC driver name. Default: "com.sap.db.jdbc.Driver".
    - num_partitions: number of Spark partitions to split the extraction.
    - lower_bound: lower bound to decide the partition stride.
    - upper_bound: upper bound to decide the partition stride. If
        calculate_upper_bound is True, then upperBound will be
        derived by our upper bound optimizer, using the partition column.
    - default_upper_bound: the value to use as default upper bound in case
        the result of the upper bound calculation is None. Default: "1".
    - fetch_size: how many rows to fetch per round trip. Default: "100000".
    - compress: enable network compression. Default: True.
    - custom_schema: specify custom_schema for particular columns of the
        returned dataframe in the init/delta extraction of the source table.
    - min_timestamp: minimum timestamp used to filter the changelog data.
        Default: None, in which case it is automatically derived from the
        location provided. If a value is provided, it takes precedence and
        the derivation is skipped.
    - max_timestamp: maximum timestamp used to filter the changelog data.
        Default: None, in which case it is automatically derived from the
        table holding information about the extraction requests, their
        timestamps and their status. If a value is provided, it takes
        precedence and the derivation is skipped.
    - generate_predicates: whether to generate predicates automatically or not.
        Default: False.
    - predicates: list containing all values to partition (if generate_predicates
        is used, the manual values provided are ignored). Default: None.
    - predicates_add_null: whether to add a predicate for NULL values of the
        partition column to the predicates list. Default: True.
    - extraction_timestamp: the timestamp of the extraction. Default: current time
        following the format "%Y%m%d%H%M%S".
    - max_timestamp_custom_schema: custom schema used on the max_timestamp derivation
        from the table holding the extraction requests information.
    """

    user: str
    password: str
    url: str
    dbtable: str
    calc_upper_bound_schema: Optional[str] = None
    changelog_table: Optional[str] = None
    partition_column: Optional[str] = None
    latest_timestamp_data_location: Optional[str] = None
    latest_timestamp_data_format: str = InputFormat.DELTAFILES.value
    extraction_type: str = JDBCExtractionType.DELTA.value
    driver: str = "com.sap.db.jdbc.Driver"
    num_partitions: Optional[int] = None
    lower_bound: Optional[Union[int, float, str]] = None
    upper_bound: Optional[Union[int, float, str]] = None
    default_upper_bound: str = "1"
    fetch_size: str = "100000"
    compress: bool = True
    custom_schema: Optional[str] = None
    min_timestamp: Optional[str] = None
    max_timestamp: Optional[str] = None
    generate_predicates: bool = False
    predicates: Optional[List] = None
    predicates_add_null: bool = True
    extraction_timestamp: str = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    max_timestamp_custom_schema: Optional[str] = None
Configurations available for an Extraction from a JDBC source.
These configurations cover:
- user: username to connect to JDBC source.
- password: password to connect to JDBC source (always use secrets, don't use text passwords in your code).
- url: url to connect to JDBC source.
- dbtable: database.table to extract data from.
- calc_upper_bound_schema: custom schema used for the upper bound calculation.
- changelog_table: table of type changelog from which to extract data, when the extraction type is delta.
- partition_column: column used to split the extraction.
- latest_timestamp_data_location: data location (e.g., s3) containing the data to get the latest timestamp already loaded into bronze.
- latest_timestamp_data_format: the format of the dataset in latest_timestamp_data_location. Default: delta.
- extraction_type: type of extraction (delta or init). Default: "delta".
- driver: JDBC driver name. Default: "com.sap.db.jdbc.Driver".
- num_partitions: number of Spark partitions to split the extraction.
- lower_bound: lower bound to decide the partition stride.
- upper_bound: upper bound to decide the partition stride. If calculate_upper_bound is True, then upperBound will be derived by our upper bound optimizer, using the partition column.
- default_upper_bound: the value to use as default upper bound in case the result of the upper bound calculation is None. Default: "1".
- fetch_size: how many rows to fetch per round trip. Default: "100000".
- compress: enable network compression. Default: True.
- custom_schema: specify custom_schema for particular columns of the returned dataframe in the init/delta extraction of the source table.
- min_timestamp: minimum timestamp used to filter the changelog data. Default: None, in which case it is automatically derived from the location provided. If a value is provided, it takes precedence and the derivation is skipped.
- max_timestamp: maximum timestamp used to filter the changelog data. Default: None, in which case it is automatically derived from the table holding information about the extraction requests, their timestamps and their status. If a value is provided, it takes precedence and the derivation is skipped.
- generate_predicates: whether to generate predicates automatically or not. Default: False.
- predicates: list containing all values to partition (if generate_predicates is used, the manual values provided are ignored). Default: None.
- predicates_add_null: whether to add a predicate for NULL values of the partition column to the predicates list. Default: True.
- extraction_timestamp: the timestamp of the extraction. Default: current time following the format "%Y%m%d%H%M%S".
- max_timestamp_custom_schema: custom schema used on the max_timestamp derivation from the table holding the extraction requests information.
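To make the configuration above concrete, here is a minimal sketch of instantiating JDBCExtraction for a delta extraction. All connection values and table names below are placeholders, and in real code the password should come from a secrets mechanism rather than being hard-coded:

from lakehouse_engine.utils.extraction.jdbc_extraction_utils import (
    JDBCExtraction,
    JDBCExtractionType,
)

# Hypothetical values purely for illustration.
extraction_config = JDBCExtraction(
    user="my_user",
    password="retrieved-from-secret-scope",  # never hard-code passwords
    url="jdbc:sap://my-sap-host:30015",
    dbtable="MY_SCHEMA.MY_TABLE",
    changelog_table="MY_SCHEMA.MY_TABLE_CHANGELOG",
    extraction_type=JDBCExtractionType.DELTA.value,
    partition_column="RECORD_ID",
    num_partitions=10,
    latest_timestamp_data_location="s3://my-bucket/bronze/my_table",
)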
class JDBCExtractionUtils(object):
    """Utils for managing data extraction from particularly relevant JDBC sources."""

    def __init__(self, jdbc_extraction: Any):
        """Construct JDBCExtractionUtils.

        Args:
            jdbc_extraction: JDBC Extraction configurations. Can be of type:
                JDBCExtraction, SAPB4Extraction or SAPBWExtraction.
        """
        self._LOGGER: Logger = LoggingHandler(__name__).get_logger()
        self._JDBC_EXTRACTION = jdbc_extraction

    @staticmethod
    def get_additional_spark_options(
        input_spec: InputSpec, options: dict, ignore_options: List = None
    ) -> dict:
        """Helper to get additional Spark Options initially passed.

        If additional Spark options are provided that are not covered by the
        arguments of the util function get_spark_jdbc_options, they need to be
        considered as well. Therefore, the options retrieved by that util are
        updated with every Spark option initially provided that is not yet
        present in the retrieved options or function arguments and whose value
        is not None, and the complete dict is returned.

        Args:
            input_spec: the input specification.
            options: dict with Spark options.
            ignore_options: list of options to be ignored by the process.
                Spark offers two different approaches to parallelize a JDBC
                read: upper/lower bound or predicates. They cannot be used at
                the same time, so one of them has to be chosen. When predicates
                are used, lower_bound, upper_bound, num_partitions and
                partition_column cannot be passed, otherwise Spark assumes a
                bound-based partitioning and expects all of those values to be
                filled. To avoid hard-coding every predicate in the acon,
                predicates can be generated automatically for init or delta
                loads based on the input partition column; in that case the
                partition column must not be passed in the options, which is
                why some options need to be passed here to be ignored.

        Returns:
            a dict with all the options passed as argument, plus the options that
            were initially provided, but were not used in the util
            (get_spark_jdbc_options).
        """
        func_args = JDBCExtractionUtils.get_spark_jdbc_options.__code__.co_varnames

        if ignore_options is None:
            ignore_options = []
        ignore_options = ignore_options + list(options.keys()) + list(func_args)

        return {
            key: value
            for key, value in input_spec.options.items()
            if key not in ignore_options and value is not None
        }

    def get_predicates(self, predicates_query: str) -> List:
        """Get the predicates list, based on a predicates query.

        Args:
            predicates_query: query to use as the basis to get the distinct values for
                a specified column, based on which predicates are generated.

        Returns:
            List containing the predicates to use to split the extraction from
            JDBC sources.
        """
        jdbc_args = {
            "url": self._JDBC_EXTRACTION.url,
            "table": predicates_query,
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }
        from lakehouse_engine.io.reader_factory import ReaderFactory

        predicates_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_predicates",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
            )
        )

        predicates_list = [
            f"{self._JDBC_EXTRACTION.partition_column}='{row[0]}'"
            for row in predicates_df.collect()
        ]

        if self._JDBC_EXTRACTION.predicates_add_null:
            predicates_list.append(f"{self._JDBC_EXTRACTION.partition_column} IS NULL")
        self._LOGGER.info(
            f"The following predicate list was generated: {predicates_list}"
        )

        return predicates_list

    def get_spark_jdbc_options(self) -> Tuple[dict, dict]:
        """Get the Spark options to extract data from a JDBC source.

        Returns:
            The Spark jdbc args dictionary, including the query to submit
            and also options args dictionary.
        """
        options_args: Dict[str, Any] = {
            "fetchSize": self._JDBC_EXTRACTION.fetch_size,
            "compress": self._JDBC_EXTRACTION.compress,
        }

        jdbc_args = {
            "url": self._JDBC_EXTRACTION.url,
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }

        if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.DELTA.value:
            jdbc_args["table"], predicates_query = self._get_delta_query()
        else:
            jdbc_args["table"], predicates_query = self._get_init_query()

        if self._JDBC_EXTRACTION.custom_schema:
            options_args["customSchema"] = self._JDBC_EXTRACTION.custom_schema

        if self._JDBC_EXTRACTION.generate_predicates:
            jdbc_args["predicates"] = self.get_predicates(predicates_query)
        else:
            if self._JDBC_EXTRACTION.predicates:
                jdbc_args["predicates"] = self._JDBC_EXTRACTION.predicates
            else:
                options_args = self._get_extraction_partition_opts(
                    options_args,
                )

        return options_args, jdbc_args

    def get_spark_jdbc_optimal_upper_bound(self) -> Any:
        """Get an optimal upperBound to properly split a Spark JDBC extraction.

        Returns:
            Either an int, date or timestamp to serve as upperBound Spark JDBC option.
        """
        options = {}
        if self._JDBC_EXTRACTION.calc_upper_bound_schema:
            options["customSchema"] = self._JDBC_EXTRACTION.calc_upper_bound_schema

        table = (
            self._JDBC_EXTRACTION.dbtable
            if self._JDBC_EXTRACTION.extraction_type == JDBCExtractionType.INIT.value
            else self._JDBC_EXTRACTION.changelog_table
        )
        jdbc_args = {
            "url": self._JDBC_EXTRACTION.url,
            "table": f"(SELECT COALESCE(MAX({self._JDBC_EXTRACTION.partition_column}), "
            f"{self._JDBC_EXTRACTION.default_upper_bound}) "
            f"upper_bound FROM {table})",  # nosec: B608
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }

        from lakehouse_engine.io.reader_factory import ReaderFactory

        upper_bound_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_optimal_upper_bound",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
                options=options,
            )
        )
        upper_bound = upper_bound_df.first()[0]

        if upper_bound is not None:
            self._LOGGER.info(
                f"Upper Bound '{upper_bound}' derived from "
                f"'{self._JDBC_EXTRACTION.dbtable}' using the column "
                f"'{self._JDBC_EXTRACTION.partition_column}'"
            )
            return upper_bound
        else:
            raise AttributeError(
                f"Not able to calculate upper bound from "
                f"'{self._JDBC_EXTRACTION.dbtable}' using "
                f"the column '{self._JDBC_EXTRACTION.partition_column}'"
            )

    def _get_extraction_partition_opts(
        self,
        options_args: dict,
    ) -> dict:
        """Get an options dict with custom extraction partition options.

        Args:
            options_args: spark jdbc reader options.
        """
        if self._JDBC_EXTRACTION.num_partitions:
            options_args["numPartitions"] = self._JDBC_EXTRACTION.num_partitions
        if self._JDBC_EXTRACTION.upper_bound:
            options_args["upperBound"] = self._JDBC_EXTRACTION.upper_bound
        if self._JDBC_EXTRACTION.lower_bound:
            options_args["lowerBound"] = self._JDBC_EXTRACTION.lower_bound
        if self._JDBC_EXTRACTION.partition_column:
            options_args["partitionColumn"] = self._JDBC_EXTRACTION.partition_column

        return options_args

    def _get_max_timestamp(self, max_timestamp_query: str) -> str:
        """Get the max timestamp, based on the provided query.

        Args:
            max_timestamp_query: the query used to derive the max timestamp.

        Returns:
            A string having the max timestamp.
        """
        jdbc_args = {
            "url": self._JDBC_EXTRACTION.url,
            "table": max_timestamp_query,
            "properties": {
                "user": self._JDBC_EXTRACTION.user,
                "password": self._JDBC_EXTRACTION.password,
                "driver": self._JDBC_EXTRACTION.driver,
            },
        }
        from lakehouse_engine.io.reader_factory import ReaderFactory

        max_timestamp_df = ReaderFactory.get_data(
            InputSpec(
                spec_id="get_max_timestamp",
                data_format=InputFormat.JDBC.value,
                read_type=ReadType.BATCH.value,
                jdbc_args=jdbc_args,
                options={
                    "customSchema": self._JDBC_EXTRACTION.max_timestamp_custom_schema
                },
            )
        )
        max_timestamp = max_timestamp_df.first()[0]
        self._LOGGER.info(
            f"Max timestamp {max_timestamp} derived from query: {max_timestamp_query}"
        )

        return str(max_timestamp)

    @abstractmethod
    def _get_delta_query(self) -> Tuple[str, str]:
        """Get a query to extract delta (partially) from a source."""
        pass

    @abstractmethod
    def _get_init_query(self) -> Tuple[str, str]:
        """Get a query to extract init (fully) from a source."""
        pass
Utils for managing data extraction from particularly relevant JDBC sources.
def __init__(self, jdbc_extraction: Any):
Construct JDBCExtractionUtils.
Arguments:
- jdbc_extraction: JDBC Extraction configurations. Can be of type: JDBCExtraction, SAPB4Extraction or SAPBWExtraction.
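The query building itself is delegated to the abstract _get_delta_query/_get_init_query methods shown in the class source above, so in practice a source-specific subclass is used. The sketch below defines a purely illustrative subclass (the query shapes are assumptions for demonstration, not the engine's real source-specific queries) and wires in the extraction_config object built in the earlier JDBCExtraction example:

from typing import Tuple

from lakehouse_engine.utils.extraction.jdbc_extraction_utils import JDBCExtractionUtils


class SimpleJDBCExtractionUtils(JDBCExtractionUtils):
    """Illustrative subclass; real implementations build source-specific queries."""

    def _get_init_query(self) -> Tuple[str, str]:
        # Returns (extraction query used as the JDBC "table", predicates query).
        table = self._JDBC_EXTRACTION.dbtable
        return (
            f"(SELECT * FROM {table})",
            f"(SELECT DISTINCT({self._JDBC_EXTRACTION.partition_column}) FROM {table})",
        )

    def _get_delta_query(self) -> Tuple[str, str]:
        changelog = self._JDBC_EXTRACTION.changelog_table
        return (
            f"(SELECT * FROM {changelog})",
            f"(SELECT DISTINCT({self._JDBC_EXTRACTION.partition_column}) FROM {changelog})",
        )


# Wrap the configuration built in the JDBCExtraction example above.
jdbc_utils = SimpleJDBCExtractionUtils(extraction_config)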
@staticmethod
def get_additional_spark_options(
    input_spec: InputSpec, options: dict, ignore_options: List = None
) -> dict:
Helper to get additional Spark Options initially passed.
If additional Spark options are provided that are not covered by the arguments of the util function get_spark_jdbc_options, they need to be considered as well. Therefore, the options retrieved by that util are updated with every Spark option initially provided that is not yet present in the retrieved options or function arguments and whose value is not None, and the complete dict is returned.
Arguments:
- input_spec: the input specification.
- options: dict with Spark options.
- ignore_options: list of options to be ignored by the process. Spark offers two different approaches to parallelize a JDBC read: upper/lower bound or predicates. They cannot be used at the same time, so one of them has to be chosen. When predicates are used, lower_bound, upper_bound, num_partitions and partition_column cannot be passed, otherwise Spark assumes a bound-based partitioning and expects all of those values to be filled. To avoid hard-coding every predicate in the acon, predicates can be generated automatically for init or delta loads based on the input partition column; in that case the partition column must not be passed in the options, which is why some options need to be passed here to be ignored.
Returns:
a dict with all the options passed as argument, plus the options that were initially provided, but were not used in the util (get_spark_jdbc_options).
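To illustrate the merging behaviour, here is a sketch only: the spec values are made up, and the InputSpec fields shown are limited to the ones already used elsewhere on this page.

from lakehouse_engine.core.definitions import InputFormat, InputSpec, ReadType
from lakehouse_engine.utils.extraction.jdbc_extraction_utils import JDBCExtractionUtils

# An input spec whose options carry extra Spark settings beyond the util's arguments.
input_spec = InputSpec(
    spec_id="my_jdbc_read",
    data_format=InputFormat.JDBC.value,
    read_type=ReadType.BATCH.value,
    options={
        "fetchSize": "100000",         # already covered, will not be duplicated
        "queryTimeout": "3600",        # extra option, should be kept
        "sessionInitStatement": None,  # None values are dropped
    },
)

additional = JDBCExtractionUtils.get_additional_spark_options(
    input_spec,
    options={"fetchSize": "100000", "compress": True},
    ignore_options=["partitionColumn"],
)
# In this sketch, additional would contain only {"queryTimeout": "3600"}.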
def get_predicates(self, predicates_query: str) -> List:
Get the predicates list, based on a predicates query.
Arguments:
- predicates_query: query to use as the basis to get the distinct values for a specified column, based on which predicates are generated.
Returns:
List containing the predicates to use to split the extraction from JDBC sources.
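As an illustration of the expected input and output (assuming the jdbc_utils and extraction_config objects from the sketches above; the query text is a hypothetical distinct-values subquery, which in the engine is normally produced by the delta/init query builders):

# The predicates query is typically the distinct-values query returned as the
# second element of _get_delta_query/_get_init_query (hypothetical example here).
predicates_query = (
    f"(SELECT DISTINCT(RECORD_ID) FROM {extraction_config.changelog_table})"
)

predicates = jdbc_utils.get_predicates(predicates_query)
# With predicates_add_null=True (the default), the result looks like:
# ["RECORD_ID='1'", "RECORD_ID='2'", ..., "RECORD_ID IS NULL"]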
def get_spark_jdbc_options(self) -> Tuple[dict, dict]:
Get the Spark options to extract data from a JDBC source.
Returns:
The Spark jdbc args dictionary, including the query to submit and also options args dictionary.
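A sketch of how the two returned dictionaries could be consumed with Spark's JDBC reader, assuming a SparkSession named spark and the jdbc_utils object from the sketch above (inside the engine this wiring is handled by its own readers). Note that for bound-based partitioning, lower_bound and upper_bound must also be set, for example with the optimizer shown below.

options_args, jdbc_args = jdbc_utils.get_spark_jdbc_options()

# jdbc_args maps onto DataFrameReader.jdbc arguments (url, table, properties and,
# when predicates are used, the predicates list); options_args carries the remaining
# reader options such as fetchSize, compress, customSchema and partitioning options.
df = spark.read.options(**options_args).jdbc(**jdbc_args)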
def get_spark_jdbc_optimal_upper_bound(self) -> Any:
Get an optimal upperBound to properly split a Spark JDBC extraction.
Returns:
Either an int, date or timestamp to serve as upperBound Spark JDBC option.
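A sketch of the typical combination with the partitioning options, again assuming the extraction_config and jdbc_utils objects from the earlier sketches: the derived value can be fed back into the configuration before the Spark options are generated.

# Derive an upper bound from the source and feed it back into the configuration
# (the same object held by jdbc_utils), so the bound-based partition options are
# complete before building the Spark options.
extraction_config.lower_bound = 0
extraction_config.upper_bound = jdbc_utils.get_spark_jdbc_optimal_upper_bound()

options_args, jdbc_args = jdbc_utils.get_spark_jdbc_options()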