JDBC extraction utils
Utilities module for JDBC extraction processes.
JDBCExtraction
dataclass
Bases: object
Configurations available for an Extraction from a JDBC source.
These configurations cover:
- user: username to connect to JDBC source.
- password: password to connect to the JDBC source (always use secrets; never hardcode plaintext passwords in your code).
- url: url to connect to JDBC source.
- dbtable: database.table to extract data from.
- calc_upper_bound_schema: custom schema used for the upper bound calculation.
- changelog_table: table of type changelog from which to extract data, when the extraction type is delta.
- partition_column: column used to split the extraction.
- latest_timestamp_data_location: data location (e.g., s3) containing the data to get the latest timestamp already loaded into bronze.
- latest_timestamp_data_format: the format of the dataset in latest_timestamp_data_location. Default: delta.
- extraction_type: type of extraction (delta or init). Default: "delta".
- driver: JDBC driver name. Default: "com.sap.db.jdbc.Driver".
- num_partitions: number of Spark partitions to split the extraction.
- lower_bound: lower bound to decide the partition stride.
- upper_bound: upper bound to decide the partition stride. If calculate_upper_bound is True, then upperBound will be derived by our upper bound optimizer, using the partition column.
- default_upper_bound: the value to use as default upper bound in case the result of the upper bound calculation is None. Default: "1".
- fetch_size: how many rows to fetch per round trip. Default: "100000".
- compress: enable network compression. Default: True.
- custom_schema: specify custom_schema for particular columns of the returned dataframe in the init/delta extraction of the source table.
- min_timestamp: min timestamp to consider to filter the changelog data. Default: None, in which case it is automatically derived from the location provided. If a value is provided, it takes precedence and the derivation is not done.
- max_timestamp: max timestamp to consider to filter the changelog data. Default: None, in which case it is automatically derived from the table holding information about the extraction requests, their timestamps and their status. If a value is provided, it takes precedence and the derivation is not done.
- generate_predicates: whether to generate predicates automatically or not. Default: False.
- predicates: list containing all values to partition (if generate_predicates is used, the manual values provided are ignored). Default: None.
- predicates_add_null: whether to consider null on predicates list. Default: True.
- extraction_timestamp: the timestamp of the extraction. Default: current time following the format "%Y%m%d%H%M%S".
- max_timestamp_custom_schema: custom schema used on the max_timestamp derivation from the table holding the extraction requests information.
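For orientation, the sketch below shows how a few of these configurations could be assembled into a JDBCExtraction instance. It is a minimal, hypothetical example: the import path is inferred from the source file location, and all connection values are placeholders, not a verbatim recipe from this module.

```python
# Hypothetical sketch only: the field names follow the docstring above, but the
# import path and every value below are placeholders/assumptions.
from lakehouse_engine.utils.extraction.jdbc_extraction_utils import JDBCExtraction

jdbc_config = JDBCExtraction(
    user="extraction_user",                        # placeholder technical user
    password="<resolved-from-secret-store>",       # never hardcode plaintext passwords
    url="jdbc:sap://my-sap-host:30015",            # placeholder JDBC url
    dbtable="my_schema.my_table",                  # database.table to extract from
    extraction_type="delta",                       # "delta" or "init"
    partition_column="RECORD_TIMESTAMP",           # column used to split the extraction
    num_partitions=10,
    latest_timestamp_data_location="s3://my-bucket/bronze/my_table",
    latest_timestamp_data_format="delta",
)
```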
Source code in mkdocs/lakehouse_engine/packages/utils/extraction/jdbc_extraction_utils.py
JDBCExtractionType
Bases: Enum
Standardize the types of extractions we can have from a JDBC source.
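As a rough sketch of what this standardization amounts to, the enum presumably exposes one member per extraction_type value described above ("init" and "delta"); the member names below are an assumption, not taken from the source.

```python
from enum import Enum

# Assumed shape of the enum, mirroring the "init"/"delta" extraction types
# described for JDBCExtraction above; member names are an assumption.
class JDBCExtractionType(Enum):
    INIT = "init"    # full initial extraction of the source table
    DELTA = "delta"  # incremental extraction based on a changelog table
```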
Source code in mkdocs/lakehouse_engine/packages/utils/extraction/jdbc_extraction_utils.py
JDBCExtractionUtils
Bases: object
Utils for managing data extraction from particularly relevant JDBC sources.
Source code in mkdocs/lakehouse_engine/packages/utils/extraction/jdbc_extraction_utils.py
__init__(jdbc_extraction)
Construct JDBCExtractionUtils.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| jdbc_extraction | Any | JDBC Extraction configurations. Can be of type: JDBCExtraction, SAPB4Extraction or SAPBWExtraction. | required |
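A minimal usage sketch, assuming the jdbc_config instance from the JDBCExtraction example above and an import path inferred from the source file location:

```python
# Sketch: build the utils from a JDBCExtraction config such as jdbc_config from
# the example above. The import path is an assumption.
from lakehouse_engine.utils.extraction.jdbc_extraction_utils import JDBCExtractionUtils

jdbc_utils = JDBCExtractionUtils(jdbc_extraction=jdbc_config)
```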
Source code in mkdocs/lakehouse_engine/packages/utils/extraction/jdbc_extraction_utils.py
get_additional_spark_options(input_spec, options, ignore_options=None)
staticmethod
Helper to get additional Spark Options initially passed.
If additional Spark options are provided that are not covered by the util function arguments (get_spark_jdbc_options), we need to consider them. Thus, we update the options retrieved by the utils by checking whether any Spark option initially provided is not yet covered by the retrieved options or function arguments and whether its value is not None. If these conditions are met, we add the options and return the complete dict. A standalone sketch of this merging behaviour follows the tables below.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_spec | InputSpec | the input specification. | required |
| options | dict | dict with Spark options. | required |
| ignore_options | List | list of options to be ignored by the process. Spark read has two different approaches to parallelize the reading process: one uses upper/lower bounds, the other uses predicates. These approaches cannot be used at the same time; you must choose one of them. When choosing predicates, you cannot pass lower and upper bounds, nor the number of partitions and partition column, otherwise Spark will interpret the execution as partitioned by upper and lower bounds and will expect all of those variables to be filled. To avoid hardcoding all predicates in the acon, there is a feature that automatically generates all predicates for an init or delta load based on the input partition column; however, the partition column cannot then be passed to the options, because predicates execution was chosen. That is why, to generate predicates, we need to pass some options to ignore. | None |
Returns:
| Type | Description |
| --- | --- |
| dict | a dict with all the options passed as argument, plus the options that were initially provided but were not used in the util (get_spark_jdbc_options). |
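The merging behaviour described above can be pictured with the following standalone sketch. It uses plain dicts and an illustrative helper name rather than the actual InputSpec type and method, and it simplifies the check against function arguments:

```python
# Illustrative sketch of the described merge: take the Spark options initially
# provided, and add any that are not yet covered by the retrieved options
# (and whose value is not None), skipping the ones explicitly ignored.
def merge_additional_options(provided_options: dict, retrieved_options: dict,
                             ignore_options: list = None) -> dict:
    ignore_options = ignore_options or []
    merged = dict(retrieved_options)
    for key, value in provided_options.items():
        if key not in merged and key not in ignore_options and value is not None:
            merged[key] = value
    return merged

# e.g. options the user passed on the input spec vs. options built by the util:
provided = {"fetchsize": "100000", "sessionInitStatement": "SET ROLE reader"}
retrieved = {"url": "jdbc:sap://host:30015", "fetchsize": "100000"}
print(merge_additional_options(provided, retrieved))
# -> the retrieved options plus "sessionInitStatement"
```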
Source code in mkdocs/lakehouse_engine/packages/utils/extraction/jdbc_extraction_utils.py
get_predicates(predicates_query)
Get the predicates list, based on a predicates query.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| predicates_query | str | query to use as the basis to get the distinct values for a specified column, based on which predicates are generated. | required |
Returns:
| Type | Description |
| --- | --- |
| List | List containing the predicates to use to split the extraction from JDBC sources. |
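For intuition, here is a hedged illustration of what a predicates query and the resulting predicates list might look like; the column name, query shape and output format are assumptions, not taken from the source:

```python
# Hypothetical illustration: the query returns the distinct values of the
# chosen partition column, and each distinct value becomes one predicate so
# that Spark reads each value in a separate partition.
predicates_query = "(SELECT DISTINCT(LOAD_DATE) FROM my_schema.my_table) t"

# A result along these lines could then be fed to Spark's JDBC reader as predicates:
predicates = [
    "LOAD_DATE='2024-01-01'",
    "LOAD_DATE='2024-01-02'",
    "LOAD_DATE IS NULL",  # e.g. included when predicates_add_null is True
]
```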
Source code in mkdocs/lakehouse_engine/packages/utils/extraction/jdbc_extraction_utils.py
get_spark_jdbc_optimal_upper_bound()
Get an optimal upperBound to properly split a Spark JDBC extraction.
Returns:
| Type | Description |
| --- | --- |
| Any | Either an int, date or timestamp to serve as upperBound Spark JDBC option. |
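Conceptually, an optimal upper bound is a statistic such as the maximum of the partition column computed on the source. The sketch below illustrates that idea with plain PySpark; it is not the module's actual implementation, and connection details, column and table names are placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative idea only: compute MAX over the partition column and fall back
# to a default upper bound ("1") when nothing comes back.
upper_bound_query = "(SELECT MAX(RECORD_TIMESTAMP) AS ub FROM my_schema.my_table) t"
row = spark.read.jdbc(
    url="jdbc:sap://my-sap-host:30015",
    table=upper_bound_query,
    properties={
        "user": "extraction_user",
        "password": "<resolved-from-secret-store>",
        "driver": "com.sap.db.jdbc.Driver",
    },
).first()
upper_bound = row["ub"] if row and row["ub"] is not None else "1"
```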
Source code in mkdocs/lakehouse_engine/packages/utils/extraction/jdbc_extraction_utils.py
get_spark_jdbc_options()
Get the Spark options to extract data from a JDBC source.
Returns:
| Type | Description |
| --- | --- |
| dict | The Spark JDBC args dictionary, including the query to submit, and also the options args dictionary. |
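Finally, a hedged sketch of how the returned dictionaries are typically consumed with Spark's JDBC reader; the unpacking order and variable names are assumptions, so check the source for the authoritative contract:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sketch: the docstring describes a jdbc args dictionary plus an options args
# dictionary; the unpacking order below is an assumption. jdbc_utils is the
# instance built in the __init__ example above.
options_args, jdbc_args = jdbc_utils.get_spark_jdbc_options()

df = spark.read.format("jdbc").options(**options_args).load()
df.show(10)
```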