lakehouse_engine.io.readers.jdbc_reader
Module to define behaviour to read from JDBC sources.
1"""Module to define behaviour to read from JDBC sources.""" 2 3from pyspark.sql import DataFrame 4 5from lakehouse_engine.core.definitions import InputFormat, InputSpec 6from lakehouse_engine.core.exec_env import ExecEnv 7from lakehouse_engine.io.reader import Reader 8from lakehouse_engine.transformers.exceptions import WrongArgumentsException 9from lakehouse_engine.utils.extraction.jdbc_extraction_utils import ( 10 JDBCExtraction, 11 JDBCExtractionUtils, 12) 13 14 15class JDBCReader(Reader): 16 """Class to read from JDBC source.""" 17 18 def __init__(self, input_spec: InputSpec): 19 """Construct JDBCReader instances. 20 21 Args: 22 input_spec: input specification. 23 """ 24 super().__init__(input_spec) 25 26 def read(self) -> DataFrame: 27 """Read data from JDBC source. 28 29 Returns: 30 A dataframe containing the data from the JDBC source. 31 """ 32 if ( 33 self._input_spec.options is not None 34 and self._input_spec.options.get("predicates", None) is not None 35 ): 36 raise WrongArgumentsException("Predicates can only be used with jdbc_args.") 37 38 options = self._input_spec.options if self._input_spec.options else {} 39 if self._input_spec.calculate_upper_bound: 40 jdbc_util = JDBCExtractionUtils( 41 JDBCExtraction( 42 user=options["user"], 43 password=options["password"], 44 url=options["url"], 45 dbtable=options["dbtable"], 46 extraction_type=options.get( 47 "extraction_type", JDBCExtraction.extraction_type 48 ), 49 partition_column=options["partitionColumn"], 50 calc_upper_bound_schema=self._input_spec.calc_upper_bound_schema, 51 default_upper_bound=options.get( 52 "default_upper_bound", JDBCExtraction.default_upper_bound 53 ), 54 ) 55 ) # type: ignore 56 options["upperBound"] = jdbc_util.get_spark_jdbc_optimal_upper_bound() 57 58 if self._input_spec.jdbc_args: 59 return ExecEnv.SESSION.read.options(**options).jdbc( 60 **self._input_spec.jdbc_args 61 ) 62 else: 63 return ( 64 ExecEnv.SESSION.read.format(InputFormat.JDBC.value) 65 .options(**options) 66 .load() 67 )
Class to read from JDBC source.
JDBCReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
Construct JDBCReader instances.
Arguments:
- input_spec: input specification.
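A minimal construction sketch follows. It assumes, for illustration only, that InputSpec can be instantiated with spec_id, read_type and data_format keyword arguments alongside options; only options, jdbc_args, calculate_upper_bound and calc_upper_bound_schema are actually referenced by this reader, so consult lakehouse_engine.core.definitions.InputSpec for the authoritative field list.

from lakehouse_engine.core.definitions import InputSpec
from lakehouse_engine.io.readers.jdbc_reader import JDBCReader

# Hypothetical specification: field names other than `options` are assumptions
# made for this sketch, not taken from this module.
input_spec = InputSpec(
    spec_id="jdbc_sales_source",  # assumed field
    read_type="batch",            # assumed field
    data_format="jdbc",           # assumed field
    options={
        "url": "jdbc:postgresql://localhost:5432/sampledb",  # placeholder URL
        "dbtable": "public.sales",                           # placeholder table
        "user": "reader",
        "password": "secret",
    },
)

reader = JDBCReader(input_spec)
df = reader.read()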
def read(self) -> pyspark.sql.dataframe.DataFrame:
Read data from JDBC source.
Returns:
A dataframe containing the data from the JDBC source.
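read() has two paths. When InputSpec.jdbc_args is set, the options are applied and the arguments are forwarded straight to Spark's DataFrameReader.jdbc; that is also the only place predicates are accepted, since passing predicates through options raises WrongArgumentsException. When calculate_upper_bound is set, the reader first derives an optimal upperBound via JDBCExtractionUtils before loading. Below is a hedged sketch of a jdbc_args payload: the parameter names match PySpark's DataFrameReader.jdbc signature, while the connection values and predicates are placeholders.

# Example jdbc_args for an InputSpec; these keyword arguments are forwarded
# verbatim to pyspark.sql.DataFrameReader.jdbc(**jdbc_args).
jdbc_args = {
    "url": "jdbc:postgresql://localhost:5432/sampledb",  # placeholder URL
    "table": "public.sales",                             # placeholder table
    # Each predicate becomes one partition of the resulting DataFrame.
    "predicates": [
        "order_date >= '2024-01-01' AND order_date < '2024-02-01'",
        "order_date >= '2024-02-01' AND order_date < '2024-03-01'",
    ],
    "properties": {"user": "reader", "password": "secret"},
}

# With jdbc_args set on the input spec, JDBCReader.read() effectively runs:
# ExecEnv.SESSION.read.options(**options).jdbc(**jdbc_args)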