lakehouse_engine.io.readers.sap_bw_reader
Module to define behaviour to read from SAP BW sources.
"""Module to define behaviour to read from SAP BW sources."""

from logging import Logger
from typing import Tuple

from pyspark.sql import DataFrame

from lakehouse_engine.core.definitions import InputSpec
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.io.reader import Reader
from lakehouse_engine.utils.extraction.sap_bw_extraction_utils import (
    SAPBWExtraction,
    SAPBWExtractionUtils,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler


class SAPBWReader(Reader):
    """Reader that loads data from an SAP BW source through Spark JDBC."""

    _LOGGER: Logger = LoggingHandler(__name__).get_logger()

    def __init__(self, input_spec: InputSpec):
        """Create the reader and prepare its SAP BW extraction utilities.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)
        extraction_utils = self._get_jdbc_utils()
        self.jdbc_utils = extraction_utils

    def read(self) -> DataFrame:
        """Load the SAP BW data into a dataframe.

        Returns:
            A dataframe containing the data from the SAP BW source.
        """
        reader_options, jdbc_kwargs = self._get_options()
        spark_reader = ExecEnv.SESSION.read.options(**reader_options)
        return spark_reader.jdbc(**jdbc_kwargs)

    def _get_jdbc_utils(self) -> SAPBWExtractionUtils:
        """Build the SAP BW extraction utilities from the input specification.

        The "user", "password", "url" and "dbtable" options are looked up by
        key, so they must be present in the input spec options. Every other
        option falls back to the attribute of the same purpose declared on
        SAPBWExtraction, except "odsobject", whose fallback is computed by
        SAPBWExtractionUtils.get_odsobject. The generate_predicates,
        predicates_add_null and calc_upper_bound_schema values come from the
        input spec attributes rather than from its options.

        Returns:
            The extraction utilities wrapping the assembled SAPBWExtraction.
        """
        spec = self._input_spec
        opts = spec.options
        defaults = SAPBWExtraction

        # The dict literal is evaluated top to bottom, which keeps the lookups
        # in the same order as the constructor keywords they feed.
        extraction_kwargs = {
            "user": opts["user"],
            "password": opts["password"],
            "url": opts["url"],
            "dbtable": opts["dbtable"],
            "latest_timestamp_data_location": opts.get(
                "latest_timestamp_data_location",
                defaults.latest_timestamp_data_location,
            ),
            "latest_timestamp_input_col": opts.get(
                "latest_timestamp_input_col", defaults.latest_timestamp_input_col
            ),
            "latest_timestamp_data_format": opts.get(
                "latest_timestamp_data_format",
                defaults.latest_timestamp_data_format,
            ),
            "extraction_type": opts.get("extraction_type", defaults.extraction_type),
            "act_request_table": opts.get(
                "act_request_table", defaults.act_request_table
            ),
            "request_col_name": opts.get(
                "request_col_name", defaults.request_col_name
            ),
            "act_req_join_condition": opts.get(
                "act_req_join_condition", defaults.act_req_join_condition
            ),
            "driver": opts.get("driver", defaults.driver),
            "changelog_table": opts.get("changelog_table", defaults.changelog_table),
            # Spark-style camelCase option names map onto snake_case fields.
            "num_partitions": opts.get("numPartitions", defaults.num_partitions),
            "partition_column": opts.get(
                "partitionColumn", defaults.partition_column
            ),
            "lower_bound": opts.get("lowerBound", defaults.lower_bound),
            "upper_bound": opts.get("upperBound", defaults.upper_bound),
            "default_upper_bound": opts.get(
                "default_upper_bound", defaults.default_upper_bound
            ),
            "fetch_size": opts.get("fetchSize", defaults.fetch_size),
            "compress": opts.get("compress", defaults.compress),
            "custom_schema": opts.get("customSchema", defaults.custom_schema),
            "extraction_timestamp": opts.get(
                "extraction_timestamp", defaults.extraction_timestamp
            ),
            # The fallback is computed even when "odsobject" is supplied.
            "odsobject": opts.get(
                "odsobject", SAPBWExtractionUtils.get_odsobject(opts)
            ),
            "min_timestamp": opts.get("min_timestamp", defaults.min_timestamp),
            "max_timestamp": opts.get("max_timestamp", defaults.max_timestamp),
            "default_max_timestamp": opts.get(
                "default_max_timestamp", defaults.default_max_timestamp
            ),
            "max_timestamp_custom_schema": opts.get(
                "max_timestamp_custom_schema",
                defaults.max_timestamp_custom_schema,
            ),
            "include_changelog_tech_cols": opts.get(
                "include_changelog_tech_cols",
                defaults.include_changelog_tech_cols,
            ),
            "generate_predicates": spec.generate_predicates,
            "predicates": opts.get("predicates", defaults.predicates),
            "predicates_add_null": spec.predicates_add_null,
            "extra_cols_act_request": opts.get(
                "extra_cols_act_request", defaults.extra_cols_act_request
            ),
            "get_timestamp_from_act_request": opts.get(
                "get_timestamp_from_act_request",
                defaults.get_timestamp_from_act_request,
            ),
            "calc_upper_bound_schema": spec.calc_upper_bound_schema,
        }
        return SAPBWExtractionUtils(SAPBWExtraction(**extraction_kwargs))

    def _get_options(self) -> Tuple[dict, dict]:
        """Assemble the Spark reader options and the JDBC call arguments.

        Returns:
            A tuple with the options args and the jdbc args to be passed
            to Spark.
        """
        spec = self._input_spec
        utils = self.jdbc_utils

        # NOTE(review): these options also hold the "password" entry - confirm
        # that the logging handler masks sensitive values.
        self._LOGGER.info(
            f"Initial options passed to the SAP BW Reader: {spec.options}"
        )

        spark_options, jdbc_kwargs = utils.get_spark_jdbc_options()

        predicate_based = spec.generate_predicates or spec.options.get(
            "predicates", None
        )
        if not predicate_based:
            if spec.calculate_upper_bound:
                spark_options["upperBound"] = (
                    utils.get_spark_jdbc_optimal_upper_bound()
                )
            extra_options = utils.get_additional_spark_options(spec, spark_options)
        else:
            # Presumably the listed partitioning options are the ones to leave
            # out when predicates are used - confirm against
            # get_additional_spark_options.
            extra_options = utils.get_additional_spark_options(
                spec,
                spark_options,
                ["partitionColumn", "numPartitions", "lowerBound", "upperBound"],
            )
        spark_options.update(extra_options)

        self._LOGGER.info(
            f"Final options to fill SAP BW Reader Options: {spark_options}"
        )
        self._LOGGER.info(f"Final jdbc args to fill SAP BW Reader JDBC: {jdbc_kwargs}")
        return spark_options, jdbc_kwargs
class SAPBWReader(Reader):
    """Reader that loads data from an SAP BW source through Spark JDBC."""

    _LOGGER: Logger = LoggingHandler(__name__).get_logger()

    def __init__(self, input_spec: InputSpec):
        """Create the reader and prepare its SAP BW extraction utilities.

        Args:
            input_spec: input specification.
        """
        super().__init__(input_spec)
        extraction_utils = self._get_jdbc_utils()
        self.jdbc_utils = extraction_utils

    def read(self) -> DataFrame:
        """Load the SAP BW data into a dataframe.

        Returns:
            A dataframe containing the data from the SAP BW source.
        """
        reader_options, jdbc_kwargs = self._get_options()
        spark_reader = ExecEnv.SESSION.read.options(**reader_options)
        return spark_reader.jdbc(**jdbc_kwargs)

    def _get_jdbc_utils(self) -> SAPBWExtractionUtils:
        """Build the SAP BW extraction utilities from the input specification.

        The "user", "password", "url" and "dbtable" options are looked up by
        key, so they must be present in the input spec options. Every other
        option falls back to the attribute of the same purpose declared on
        SAPBWExtraction, except "odsobject", whose fallback is computed by
        SAPBWExtractionUtils.get_odsobject. The generate_predicates,
        predicates_add_null and calc_upper_bound_schema values come from the
        input spec attributes rather than from its options.

        Returns:
            The extraction utilities wrapping the assembled SAPBWExtraction.
        """
        spec = self._input_spec
        opts = spec.options
        defaults = SAPBWExtraction

        # The dict literal is evaluated top to bottom, which keeps the lookups
        # in the same order as the constructor keywords they feed.
        extraction_kwargs = {
            "user": opts["user"],
            "password": opts["password"],
            "url": opts["url"],
            "dbtable": opts["dbtable"],
            "latest_timestamp_data_location": opts.get(
                "latest_timestamp_data_location",
                defaults.latest_timestamp_data_location,
            ),
            "latest_timestamp_input_col": opts.get(
                "latest_timestamp_input_col", defaults.latest_timestamp_input_col
            ),
            "latest_timestamp_data_format": opts.get(
                "latest_timestamp_data_format",
                defaults.latest_timestamp_data_format,
            ),
            "extraction_type": opts.get("extraction_type", defaults.extraction_type),
            "act_request_table": opts.get(
                "act_request_table", defaults.act_request_table
            ),
            "request_col_name": opts.get(
                "request_col_name", defaults.request_col_name
            ),
            "act_req_join_condition": opts.get(
                "act_req_join_condition", defaults.act_req_join_condition
            ),
            "driver": opts.get("driver", defaults.driver),
            "changelog_table": opts.get("changelog_table", defaults.changelog_table),
            # Spark-style camelCase option names map onto snake_case fields.
            "num_partitions": opts.get("numPartitions", defaults.num_partitions),
            "partition_column": opts.get(
                "partitionColumn", defaults.partition_column
            ),
            "lower_bound": opts.get("lowerBound", defaults.lower_bound),
            "upper_bound": opts.get("upperBound", defaults.upper_bound),
            "default_upper_bound": opts.get(
                "default_upper_bound", defaults.default_upper_bound
            ),
            "fetch_size": opts.get("fetchSize", defaults.fetch_size),
            "compress": opts.get("compress", defaults.compress),
            "custom_schema": opts.get("customSchema", defaults.custom_schema),
            "extraction_timestamp": opts.get(
                "extraction_timestamp", defaults.extraction_timestamp
            ),
            # The fallback is computed even when "odsobject" is supplied.
            "odsobject": opts.get(
                "odsobject", SAPBWExtractionUtils.get_odsobject(opts)
            ),
            "min_timestamp": opts.get("min_timestamp", defaults.min_timestamp),
            "max_timestamp": opts.get("max_timestamp", defaults.max_timestamp),
            "default_max_timestamp": opts.get(
                "default_max_timestamp", defaults.default_max_timestamp
            ),
            "max_timestamp_custom_schema": opts.get(
                "max_timestamp_custom_schema",
                defaults.max_timestamp_custom_schema,
            ),
            "include_changelog_tech_cols": opts.get(
                "include_changelog_tech_cols",
                defaults.include_changelog_tech_cols,
            ),
            "generate_predicates": spec.generate_predicates,
            "predicates": opts.get("predicates", defaults.predicates),
            "predicates_add_null": spec.predicates_add_null,
            "extra_cols_act_request": opts.get(
                "extra_cols_act_request", defaults.extra_cols_act_request
            ),
            "get_timestamp_from_act_request": opts.get(
                "get_timestamp_from_act_request",
                defaults.get_timestamp_from_act_request,
            ),
            "calc_upper_bound_schema": spec.calc_upper_bound_schema,
        }
        return SAPBWExtractionUtils(SAPBWExtraction(**extraction_kwargs))

    def _get_options(self) -> Tuple[dict, dict]:
        """Assemble the Spark reader options and the JDBC call arguments.

        Returns:
            A tuple with the options args and the jdbc args to be passed
            to Spark.
        """
        spec = self._input_spec
        utils = self.jdbc_utils

        # NOTE(review): these options also hold the "password" entry - confirm
        # that the logging handler masks sensitive values.
        self._LOGGER.info(
            f"Initial options passed to the SAP BW Reader: {spec.options}"
        )

        spark_options, jdbc_kwargs = utils.get_spark_jdbc_options()

        predicate_based = spec.generate_predicates or spec.options.get(
            "predicates", None
        )
        if not predicate_based:
            if spec.calculate_upper_bound:
                spark_options["upperBound"] = (
                    utils.get_spark_jdbc_optimal_upper_bound()
                )
            extra_options = utils.get_additional_spark_options(spec, spark_options)
        else:
            # Presumably the listed partitioning options are the ones to leave
            # out when predicates are used - confirm against
            # get_additional_spark_options.
            extra_options = utils.get_additional_spark_options(
                spec,
                spark_options,
                ["partitionColumn", "numPartitions", "lowerBound", "upperBound"],
            )
        spark_options.update(extra_options)

        self._LOGGER.info(
            f"Final options to fill SAP BW Reader Options: {spark_options}"
        )
        self._LOGGER.info(f"Final jdbc args to fill SAP BW Reader JDBC: {jdbc_kwargs}")
        return spark_options, jdbc_kwargs
Class to read from SAP BW source.
SAPBWReader(input_spec: lakehouse_engine.core.definitions.InputSpec)
def __init__(self, input_spec: InputSpec):
    """Create the reader and prepare its SAP BW extraction utilities.

    Args:
        input_spec: input specification.
    """
    super().__init__(input_spec)
    # Built once at construction time and kept on the instance.
    extraction_utils = self._get_jdbc_utils()
    self.jdbc_utils = extraction_utils
Construct SAPBWReader instances.
Arguments:
- input_spec: input specification.
def
read(self) -> pyspark.sql.dataframe.DataFrame:
def read(self) -> DataFrame:
    """Load the SAP BW data into a dataframe.

    Returns:
        A dataframe containing the data from the SAP BW source.
    """
    reader_options, jdbc_kwargs = self._get_options()
    # Apply the reader options first, then issue the JDBC read.
    spark_reader = ExecEnv.SESSION.read.options(**reader_options)
    return spark_reader.jdbc(**jdbc_kwargs)
Read data from SAP BW source.
Returns:
A dataframe containing the data from the SAP BW source.