lakehouse_engine.io.writers.rest_api_writer

Module to define behaviour to write to REST APIs.

  1"""Module to define behaviour to write to REST APIs."""
  2
  3import json
  4from typing import Any, Callable, OrderedDict
  5
  6from pyspark.sql import DataFrame, Row
  7
  8from lakehouse_engine.core.definitions import OutputSpec
  9from lakehouse_engine.io.writer import Writer
 10from lakehouse_engine.utils.logging_handler import LoggingHandler
 11from lakehouse_engine.utils.rest_api import (
 12    RESTApiException,
 13    RestMethods,
 14    RestStatusCodes,
 15    execute_api_request,
 16)
 17
 18
 19class RestApiWriter(Writer):
 20    """Class to write data to a REST API."""
 21
 22    _logger = LoggingHandler(__name__).get_logger()
 23
 24    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
 25        """Construct RestApiWriter instances.
 26
 27        Args:
 28            output_spec: output specification.
 29            df: dataframe to be written.
 30            data: list of all dfs generated on previous steps before writer.
 31        """
 32        super().__init__(output_spec, df, data)
 33
 34    def write(self) -> None:
 35        """Write data to REST API."""
 36        if not self._df.isStreaming:
 37            self._write_to_rest_api_in_batch_mode(self._df, self._output_spec)
 38        else:
 39            self._write_to_rest_api_in_streaming_mode(
 40                self._df, self._output_spec, self._data
 41            )
 42
 43    @staticmethod
 44    def _get_func_to_send_payload_to_rest_api(output_spec: OutputSpec) -> Callable:
 45        """Define and return a function to send the payload to the REST api.
 46
 47        Args:
 48            output_spec: Output Specification containing configurations to
 49                communicate with the REST api. Within the output_spec, the user
 50                can specify several options:
 51                    - rest_api_header: http headers.
 52                    - rest_api_basic_auth: basic http authentication details
 53                        (e.g., {"username": "x", "password": "y"}).
 54                    - rest_api_url: url of the api.
 55                    - rest_api_method: REST method (e.g., POST or PUT).
 56                    - rest_api_sleep_seconds: sleep seconds to avoid throttling.
 57                    - rest_api_is_file_payload: if the payload to be sent to the
 58                        api is in the format of a file using multipart encoding
 59                        upload. if this is true, then the payload will always be
 60                        sent using the "files" parameter in Python's requests
 61                        library.
 62                    - rest_api_file_payload_name: when rest_api_is_file_payload
 63                        is true, this option can be used to define the file
 64                        identifier in Python's requests library.
 65                    - extra_json_payload: when rest_api_file_payload_name is False,
 66                        can be used to provide additional JSON variables to add to
 67                        the original payload. This is useful to complement
 68                        the original payload with some extra input to better
 69                        configure the final payload to send to the REST api. An
 70                        example can be to add a constant configuration value to
 71                        add to the payload data.
 72
 73        Returns:
 74            Function to be called inside Spark dataframe.foreach.
 75        """
 76        headers = output_spec.options.get("rest_api_header", None)
 77        basic_auth_dict = output_spec.options.get("rest_api_basic_auth", None)
 78        url = output_spec.options["rest_api_url"]
 79        method = output_spec.options.get("rest_api_method", RestMethods.POST.value)
 80        sleep_seconds = output_spec.options.get("rest_api_sleep_seconds", 0)
 81        is_file_payload = output_spec.options.get("rest_api_is_file_payload", False)
 82        file_payload_name = output_spec.options.get(
 83            "rest_api_file_payload_name", "file"
 84        )
 85        extra_json_payload = output_spec.options.get(
 86            "rest_api_extra_json_payload", None
 87        )
 88        success_status_codes = output_spec.options.get(
 89            "rest_api_success_status_codes", RestStatusCodes.OK_STATUS_CODES.value
 90        )
 91
 92        def send_payload_to_rest_api(row: Row) -> Any:
 93            """Send payload to the REST API.
 94
 95            The payload needs to be prepared as a JSON string column in a dataframe.
 96            E.g., {"a": "a value", "b": "b value"}.
 97
 98            Args:
 99                row: a row in a dataframe.
100            """
101            if "payload" not in row:
102                raise ValueError("Input DataFrame must contain 'payload' column.")
103
104            str_payload = row.payload
105
106            payload = None
107            if not is_file_payload:
108                payload = json.loads(str_payload)
109            else:
110                payload = {file_payload_name: str_payload}
111
112            if extra_json_payload:
113                payload.update(extra_json_payload)
114
115            RestApiWriter._logger.debug(f"Original payload: {str_payload}")
116            RestApiWriter._logger.debug(f"Final payload: {payload}")
117
118            response = execute_api_request(
119                method=method,
120                url=url,
121                headers=headers,
122                basic_auth_dict=basic_auth_dict,
123                json=payload if not is_file_payload else None,
124                files=payload if is_file_payload else None,
125                sleep_seconds=sleep_seconds,
126            )
127
128            RestApiWriter._logger.debug(
129                f"Response: {response.status_code} - {response.text}"
130            )
131
132            if response.status_code not in success_status_codes:
133                raise RESTApiException(
134                    f"API response status code {response.status_code} is not in"
135                    f" {success_status_codes}. Got {response.text}"
136                )
137
138        return send_payload_to_rest_api
139
140    @staticmethod
141    def _write_to_rest_api_in_batch_mode(
142        df: DataFrame, output_spec: OutputSpec
143    ) -> None:
144        """Write to REST API in Spark batch mode.
145
146        This function uses the dataframe.foreach function to generate a payload
147        for each row of the dataframe and send it to the REST API endpoint.
148
149        Warning! Make sure your execution environment supports RDD api operations,
150        as there are environments where RDD operation may not be supported. As,
151        df.foreach() is a shorthand for df.rdd.foreach(), this can bring issues
152        in such environments.
153
154        Args:
155            df: dataframe to write.
156            output_spec: output specification.
157        """
158        df.foreach(RestApiWriter._get_func_to_send_payload_to_rest_api(output_spec))
159
160    @staticmethod
161    def _write_to_rest_api_in_streaming_mode(
162        df: DataFrame, output_spec: OutputSpec, data: OrderedDict
163    ) -> None:
164        """Write to REST API in streaming mode.
165
166        Args:
167            df: dataframe to write.
168            output_spec: output specification.
169            data: list of all dfs generated on previous steps before writer.
170        """
171        df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec))
172
173        stream_df = (
174            df_writer.options(**output_spec.options if output_spec.options else {})
175            .foreachBatch(
176                RestApiWriter._write_transformed_micro_batch(output_spec, data)
177            )
178            .start()
179        )
180
181        if output_spec.streaming_await_termination:
182            stream_df.awaitTermination(output_spec.streaming_await_termination_timeout)
183
184    @staticmethod
185    def _write_transformed_micro_batch(  # type: ignore
186        output_spec: OutputSpec, data: OrderedDict
187    ) -> Callable:
188        """Define how to write a streaming micro batch after transforming it.
189
190        Args:
191            output_spec: output specification.
192            data: list of all dfs generated on previous steps before writer.
193
194        Returns:
195            A function to be executed in the foreachBatch spark write method.
196        """
197
198        def inner(batch_df: DataFrame, batch_id: int) -> None:
199            transformed_df = Writer.get_transformed_micro_batch(
200                output_spec, batch_df, batch_id, data
201            )
202
203            if output_spec.streaming_micro_batch_dq_processors:
204                transformed_df = Writer.run_micro_batch_dq_process(
205                    transformed_df, output_spec.streaming_micro_batch_dq_processors
206                )
207
208            RestApiWriter._write_to_rest_api_in_batch_mode(transformed_df, output_spec)
209
210        return inner
class RestApiWriter(lakehouse_engine.io.writer.Writer):
 20class RestApiWriter(Writer):
 21    """Class to write data to a REST API."""
 22
 23    _logger = LoggingHandler(__name__).get_logger()
 24
 25    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
 26        """Construct RestApiWriter instances.
 27
 28        Args:
 29            output_spec: output specification.
 30            df: dataframe to be written.
 31            data: list of all dfs generated on previous steps before writer.
 32        """
 33        super().__init__(output_spec, df, data)
 34
 35    def write(self) -> None:
 36        """Write data to REST API."""
 37        if not self._df.isStreaming:
 38            self._write_to_rest_api_in_batch_mode(self._df, self._output_spec)
 39        else:
 40            self._write_to_rest_api_in_streaming_mode(
 41                self._df, self._output_spec, self._data
 42            )
 43
 44    @staticmethod
 45    def _get_func_to_send_payload_to_rest_api(output_spec: OutputSpec) -> Callable:
 46        """Define and return a function to send the payload to the REST api.
 47
 48        Args:
 49            output_spec: Output Specification containing configurations to
 50                communicate with the REST api. Within the output_spec, the user
 51                can specify several options:
 52                    - rest_api_header: http headers.
 53                    - rest_api_basic_auth: basic http authentication details
 54                        (e.g., {"username": "x", "password": "y"}).
 55                    - rest_api_url: url of the api.
 56                    - rest_api_method: REST method (e.g., POST or PUT).
 57                    - rest_api_sleep_seconds: sleep seconds to avoid throttling.
 58                    - rest_api_is_file_payload: if the payload to be sent to the
 59                        api is in the format of a file using multipart encoding
 60                        upload. if this is true, then the payload will always be
 61                        sent using the "files" parameter in Python's requests
 62                        library.
 63                    - rest_api_file_payload_name: when rest_api_is_file_payload
 64                        is true, this option can be used to define the file
 65                        identifier in Python's requests library.
 66                    - extra_json_payload: when rest_api_file_payload_name is False,
 67                        can be used to provide additional JSON variables to add to
 68                        the original payload. This is useful to complement
 69                        the original payload with some extra input to better
 70                        configure the final payload to send to the REST api. An
 71                        example can be to add a constant configuration value to
 72                        add to the payload data.
 73
 74        Returns:
 75            Function to be called inside Spark dataframe.foreach.
 76        """
 77        headers = output_spec.options.get("rest_api_header", None)
 78        basic_auth_dict = output_spec.options.get("rest_api_basic_auth", None)
 79        url = output_spec.options["rest_api_url"]
 80        method = output_spec.options.get("rest_api_method", RestMethods.POST.value)
 81        sleep_seconds = output_spec.options.get("rest_api_sleep_seconds", 0)
 82        is_file_payload = output_spec.options.get("rest_api_is_file_payload", False)
 83        file_payload_name = output_spec.options.get(
 84            "rest_api_file_payload_name", "file"
 85        )
 86        extra_json_payload = output_spec.options.get(
 87            "rest_api_extra_json_payload", None
 88        )
 89        success_status_codes = output_spec.options.get(
 90            "rest_api_success_status_codes", RestStatusCodes.OK_STATUS_CODES.value
 91        )
 92
 93        def send_payload_to_rest_api(row: Row) -> Any:
 94            """Send payload to the REST API.
 95
 96            The payload needs to be prepared as a JSON string column in a dataframe.
 97            E.g., {"a": "a value", "b": "b value"}.
 98
 99            Args:
100                row: a row in a dataframe.
101            """
102            if "payload" not in row:
103                raise ValueError("Input DataFrame must contain 'payload' column.")
104
105            str_payload = row.payload
106
107            payload = None
108            if not is_file_payload:
109                payload = json.loads(str_payload)
110            else:
111                payload = {file_payload_name: str_payload}
112
113            if extra_json_payload:
114                payload.update(extra_json_payload)
115
116            RestApiWriter._logger.debug(f"Original payload: {str_payload}")
117            RestApiWriter._logger.debug(f"Final payload: {payload}")
118
119            response = execute_api_request(
120                method=method,
121                url=url,
122                headers=headers,
123                basic_auth_dict=basic_auth_dict,
124                json=payload if not is_file_payload else None,
125                files=payload if is_file_payload else None,
126                sleep_seconds=sleep_seconds,
127            )
128
129            RestApiWriter._logger.debug(
130                f"Response: {response.status_code} - {response.text}"
131            )
132
133            if response.status_code not in success_status_codes:
134                raise RESTApiException(
135                    f"API response status code {response.status_code} is not in"
136                    f" {success_status_codes}. Got {response.text}"
137                )
138
139        return send_payload_to_rest_api
140
141    @staticmethod
142    def _write_to_rest_api_in_batch_mode(
143        df: DataFrame, output_spec: OutputSpec
144    ) -> None:
145        """Write to REST API in Spark batch mode.
146
147        This function uses the dataframe.foreach function to generate a payload
148        for each row of the dataframe and send it to the REST API endpoint.
149
150        Warning! Make sure your execution environment supports RDD api operations,
151        as there are environments where RDD operation may not be supported. As,
152        df.foreach() is a shorthand for df.rdd.foreach(), this can bring issues
153        in such environments.
154
155        Args:
156            df: dataframe to write.
157            output_spec: output specification.
158        """
159        df.foreach(RestApiWriter._get_func_to_send_payload_to_rest_api(output_spec))
160
161    @staticmethod
162    def _write_to_rest_api_in_streaming_mode(
163        df: DataFrame, output_spec: OutputSpec, data: OrderedDict
164    ) -> None:
165        """Write to REST API in streaming mode.
166
167        Args:
168            df: dataframe to write.
169            output_spec: output specification.
170            data: list of all dfs generated on previous steps before writer.
171        """
172        df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec))
173
174        stream_df = (
175            df_writer.options(**output_spec.options if output_spec.options else {})
176            .foreachBatch(
177                RestApiWriter._write_transformed_micro_batch(output_spec, data)
178            )
179            .start()
180        )
181
182        if output_spec.streaming_await_termination:
183            stream_df.awaitTermination(output_spec.streaming_await_termination_timeout)
184
185    @staticmethod
186    def _write_transformed_micro_batch(  # type: ignore
187        output_spec: OutputSpec, data: OrderedDict
188    ) -> Callable:
189        """Define how to write a streaming micro batch after transforming it.
190
191        Args:
192            output_spec: output specification.
193            data: list of all dfs generated on previous steps before writer.
194
195        Returns:
196            A function to be executed in the foreachBatch spark write method.
197        """
198
199        def inner(batch_df: DataFrame, batch_id: int) -> None:
200            transformed_df = Writer.get_transformed_micro_batch(
201                output_spec, batch_df, batch_id, data
202            )
203
204            if output_spec.streaming_micro_batch_dq_processors:
205                transformed_df = Writer.run_micro_batch_dq_process(
206                    transformed_df, output_spec.streaming_micro_batch_dq_processors
207                )
208
209            RestApiWriter._write_to_rest_api_in_batch_mode(transformed_df, output_spec)
210
211        return inner

Class to write data to a REST API.

RestApiWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
25    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
26        """Construct RestApiWriter instances.
27
28        Args:
29            output_spec: output specification.
30            df: dataframe to be written.
31            data: list of all dfs generated on previous steps before writer.
32        """
33        super().__init__(output_spec, df, data)

Construct RestApiWriter instances.

Arguments:
  • output_spec: output specification.
  • df: dataframe to be written.
  • data: list of all dfs generated on previous steps before writer.
def write(self) -> None:
35    def write(self) -> None:
36        """Write data to REST API."""
37        if not self._df.isStreaming:
38            self._write_to_rest_api_in_batch_mode(self._df, self._output_spec)
39        else:
40            self._write_to_rest_api_in_streaming_mode(
41                self._df, self._output_spec, self._data
42            )

Write data to REST API.