lakehouse_engine.io.writers.rest_api_writer
Module to define behaviour to write to REST APIs.
1"""Module to define behaviour to write to REST APIs.""" 2 3import json 4from typing import Any, Callable, OrderedDict 5 6from pyspark.sql import DataFrame, Row 7 8from lakehouse_engine.core.definitions import OutputSpec 9from lakehouse_engine.io.writer import Writer 10from lakehouse_engine.utils.logging_handler import LoggingHandler 11from lakehouse_engine.utils.rest_api import ( 12 RESTApiException, 13 RestMethods, 14 RestStatusCodes, 15 execute_api_request, 16) 17 18 19class RestApiWriter(Writer): 20 """Class to write data to a REST API.""" 21 22 _logger = LoggingHandler(__name__).get_logger() 23 24 def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict): 25 """Construct RestApiWriter instances. 26 27 Args: 28 output_spec: output specification. 29 df: dataframe to be written. 30 data: list of all dfs generated on previous steps before writer. 31 """ 32 super().__init__(output_spec, df, data) 33 34 def write(self) -> None: 35 """Write data to REST API.""" 36 if not self._df.isStreaming: 37 self._write_to_rest_api_in_batch_mode(self._df, self._output_spec) 38 else: 39 self._write_to_rest_api_in_streaming_mode( 40 self._df, self._output_spec, self._data 41 ) 42 43 @staticmethod 44 def _get_func_to_send_payload_to_rest_api(output_spec: OutputSpec) -> Callable: 45 """Define and return a function to send the payload to the REST api. 46 47 Args: 48 output_spec: Output Specification containing configurations to 49 communicate with the REST api. Within the output_spec, the user 50 can specify several options: 51 - rest_api_header: http headers. 52 - rest_api_basic_auth: basic http authentication details 53 (e.g., {"username": "x", "password": "y"}). 54 - rest_api_url: url of the api. 55 - rest_api_method: REST method (e.g., POST or PUT). 56 - rest_api_sleep_seconds: sleep seconds to avoid throttling. 57 - rest_api_is_file_payload: if the payload to be sent to the 58 api is in the format of a file using multipart encoding 59 upload. if this is true, then the payload will always be 60 sent using the "files" parameter in Python's requests 61 library. 62 - rest_api_file_payload_name: when rest_api_is_file_payload 63 is true, this option can be used to define the file 64 identifier in Python's requests library. 65 - extra_json_payload: when rest_api_file_payload_name is False, 66 can be used to provide additional JSON variables to add to 67 the original payload. This is useful to complement 68 the original payload with some extra input to better 69 configure the final payload to send to the REST api. An 70 example can be to add a constant configuration value to 71 add to the payload data. 72 73 Returns: 74 Function to be called inside Spark dataframe.foreach. 75 """ 76 headers = output_spec.options.get("rest_api_header", None) 77 basic_auth_dict = output_spec.options.get("rest_api_basic_auth", None) 78 url = output_spec.options["rest_api_url"] 79 method = output_spec.options.get("rest_api_method", RestMethods.POST.value) 80 sleep_seconds = output_spec.options.get("rest_api_sleep_seconds", 0) 81 is_file_payload = output_spec.options.get("rest_api_is_file_payload", False) 82 file_payload_name = output_spec.options.get( 83 "rest_api_file_payload_name", "file" 84 ) 85 extra_json_payload = output_spec.options.get( 86 "rest_api_extra_json_payload", None 87 ) 88 success_status_codes = output_spec.options.get( 89 "rest_api_success_status_codes", RestStatusCodes.OK_STATUS_CODES.value 90 ) 91 92 def send_payload_to_rest_api(row: Row) -> Any: 93 """Send payload to the REST API. 
94 95 The payload needs to be prepared as a JSON string column in a dataframe. 96 E.g., {"a": "a value", "b": "b value"}. 97 98 Args: 99 row: a row in a dataframe. 100 """ 101 if "payload" not in row: 102 raise ValueError("Input DataFrame must contain 'payload' column.") 103 104 str_payload = row.payload 105 106 payload = None 107 if not is_file_payload: 108 payload = json.loads(str_payload) 109 else: 110 payload = {file_payload_name: str_payload} 111 112 if extra_json_payload: 113 payload.update(extra_json_payload) 114 115 RestApiWriter._logger.debug(f"Original payload: {str_payload}") 116 RestApiWriter._logger.debug(f"Final payload: {payload}") 117 118 response = execute_api_request( 119 method=method, 120 url=url, 121 headers=headers, 122 basic_auth_dict=basic_auth_dict, 123 json=payload if not is_file_payload else None, 124 files=payload if is_file_payload else None, 125 sleep_seconds=sleep_seconds, 126 ) 127 128 RestApiWriter._logger.debug( 129 f"Response: {response.status_code} - {response.text}" 130 ) 131 132 if response.status_code not in success_status_codes: 133 raise RESTApiException( 134 f"API response status code {response.status_code} is not in" 135 f" {success_status_codes}. Got {response.text}" 136 ) 137 138 return send_payload_to_rest_api 139 140 @staticmethod 141 def _write_to_rest_api_in_batch_mode( 142 df: DataFrame, output_spec: OutputSpec 143 ) -> None: 144 """Write to REST API in Spark batch mode. 145 146 This function uses the dataframe.foreach function to generate a payload 147 for each row of the dataframe and send it to the REST API endpoint. 148 149 Warning! Make sure your execution environment supports RDD api operations, 150 as there are environments where RDD operation may not be supported. As, 151 df.foreach() is a shorthand for df.rdd.foreach(), this can bring issues 152 in such environments. 153 154 Args: 155 df: dataframe to write. 156 output_spec: output specification. 157 """ 158 df.foreach(RestApiWriter._get_func_to_send_payload_to_rest_api(output_spec)) 159 160 @staticmethod 161 def _write_to_rest_api_in_streaming_mode( 162 df: DataFrame, output_spec: OutputSpec, data: OrderedDict 163 ) -> None: 164 """Write to REST API in streaming mode. 165 166 Args: 167 df: dataframe to write. 168 output_spec: output specification. 169 data: list of all dfs generated on previous steps before writer. 170 """ 171 df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec)) 172 173 stream_df = ( 174 df_writer.options(**output_spec.options if output_spec.options else {}) 175 .foreachBatch( 176 RestApiWriter._write_transformed_micro_batch(output_spec, data) 177 ) 178 .start() 179 ) 180 181 if output_spec.streaming_await_termination: 182 stream_df.awaitTermination(output_spec.streaming_await_termination_timeout) 183 184 @staticmethod 185 def _write_transformed_micro_batch( # type: ignore 186 output_spec: OutputSpec, data: OrderedDict 187 ) -> Callable: 188 """Define how to write a streaming micro batch after transforming it. 189 190 Args: 191 output_spec: output specification. 192 data: list of all dfs generated on previous steps before writer. 193 194 Returns: 195 A function to be executed in the foreachBatch spark write method. 
196 """ 197 198 def inner(batch_df: DataFrame, batch_id: int) -> None: 199 transformed_df = Writer.get_transformed_micro_batch( 200 output_spec, batch_df, batch_id, data 201 ) 202 203 if output_spec.streaming_micro_batch_dq_processors: 204 transformed_df = Writer.run_micro_batch_dq_process( 205 transformed_df, output_spec.streaming_micro_batch_dq_processors 206 ) 207 208 RestApiWriter._write_to_rest_api_in_batch_mode(transformed_df, output_spec) 209 210 return inner
Class to write data to a REST API.
RestApiWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
Construct RestApiWriter instances.
Arguments:
- output_spec: output specification.
- df: dataframe to be written.
- data: list of all dataframes generated in previous steps before the writer.
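The REST-specific behaviour is driven entirely by `output_spec.options`, using the keys documented in `_get_func_to_send_payload_to_rest_api` above. A sketch of such an options dictionary — the URL, header, and credential values are placeholders, and how the dictionary reaches the `OutputSpec` (e.g., via an ACON configuration) depends on how you invoke the engine:

```python
rest_api_options = {
    "rest_api_url": "https://api.example.com/v1/events",  # required; placeholder URL
    "rest_api_method": "POST",  # defaults to RestMethods.POST.value
    "rest_api_header": {"Content-Type": "application/json"},
    "rest_api_basic_auth": {"username": "x", "password": "y"},
    "rest_api_sleep_seconds": 1,  # pause between requests to avoid throttling
    "rest_api_is_file_payload": False,  # send JSON, not a multipart file upload
    "rest_api_extra_json_payload": {"source": "lakehouse_engine"},  # merged into each payload
    "rest_api_success_status_codes": [200, 201, 202],  # anything else raises RESTApiException
}
```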
def write(self) -> None:
Write data to REST API.
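Putting the pieces together, a hedged end-to-end sketch for batch mode. The `OutputSpec` fields other than `options` are assumptions (in practice the engine typically builds the spec from an ACON configuration), and `payload_df` and `rest_api_options` come from the sketches above:

```python
from collections import OrderedDict

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writers.rest_api_writer import RestApiWriter

output_spec = OutputSpec(
    spec_id="send_events_to_api",  # assumed field name
    input_id="payload_df",         # assumed field name
    data_format="rest_api",        # assumed field name
    options=rest_api_options,      # options dict sketched earlier
)

# Batch mode: write() dispatches to _write_to_rest_api_in_batch_mode,
# sending one request per row via df.foreach.
writer = RestApiWriter(output_spec, payload_df, OrderedDict())
writer.write()
```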