lakehouse_engine.io.readers.sap_b4_reader
Module to define behaviour to read from SAP B4 sources.
1"""Module to define behaviour to read from SAP B4 sources.""" 2 3from logging import Logger 4from typing import Tuple 5 6from pyspark.sql import DataFrame 7 8from lakehouse_engine.core.definitions import InputSpec 9from lakehouse_engine.core.exec_env import ExecEnv 10from lakehouse_engine.io.reader import Reader 11from lakehouse_engine.utils.extraction.sap_b4_extraction_utils import ( 12 ADSOTypes, 13 SAPB4Extraction, 14 SAPB4ExtractionUtils, 15) 16from lakehouse_engine.utils.logging_handler import LoggingHandler 17 18 19class SAPB4Reader(Reader): 20 """Class to read from SAP B4 source.""" 21 22 _LOGGER: Logger = LoggingHandler(__name__).get_logger() 23 24 def __init__(self, input_spec: InputSpec): 25 """Construct SAPB4Reader instances. 26 27 Args: 28 input_spec: input specification. 29 """ 30 super().__init__(input_spec) 31 self.jdbc_utils = self._get_jdbc_utils() 32 33 def read(self) -> DataFrame: 34 """Read data from SAP B4 source. 35 36 Returns: 37 A dataframe containing the data from the SAP B4 source. 38 """ 39 options_args, jdbc_args = self._get_options() 40 return ExecEnv.SESSION.read.options(**options_args).jdbc(**jdbc_args) 41 42 def _get_jdbc_utils(self) -> SAPB4ExtractionUtils: 43 jdbc_extraction = SAPB4Extraction( 44 user=self._input_spec.options["user"], 45 password=self._input_spec.options["password"], 46 url=self._input_spec.options["url"], 47 dbtable=self._input_spec.options["dbtable"], 48 adso_type=self._input_spec.options["adso_type"], 49 request_status_tbl=self._input_spec.options.get( 50 "request_status_tbl", SAPB4Extraction.request_status_tbl 51 ), 52 changelog_table=self._input_spec.options.get( 53 "changelog_table", 54 ( 55 self._input_spec.options["dbtable"] 56 if self._input_spec.options["adso_type"] == ADSOTypes.AQ.value 57 else self._input_spec.options["changelog_table"] 58 ), 59 ), 60 data_target=SAPB4ExtractionUtils.get_data_target(self._input_spec.options), 61 act_req_join_condition=self._input_spec.options.get( 62 "act_req_join_condition", SAPB4Extraction.act_req_join_condition 63 ), 64 latest_timestamp_data_location=self._input_spec.options.get( 65 "latest_timestamp_data_location", 66 SAPB4Extraction.latest_timestamp_data_location, 67 ), 68 latest_timestamp_input_col=self._input_spec.options.get( 69 "latest_timestamp_input_col", 70 SAPB4Extraction.latest_timestamp_input_col, 71 ), 72 latest_timestamp_data_format=self._input_spec.options.get( 73 "latest_timestamp_data_format", 74 SAPB4Extraction.latest_timestamp_data_format, 75 ), 76 extraction_type=self._input_spec.options.get( 77 "extraction_type", SAPB4Extraction.extraction_type 78 ), 79 driver=self._input_spec.options.get("driver", SAPB4Extraction.driver), 80 num_partitions=self._input_spec.options.get( 81 "numPartitions", SAPB4Extraction.num_partitions 82 ), 83 partition_column=self._input_spec.options.get( 84 "partitionColumn", SAPB4Extraction.partition_column 85 ), 86 lower_bound=self._input_spec.options.get( 87 "lowerBound", SAPB4Extraction.lower_bound 88 ), 89 upper_bound=self._input_spec.options.get( 90 "upperBound", SAPB4Extraction.upper_bound 91 ), 92 default_upper_bound=self._input_spec.options.get( 93 "default_upper_bound", SAPB4Extraction.default_upper_bound 94 ), 95 fetch_size=self._input_spec.options.get( 96 "fetchSize", SAPB4Extraction.fetch_size 97 ), 98 compress=self._input_spec.options.get("compress", SAPB4Extraction.compress), 99 custom_schema=self._input_spec.options.get( 100 "customSchema", SAPB4Extraction.custom_schema 101 ), 102 extraction_timestamp=self._input_spec.options.get( 
103 "extraction_timestamp", 104 SAPB4Extraction.extraction_timestamp, 105 ), 106 min_timestamp=self._input_spec.options.get( 107 "min_timestamp", SAPB4Extraction.min_timestamp 108 ), 109 max_timestamp=self._input_spec.options.get( 110 "max_timestamp", SAPB4Extraction.max_timestamp 111 ), 112 default_max_timestamp=self._input_spec.options.get( 113 "default_max_timestamp", SAPB4Extraction.default_max_timestamp 114 ), 115 max_timestamp_custom_schema=self._input_spec.options.get( 116 "max_timestamp_custom_schema", 117 SAPB4Extraction.max_timestamp_custom_schema, 118 ), 119 generate_predicates=self._input_spec.generate_predicates, 120 predicates=self._input_spec.options.get( 121 "predicates", SAPB4Extraction.predicates 122 ), 123 predicates_add_null=self._input_spec.predicates_add_null, 124 extra_cols_req_status_tbl=self._input_spec.options.get( 125 "extra_cols_req_status_tbl", SAPB4Extraction.extra_cols_req_status_tbl 126 ), 127 calc_upper_bound_schema=self._input_spec.calc_upper_bound_schema, 128 include_changelog_tech_cols=self._input_spec.options.get( 129 "include_changelog_tech_cols", 130 ( 131 False 132 if self._input_spec.options["adso_type"] == ADSOTypes.AQ.value 133 else True 134 ), 135 ), 136 ) 137 return SAPB4ExtractionUtils(jdbc_extraction) 138 139 def _get_options(self) -> Tuple[dict, dict]: 140 """Get Spark Options using JDBC utilities. 141 142 Returns: 143 A tuple dict containing the options args and 144 jdbc args to be passed to Spark. 145 """ 146 self._LOGGER.info( 147 f"Initial options passed to the SAP B4 Reader: {self._input_spec.options}" 148 ) 149 150 options_args, jdbc_args = self.jdbc_utils.get_spark_jdbc_options() 151 152 if self._input_spec.generate_predicates or self._input_spec.options.get( 153 "predicates", None 154 ): 155 options_args.update( 156 self.jdbc_utils.get_additional_spark_options( 157 self._input_spec, 158 options_args, 159 ["partitionColumn", "numPartitions", "lowerBound", "upperBound"], 160 ) 161 ) 162 else: 163 if self._input_spec.calculate_upper_bound: 164 options_args["upperBound"] = ( 165 self.jdbc_utils.get_spark_jdbc_optimal_upper_bound() 166 ) 167 168 options_args.update( 169 self.jdbc_utils.get_additional_spark_options( 170 self._input_spec, options_args 171 ) 172 ) 173 174 self._LOGGER.info( 175 f"Final options to fill SAP B4 Reader Options: {options_args}" 176 ) 177 self._LOGGER.info(f"Final jdbc args to fill SAP B4 Reader JDBC: {jdbc_args}") 178 return options_args, jdbc_args
Class to read from SAP B4 source.
SAPB4Reader(input_spec: lakehouse_engine.core.definitions.InputSpec)
```python
def __init__(self, input_spec: InputSpec):
    """Construct SAPB4Reader instances.

    Args:
        input_spec: input specification.
    """
    super().__init__(input_spec)
    self.jdbc_utils = self._get_jdbc_utils()
```
Construct SAPB4Reader instances.
Arguments:
- input_spec: input specification.
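For context, a hedged construction sketch (the exact `InputSpec` fields are defined in `lakehouse_engine.core.definitions`; the field names shown here besides `options` are assumptions for illustration):

```python
from lakehouse_engine.core.definitions import InputSpec
from lakehouse_engine.io.readers.sap_b4_reader import SAPB4Reader

# Hypothetical InputSpec construction; field names other than `options`
# (which the reader demonstrably uses) are assumptions.
input_spec = InputSpec(
    spec_id="sap_b4_sales",
    read_type="batch",
    data_format="sap_b4",
    options=sap_b4_options,  # a dict like the sketch above
)

# Note: __init__ builds the SAPB4ExtractionUtils eagerly, so missing
# mandatory options fail here rather than at read() time.
reader = SAPB4Reader(input_spec)
```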
def read(self) -> pyspark.sql.dataframe.DataFrame:
```python
def read(self) -> DataFrame:
    """Read data from SAP B4 source.

    Returns:
        A dataframe containing the data from the SAP B4 source.
    """
    options_args, jdbc_args = self._get_options()
    return ExecEnv.SESSION.read.options(**options_args).jdbc(**jdbc_args)
```
Read data from SAP B4 source.
Returns:
A dataframe containing the data from the SAP B4 source.
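Under the hood, `_get_options` picks one of two mutually exclusive partitioning strategies: when `generate_predicates` is set or explicit `predicates` are passed, the range-partitioning keys (`partitionColumn`, `numPartitions`, `lowerBound`, `upperBound`) are excluded from the extra Spark options, since Spark's JDBC source does not combine predicate lists with column bounds; otherwise, `calculate_upper_bound` may first derive an optimal `upperBound`. The end result is an ordinary Spark JDBC read, roughly equivalent to the hand-written sketch below (all option values are illustrative; the real ones come from `get_spark_jdbc_options()` and `get_additional_spark_options()`):

```python
# Rough, hand-written equivalent of what read() executes once
# _get_options() has resolved everything. Values are illustrative only.
df = (
    spark.read.options(
        driver="com.sap.db.jdbc.Driver",  # assumed SAP HANA JDBC driver class
        fetchsize=100000,
        numPartitions=10,
        partitionColumn="RECORD",
        lowerBound=0,
        upperBound=1_000_000,
    ).jdbc(
        url="jdbc:sap://b4-host:30215",
        table="(SELECT ... FROM ...) AS t",  # query assembled by the extraction utils
        properties={"user": "...", "password": "..."},
    )
)
```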