lakehouse_engine.io.readers.sap_bw_reader

Module to define behaviour to read from SAP BW sources.

  1"""Module to define behaviour to read from SAP BW sources."""
  2
  3from logging import Logger
  4from typing import Tuple
  5
  6from pyspark.sql import DataFrame
  7
  8from lakehouse_engine.core.definitions import InputSpec
  9from lakehouse_engine.core.exec_env import ExecEnv
 10from lakehouse_engine.io.reader import Reader
 11from lakehouse_engine.utils.extraction.sap_bw_extraction_utils import (
 12    SAPBWExtraction,
 13    SAPBWExtractionUtils,
 14)
 15from lakehouse_engine.utils.logging_handler import LoggingHandler
 16
 17
 18class SAPBWReader(Reader):
 19    """Class to read from SAP BW source."""
 20
 21    _LOGGER: Logger = LoggingHandler(__name__).get_logger()
 22
 23    def __init__(self, input_spec: InputSpec):
 24        """Construct SAPBWReader instances.
 25
 26        Args:
 27            input_spec: input specification.
 28        """
 29        super().__init__(input_spec)
 30        self.jdbc_utils = self._get_jdbc_utils()
 31
 32    def read(self) -> DataFrame:
 33        """Read data from SAP BW source.
 34
 35        Returns:
 36            A dataframe containing the data from the SAP BW source.
 37        """
 38        options_args, jdbc_args = self._get_options()
 39        return ExecEnv.SESSION.read.options(**options_args).jdbc(**jdbc_args)
 40
 41    def _get_jdbc_utils(self) -> SAPBWExtractionUtils:
 42        jdbc_extraction = SAPBWExtraction(
 43            user=self._input_spec.options["user"],
 44            password=self._input_spec.options["password"],
 45            url=self._input_spec.options["url"],
 46            dbtable=self._input_spec.options["dbtable"],
 47            latest_timestamp_data_location=self._input_spec.options.get(
 48                "latest_timestamp_data_location",
 49                SAPBWExtraction.latest_timestamp_data_location,
 50            ),
 51            latest_timestamp_input_col=self._input_spec.options.get(
 52                "latest_timestamp_input_col", SAPBWExtraction.latest_timestamp_input_col
 53            ),
 54            latest_timestamp_data_format=self._input_spec.options.get(
 55                "latest_timestamp_data_format",
 56                SAPBWExtraction.latest_timestamp_data_format,
 57            ),
 58            extraction_type=self._input_spec.options.get(
 59                "extraction_type", SAPBWExtraction.extraction_type
 60            ),
 61            act_request_table=self._input_spec.options.get(
 62                "act_request_table", SAPBWExtraction.act_request_table
 63            ),
 64            request_col_name=self._input_spec.options.get(
 65                "request_col_name", SAPBWExtraction.request_col_name
 66            ),
 67            act_req_join_condition=self._input_spec.options.get(
 68                "act_req_join_condition", SAPBWExtraction.act_req_join_condition
 69            ),
 70            driver=self._input_spec.options.get("driver", SAPBWExtraction.driver),
 71            changelog_table=self._input_spec.options.get(
 72                "changelog_table", SAPBWExtraction.changelog_table
 73            ),
 74            num_partitions=self._input_spec.options.get(
 75                "numPartitions", SAPBWExtraction.num_partitions
 76            ),
 77            partition_column=self._input_spec.options.get(
 78                "partitionColumn", SAPBWExtraction.partition_column
 79            ),
 80            lower_bound=self._input_spec.options.get(
 81                "lowerBound", SAPBWExtraction.lower_bound
 82            ),
 83            upper_bound=self._input_spec.options.get(
 84                "upperBound", SAPBWExtraction.upper_bound
 85            ),
 86            default_upper_bound=self._input_spec.options.get(
 87                "default_upper_bound", SAPBWExtraction.default_upper_bound
 88            ),
 89            fetch_size=self._input_spec.options.get(
 90                "fetchSize", SAPBWExtraction.fetch_size
 91            ),
 92            compress=self._input_spec.options.get("compress", SAPBWExtraction.compress),
 93            custom_schema=self._input_spec.options.get(
 94                "customSchema", SAPBWExtraction.custom_schema
 95            ),
 96            extraction_timestamp=self._input_spec.options.get(
 97                "extraction_timestamp",
 98                SAPBWExtraction.extraction_timestamp,
 99            ),
100            odsobject=self._input_spec.options.get(
101                "odsobject",
102                SAPBWExtractionUtils.get_odsobject(self._input_spec.options),
103            ),
104            min_timestamp=self._input_spec.options.get(
105                "min_timestamp", SAPBWExtraction.min_timestamp
106            ),
107            max_timestamp=self._input_spec.options.get(
108                "max_timestamp", SAPBWExtraction.max_timestamp
109            ),
110            default_max_timestamp=self._input_spec.options.get(
111                "default_max_timestamp", SAPBWExtraction.default_max_timestamp
112            ),
113            max_timestamp_custom_schema=self._input_spec.options.get(
114                "max_timestamp_custom_schema",
115                SAPBWExtraction.max_timestamp_custom_schema,
116            ),
117            include_changelog_tech_cols=self._input_spec.options.get(
118                "include_changelog_tech_cols",
119                SAPBWExtraction.include_changelog_tech_cols,
120            ),
121            generate_predicates=self._input_spec.generate_predicates,
122            predicates=self._input_spec.options.get(
123                "predicates", SAPBWExtraction.predicates
124            ),
125            predicates_add_null=self._input_spec.predicates_add_null,
126            extra_cols_act_request=self._input_spec.options.get(
127                "extra_cols_act_request", SAPBWExtraction.extra_cols_act_request
128            ),
129            get_timestamp_from_act_request=self._input_spec.options.get(
130                "get_timestamp_from_act_request",
131                SAPBWExtraction.get_timestamp_from_act_request,
132            ),
133            calc_upper_bound_schema=self._input_spec.calc_upper_bound_schema,
134        )
135        return SAPBWExtractionUtils(jdbc_extraction)
136
137    def _get_options(self) -> Tuple[dict, dict]:
138        """Get Spark Options using JDBC utilities.
139
140        Returns:
141            A tuple dict containing the options args and
142            jdbc args to be passed to Spark.
143        """
144        self._LOGGER.info(
145            f"Initial options passed to the SAP BW Reader: {self._input_spec.options}"
146        )
147
148        options_args, jdbc_args = self.jdbc_utils.get_spark_jdbc_options()
149
150        if self._input_spec.generate_predicates or self._input_spec.options.get(
151            "predicates", None
152        ):
153            options_args.update(
154                self.jdbc_utils.get_additional_spark_options(
155                    self._input_spec,
156                    options_args,
157                    ["partitionColumn", "numPartitions", "lowerBound", "upperBound"],
158                )
159            )
160        else:
161            if self._input_spec.calculate_upper_bound:
162                options_args["upperBound"] = (
163                    self.jdbc_utils.get_spark_jdbc_optimal_upper_bound()
164                )
165
166            options_args.update(
167                self.jdbc_utils.get_additional_spark_options(
168                    self._input_spec, options_args
169                )
170            )
171
172        self._LOGGER.info(
173            f"Final options to fill SAP BW Reader Options: {options_args}"
174        )
175        self._LOGGER.info(f"Final jdbc args to fill SAP BW Reader JDBC: {jdbc_args}")
176        return options_args, jdbc_args
class SAPBWReader(lakehouse_engine.io.reader.Reader):
 19class SAPBWReader(Reader):
 20    """Class to read from SAP BW source."""
 21
 22    _LOGGER: Logger = LoggingHandler(__name__).get_logger()
 23
 24    def __init__(self, input_spec: InputSpec):
 25        """Construct SAPBWReader instances.
 26
 27        Args:
 28            input_spec: input specification.
 29        """
 30        super().__init__(input_spec)
 31        self.jdbc_utils = self._get_jdbc_utils()
 32
 33    def read(self) -> DataFrame:
 34        """Read data from SAP BW source.
 35
 36        Returns:
 37            A dataframe containing the data from the SAP BW source.
 38        """
 39        options_args, jdbc_args = self._get_options()
 40        return ExecEnv.SESSION.read.options(**options_args).jdbc(**jdbc_args)
 41
 42    def _get_jdbc_utils(self) -> SAPBWExtractionUtils:
 43        jdbc_extraction = SAPBWExtraction(
 44            user=self._input_spec.options["user"],
 45            password=self._input_spec.options["password"],
 46            url=self._input_spec.options["url"],
 47            dbtable=self._input_spec.options["dbtable"],
 48            latest_timestamp_data_location=self._input_spec.options.get(
 49                "latest_timestamp_data_location",
 50                SAPBWExtraction.latest_timestamp_data_location,
 51            ),
 52            latest_timestamp_input_col=self._input_spec.options.get(
 53                "latest_timestamp_input_col", SAPBWExtraction.latest_timestamp_input_col
 54            ),
 55            latest_timestamp_data_format=self._input_spec.options.get(
 56                "latest_timestamp_data_format",
 57                SAPBWExtraction.latest_timestamp_data_format,
 58            ),
 59            extraction_type=self._input_spec.options.get(
 60                "extraction_type", SAPBWExtraction.extraction_type
 61            ),
 62            act_request_table=self._input_spec.options.get(
 63                "act_request_table", SAPBWExtraction.act_request_table
 64            ),
 65            request_col_name=self._input_spec.options.get(
 66                "request_col_name", SAPBWExtraction.request_col_name
 67            ),
 68            act_req_join_condition=self._input_spec.options.get(
 69                "act_req_join_condition", SAPBWExtraction.act_req_join_condition
 70            ),
 71            driver=self._input_spec.options.get("driver", SAPBWExtraction.driver),
 72            changelog_table=self._input_spec.options.get(
 73                "changelog_table", SAPBWExtraction.changelog_table
 74            ),
 75            num_partitions=self._input_spec.options.get(
 76                "numPartitions", SAPBWExtraction.num_partitions
 77            ),
 78            partition_column=self._input_spec.options.get(
 79                "partitionColumn", SAPBWExtraction.partition_column
 80            ),
 81            lower_bound=self._input_spec.options.get(
 82                "lowerBound", SAPBWExtraction.lower_bound
 83            ),
 84            upper_bound=self._input_spec.options.get(
 85                "upperBound", SAPBWExtraction.upper_bound
 86            ),
 87            default_upper_bound=self._input_spec.options.get(
 88                "default_upper_bound", SAPBWExtraction.default_upper_bound
 89            ),
 90            fetch_size=self._input_spec.options.get(
 91                "fetchSize", SAPBWExtraction.fetch_size
 92            ),
 93            compress=self._input_spec.options.get("compress", SAPBWExtraction.compress),
 94            custom_schema=self._input_spec.options.get(
 95                "customSchema", SAPBWExtraction.custom_schema
 96            ),
 97            extraction_timestamp=self._input_spec.options.get(
 98                "extraction_timestamp",
 99                SAPBWExtraction.extraction_timestamp,
100            ),
101            odsobject=self._input_spec.options.get(
102                "odsobject",
103                SAPBWExtractionUtils.get_odsobject(self._input_spec.options),
104            ),
105            min_timestamp=self._input_spec.options.get(
106                "min_timestamp", SAPBWExtraction.min_timestamp
107            ),
108            max_timestamp=self._input_spec.options.get(
109                "max_timestamp", SAPBWExtraction.max_timestamp
110            ),
111            default_max_timestamp=self._input_spec.options.get(
112                "default_max_timestamp", SAPBWExtraction.default_max_timestamp
113            ),
114            max_timestamp_custom_schema=self._input_spec.options.get(
115                "max_timestamp_custom_schema",
116                SAPBWExtraction.max_timestamp_custom_schema,
117            ),
118            include_changelog_tech_cols=self._input_spec.options.get(
119                "include_changelog_tech_cols",
120                SAPBWExtraction.include_changelog_tech_cols,
121            ),
122            generate_predicates=self._input_spec.generate_predicates,
123            predicates=self._input_spec.options.get(
124                "predicates", SAPBWExtraction.predicates
125            ),
126            predicates_add_null=self._input_spec.predicates_add_null,
127            extra_cols_act_request=self._input_spec.options.get(
128                "extra_cols_act_request", SAPBWExtraction.extra_cols_act_request
129            ),
130            get_timestamp_from_act_request=self._input_spec.options.get(
131                "get_timestamp_from_act_request",
132                SAPBWExtraction.get_timestamp_from_act_request,
133            ),
134            calc_upper_bound_schema=self._input_spec.calc_upper_bound_schema,
135        )
136        return SAPBWExtractionUtils(jdbc_extraction)
137
138    def _get_options(self) -> Tuple[dict, dict]:
139        """Get Spark Options using JDBC utilities.
140
141        Returns:
142            A tuple dict containing the options args and
143            jdbc args to be passed to Spark.
144        """
145        self._LOGGER.info(
146            f"Initial options passed to the SAP BW Reader: {self._input_spec.options}"
147        )
148
149        options_args, jdbc_args = self.jdbc_utils.get_spark_jdbc_options()
150
151        if self._input_spec.generate_predicates or self._input_spec.options.get(
152            "predicates", None
153        ):
154            options_args.update(
155                self.jdbc_utils.get_additional_spark_options(
156                    self._input_spec,
157                    options_args,
158                    ["partitionColumn", "numPartitions", "lowerBound", "upperBound"],
159                )
160            )
161        else:
162            if self._input_spec.calculate_upper_bound:
163                options_args["upperBound"] = (
164                    self.jdbc_utils.get_spark_jdbc_optimal_upper_bound()
165                )
166
167            options_args.update(
168                self.jdbc_utils.get_additional_spark_options(
169                    self._input_spec, options_args
170                )
171            )
172
173        self._LOGGER.info(
174            f"Final options to fill SAP BW Reader Options: {options_args}"
175        )
176        self._LOGGER.info(f"Final jdbc args to fill SAP BW Reader JDBC: {jdbc_args}")
177        return options_args, jdbc_args

Class to read from SAP BW source.

SAPBWReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
24    def __init__(self, input_spec: InputSpec):
25        """Construct SAPBWReader instances.
26
27        Args:
28            input_spec: input specification.
29        """
30        super().__init__(input_spec)
31        self.jdbc_utils = self._get_jdbc_utils()

Construct SAPBWReader instances.

Arguments:
  • input_spec: input specification.
jdbc_utils
def read(self) -> pyspark.sql.dataframe.DataFrame:
33    def read(self) -> DataFrame:
34        """Read data from SAP BW source.
35
36        Returns:
37            A dataframe containing the data from the SAP BW source.
38        """
39        options_args, jdbc_args = self._get_options()
40        return ExecEnv.SESSION.read.options(**options_args).jdbc(**jdbc_args)

Read data from SAP BW source.

Returns:

A dataframe containing the data from the SAP BW source.