lakehouse_engine.io.readers.jdbc_reader

Module to define behaviour to read from JDBC sources.

 1"""Module to define behaviour to read from JDBC sources."""
 2
 3from pyspark.sql import DataFrame
 4
 5from lakehouse_engine.core.definitions import InputFormat, InputSpec
 6from lakehouse_engine.core.exec_env import ExecEnv
 7from lakehouse_engine.io.reader import Reader
 8from lakehouse_engine.transformers.exceptions import WrongArgumentsException
 9from lakehouse_engine.utils.extraction.jdbc_extraction_utils import (
10    JDBCExtraction,
11    JDBCExtractionUtils,
12)
13
14
15class JDBCReader(Reader):
16    """Class to read from JDBC source."""
17
18    def __init__(self, input_spec: InputSpec):
19        """Construct JDBCReader instances.
20
21        Args:
22            input_spec: input specification.
23        """
24        super().__init__(input_spec)
25
26    def read(self) -> DataFrame:
27        """Read data from JDBC source.
28
29        Returns:
30            A dataframe containing the data from the JDBC source.
31        """
32        if (
33            self._input_spec.options is not None
34            and self._input_spec.options.get("predicates", None) is not None
35        ):
36            raise WrongArgumentsException("Predicates can only be used with jdbc_args.")
37
38        options = self._input_spec.options if self._input_spec.options else {}
39        if self._input_spec.calculate_upper_bound:
40            jdbc_util = JDBCExtractionUtils(
41                JDBCExtraction(
42                    user=options["user"],
43                    password=options["password"],
44                    url=options["url"],
45                    dbtable=options["dbtable"],
46                    extraction_type=options.get(
47                        "extraction_type", JDBCExtraction.extraction_type
48                    ),
49                    partition_column=options["partitionColumn"],
50                    calc_upper_bound_schema=self._input_spec.calc_upper_bound_schema,
51                    default_upper_bound=options.get(
52                        "default_upper_bound", JDBCExtraction.default_upper_bound
53                    ),
54                )
55            )  # type: ignore
56            options["upperBound"] = jdbc_util.get_spark_jdbc_optimal_upper_bound()
57
58        if self._input_spec.jdbc_args:
59            return ExecEnv.SESSION.read.options(**options).jdbc(
60                **self._input_spec.jdbc_args
61            )
62        else:
63            return (
64                ExecEnv.SESSION.read.format(InputFormat.JDBC.value)
65                .options(**options)
66                .load()
67            )
class JDBCReader(lakehouse_engine.io.reader.Reader):
16class JDBCReader(Reader):
17    """Class to read from JDBC source."""
18
19    def __init__(self, input_spec: InputSpec):
20        """Construct JDBCReader instances.
21
22        Args:
23            input_spec: input specification.
24        """
25        super().__init__(input_spec)
26
27    def read(self) -> DataFrame:
28        """Read data from JDBC source.
29
30        Returns:
31            A dataframe containing the data from the JDBC source.
32        """
33        if (
34            self._input_spec.options is not None
35            and self._input_spec.options.get("predicates", None) is not None
36        ):
37            raise WrongArgumentsException("Predicates can only be used with jdbc_args.")
38
39        options = self._input_spec.options if self._input_spec.options else {}
40        if self._input_spec.calculate_upper_bound:
41            jdbc_util = JDBCExtractionUtils(
42                JDBCExtraction(
43                    user=options["user"],
44                    password=options["password"],
45                    url=options["url"],
46                    dbtable=options["dbtable"],
47                    extraction_type=options.get(
48                        "extraction_type", JDBCExtraction.extraction_type
49                    ),
50                    partition_column=options["partitionColumn"],
51                    calc_upper_bound_schema=self._input_spec.calc_upper_bound_schema,
52                    default_upper_bound=options.get(
53                        "default_upper_bound", JDBCExtraction.default_upper_bound
54                    ),
55                )
56            )  # type: ignore
57            options["upperBound"] = jdbc_util.get_spark_jdbc_optimal_upper_bound()
58
59        if self._input_spec.jdbc_args:
60            return ExecEnv.SESSION.read.options(**options).jdbc(
61                **self._input_spec.jdbc_args
62            )
63        else:
64            return (
65                ExecEnv.SESSION.read.format(InputFormat.JDBC.value)
66                .options(**options)
67                .load()
68            )

Class to read from JDBC source.

JDBCReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
19    def __init__(self, input_spec: InputSpec):
20        """Construct JDBCReader instances.
21
22        Args:
23            input_spec: input specification.
24        """
25        super().__init__(input_spec)

Construct JDBCReader instances.

Arguments:
  • input_spec: input specification.
def read(self) -> pyspark.sql.dataframe.DataFrame:
27    def read(self) -> DataFrame:
28        """Read data from JDBC source.
29
30        Returns:
31            A dataframe containing the data from the JDBC source.
32        """
33        if (
34            self._input_spec.options is not None
35            and self._input_spec.options.get("predicates", None) is not None
36        ):
37            raise WrongArgumentsException("Predicates can only be used with jdbc_args.")
38
39        options = self._input_spec.options if self._input_spec.options else {}
40        if self._input_spec.calculate_upper_bound:
41            jdbc_util = JDBCExtractionUtils(
42                JDBCExtraction(
43                    user=options["user"],
44                    password=options["password"],
45                    url=options["url"],
46                    dbtable=options["dbtable"],
47                    extraction_type=options.get(
48                        "extraction_type", JDBCExtraction.extraction_type
49                    ),
50                    partition_column=options["partitionColumn"],
51                    calc_upper_bound_schema=self._input_spec.calc_upper_bound_schema,
52                    default_upper_bound=options.get(
53                        "default_upper_bound", JDBCExtraction.default_upper_bound
54                    ),
55                )
56            )  # type: ignore
57            options["upperBound"] = jdbc_util.get_spark_jdbc_optimal_upper_bound()
58
59        if self._input_spec.jdbc_args:
60            return ExecEnv.SESSION.read.options(**options).jdbc(
61                **self._input_spec.jdbc_args
62            )
63        else:
64            return (
65                ExecEnv.SESSION.read.format(InputFormat.JDBC.value)
66                .options(**options)
67                .load()
68            )

Read data from JDBC source.

Returns:

A dataframe containing the data from the JDBC source.