lakehouse_engine.io.readers.sap_b4_reader

Module to define behaviour to read from SAP B4 sources.

  1"""Module to define behaviour to read from SAP B4 sources."""
  2
  3from logging import Logger
  4from typing import Tuple
  5
  6from pyspark.sql import DataFrame
  7
  8from lakehouse_engine.core.definitions import InputSpec
  9from lakehouse_engine.core.exec_env import ExecEnv
 10from lakehouse_engine.io.reader import Reader
 11from lakehouse_engine.utils.extraction.sap_b4_extraction_utils import (
 12    ADSOTypes,
 13    SAPB4Extraction,
 14    SAPB4ExtractionUtils,
 15)
 16from lakehouse_engine.utils.logging_handler import LoggingHandler
 17
 18
 19class SAPB4Reader(Reader):
 20    """Class to read from SAP B4 source."""
 21
 22    _LOGGER: Logger = LoggingHandler(__name__).get_logger()
 23
 24    def __init__(self, input_spec: InputSpec):
 25        """Construct SAPB4Reader instances.
 26
 27        Args:
 28            input_spec: input specification.
 29        """
 30        super().__init__(input_spec)
 31        self.jdbc_utils = self._get_jdbc_utils()
 32
 33    def read(self) -> DataFrame:
 34        """Read data from SAP B4 source.
 35
 36        Returns:
 37            A dataframe containing the data from the SAP B4 source.
 38        """
 39        options_args, jdbc_args = self._get_options()
 40        return ExecEnv.SESSION.read.options(**options_args).jdbc(**jdbc_args)
 41
    def _get_jdbc_utils(self) -> SAPB4ExtractionUtils:
        """Build the SAP B4 extraction utilities from the input spec options."""
        jdbc_extraction = SAPB4Extraction(
            user=self._input_spec.options["user"],
            password=self._input_spec.options["password"],
            url=self._input_spec.options["url"],
            dbtable=self._input_spec.options["dbtable"],
            adso_type=self._input_spec.options["adso_type"],
            request_status_tbl=self._input_spec.options.get(
                "request_status_tbl", SAPB4Extraction.request_status_tbl
            ),
            changelog_table=self._input_spec.options.get(
                "changelog_table",
                (
                    self._input_spec.options["dbtable"]
                    if self._input_spec.options["adso_type"] == ADSOTypes.AQ.value
                    else self._input_spec.options["changelog_table"]
                ),
            ),
            data_target=SAPB4ExtractionUtils.get_data_target(self._input_spec.options),
            act_req_join_condition=self._input_spec.options.get(
                "act_req_join_condition", SAPB4Extraction.act_req_join_condition
            ),
            latest_timestamp_data_location=self._input_spec.options.get(
                "latest_timestamp_data_location",
                SAPB4Extraction.latest_timestamp_data_location,
            ),
            latest_timestamp_input_col=self._input_spec.options.get(
                "latest_timestamp_input_col",
                SAPB4Extraction.latest_timestamp_input_col,
            ),
            latest_timestamp_data_format=self._input_spec.options.get(
                "latest_timestamp_data_format",
                SAPB4Extraction.latest_timestamp_data_format,
            ),
            extraction_type=self._input_spec.options.get(
                "extraction_type", SAPB4Extraction.extraction_type
            ),
            driver=self._input_spec.options.get("driver", SAPB4Extraction.driver),
            num_partitions=self._input_spec.options.get(
                "numPartitions", SAPB4Extraction.num_partitions
            ),
            partition_column=self._input_spec.options.get(
                "partitionColumn", SAPB4Extraction.partition_column
            ),
            lower_bound=self._input_spec.options.get(
                "lowerBound", SAPB4Extraction.lower_bound
            ),
            upper_bound=self._input_spec.options.get(
                "upperBound", SAPB4Extraction.upper_bound
            ),
            default_upper_bound=self._input_spec.options.get(
                "default_upper_bound", SAPB4Extraction.default_upper_bound
            ),
            fetch_size=self._input_spec.options.get(
                "fetchSize", SAPB4Extraction.fetch_size
            ),
            compress=self._input_spec.options.get("compress", SAPB4Extraction.compress),
            custom_schema=self._input_spec.options.get(
                "customSchema", SAPB4Extraction.custom_schema
            ),
            extraction_timestamp=self._input_spec.options.get(
                "extraction_timestamp",
                SAPB4Extraction.extraction_timestamp,
            ),
            min_timestamp=self._input_spec.options.get(
                "min_timestamp", SAPB4Extraction.min_timestamp
            ),
            max_timestamp=self._input_spec.options.get(
                "max_timestamp", SAPB4Extraction.max_timestamp
            ),
            default_max_timestamp=self._input_spec.options.get(
                "default_max_timestamp", SAPB4Extraction.default_max_timestamp
            ),
            max_timestamp_custom_schema=self._input_spec.options.get(
                "max_timestamp_custom_schema",
                SAPB4Extraction.max_timestamp_custom_schema,
            ),
            generate_predicates=self._input_spec.generate_predicates,
            predicates=self._input_spec.options.get(
                "predicates", SAPB4Extraction.predicates
            ),
            predicates_add_null=self._input_spec.predicates_add_null,
            extra_cols_req_status_tbl=self._input_spec.options.get(
                "extra_cols_req_status_tbl", SAPB4Extraction.extra_cols_req_status_tbl
            ),
            calc_upper_bound_schema=self._input_spec.calc_upper_bound_schema,
            include_changelog_tech_cols=self._input_spec.options.get(
                "include_changelog_tech_cols",
                (
                    False
                    if self._input_spec.options["adso_type"] == ADSOTypes.AQ.value
                    else True
                ),
            ),
        )
        return SAPB4ExtractionUtils(jdbc_extraction)

    def _get_options(self) -> Tuple[dict, dict]:
        """Get Spark options using JDBC utilities.

        Returns:
            A tuple containing the options args and the jdbc args
            to be passed to Spark.
        """
        self._LOGGER.info(
            f"Initial options passed to the SAP B4 Reader: {self._input_spec.options}"
        )

        options_args, jdbc_args = self.jdbc_utils.get_spark_jdbc_options()

        if self._input_spec.generate_predicates or self._input_spec.options.get(
            "predicates", None
        ):
            options_args.update(
                self.jdbc_utils.get_additional_spark_options(
                    self._input_spec,
                    options_args,
                    ["partitionColumn", "numPartitions", "lowerBound", "upperBound"],
                )
            )
        else:
            if self._input_spec.calculate_upper_bound:
                options_args["upperBound"] = (
                    self.jdbc_utils.get_spark_jdbc_optimal_upper_bound()
                )

            options_args.update(
                self.jdbc_utils.get_additional_spark_options(
                    self._input_spec, options_args
                )
            )

        self._LOGGER.info(
            f"Final options to fill SAP B4 Reader Options: {options_args}"
        )
        self._LOGGER.info(f"Final jdbc args to fill SAP B4 Reader JDBC: {jdbc_args}")
        return options_args, jdbc_args
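
A minimal usage sketch follows. Assumptions: the InputSpec fields beyond options (spec_id, read_type, data_format) and every option value shown are illustrative placeholders, not values confirmed by this module.

from lakehouse_engine.core.definitions import InputSpec
from lakehouse_engine.io.readers.sap_b4_reader import SAPB4Reader

# Hypothetical input specification for an AQ-type ADSO. The options
# "user", "password", "url", "dbtable" and "adso_type" are the ones
# _get_jdbc_utils reads unconditionally; anything else falls back to
# the SAPB4Extraction defaults.
input_spec = InputSpec(
    spec_id="sap_b4_sales_orders",  # hypothetical identifier
    read_type="batch",              # assumed field/value
    data_format="sap_b4",           # assumed format key for this reader
    options={
        "user": "extraction_user",             # placeholder; use a secret store
        "password": "***",                     # placeholder
        "url": "jdbc:sap://b4-host:30015",     # placeholder SAP HANA JDBC URL
        "dbtable": "SAPHANADB./BIC/AMYADSO2",  # placeholder ADSO table
        "adso_type": "AQ",                     # AQ: active-table extraction
    },
)

df = SAPB4Reader(input_spec=input_spec).read()

read() resolves the remaining Spark JDBC options through SAPB4ExtractionUtils before delegating to the active Spark session.
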
class SAPB4Reader(lakehouse_engine.io.reader.Reader):

Class to read from SAP B4 source.
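
Note that _get_jdbc_utils accesses the options user, password, url, dbtable and adso_type directly, so these are effectively mandatory; for non-AQ ADSO types it also dereferences changelog_table directly. A hypothetical fail-fast check (not part of the library) that mirrors this contract:

from lakehouse_engine.utils.extraction.sap_b4_extraction_utils import ADSOTypes


def check_required_options(options: dict) -> None:
    """Illustrative helper mirroring the direct dict accesses in
    SAPB4Reader._get_jdbc_utils; not part of the lakehouse engine."""
    required = {"user", "password", "url", "dbtable", "adso_type"}
    if options.get("adso_type") != ADSOTypes.AQ.value:
        # Non-AQ (changelog-based) ADSOs also need the changelog table.
        required.add("changelog_table")
    missing = required - options.keys()
    if missing:
        raise KeyError(f"Missing SAP B4 reader options: {sorted(missing)}")
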

SAPB4Reader(input_spec: lakehouse_engine.core.definitions.InputSpec)

Construct SAPB4Reader instances.

Arguments:
  • input_spec: input specification.
jdbc_utils
SAPB4ExtractionUtils instance derived from the input specification, used to build the Spark JDBC options.
def read(self) -> pyspark.sql.dataframe.DataFrame:

Read data from SAP B4 source.

Returns:
  A dataframe containing the data from the SAP B4 source.
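
Conceptually, read() forwards the two dictionaries produced by _get_options straight into Spark's JDBC reader. A rough equivalent with illustrative values (the real dictionaries come from SAPB4ExtractionUtils.get_spark_jdbc_options and may carry different keys):

# Illustrative values only. com.sap.db.jdbc.Driver is the usual SAP HANA
# JDBC driver class, but the actual driver is taken from the options.
options_args = {
    "driver": "com.sap.db.jdbc.Driver",
    "fetchsize": "100000",  # placeholder fetch size
}
jdbc_args = {
    "url": "jdbc:sap://b4-host:30015",   # placeholder
    "table": "(SELECT ...) extraction",  # subquery assembled by the utils
    "properties": {"user": "extraction_user", "password": "***"},
}
# ExecEnv.SESSION is the engine-managed SparkSession; a plain
# SparkSession behaves the same way here.
df = ExecEnv.SESSION.read.options(**options_args).jdbc(**jdbc_args)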