lakehouse_engine.utils.rest_api
Module to handle REST API operations.
"""Module to handle REST API operations."""

import time
from enum import Enum

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from lakehouse_engine.utils.logging_handler import LoggingHandler

LOG = LoggingHandler(__name__).get_logger()
DEFAULT_CONTENT_TYPE = "application/json"


class RestMethods(Enum):
    """Methods for REST API calls."""

    POST = "POST"
    PUT = "PUT"
    # Default list of HTTP methods used as the retry allow-list.
    ALLOWED_METHODS = ["POST", "PUT"]


class RestStatusCodes(Enum):
    """Groups of HTTP status codes used by the REST helpers."""

    # Default status codes that trigger a retry.
    RETRY_STATUS_CODES = [429, 500, 502, 503, 504]
    OK_STATUS_CODES = [200]


class RESTApiException(requests.RequestException):
    """Class representing any possible REST API Exception."""

    def __init__(self, message: str) -> None:
        """Construct RESTApiException instances.

        Args:
            message: message to display on exception event.
        """
        super().__init__(message)


def get_basic_auth(username: str, password: str) -> requests.auth.HTTPBasicAuth:
    """Get the basic authentication object to authenticate REST requests.

    Args:
        username: username.
        password: password.

    Returns:
        requests.auth.HTTPBasicAuth: the HTTPBasicAuth object.
    """
    return requests.auth.HTTPBasicAuth(username, password)


def get_configured_session(
    sleep_seconds: float = 0.2,
    total_retries: int = 5,
    backoff_factor: int = 2,
    retry_status_codes: list = None,
    allowed_methods: list = None,
    protocol: str = "https://",
) -> requests.Session:
    """Get a configured requests Session with exponential backoff.

    Args:
        sleep_seconds: seconds to sleep before the session is created, to avoid
            rate limits.
        total_retries: number of times to retry.
        backoff_factor: factor for the exponential backoff.
        retry_status_codes: list of status code that triggers a retry. When
            empty or None, RestStatusCodes.RETRY_STATUS_CODES is used.
        allowed_methods: http methods that are allowed for retry. When empty
            or None, RestMethods.ALLOWED_METHODS is used.
        protocol: http:// or https://.

    Returns:
        requests.Session: the configured session.
    """
    retry_status_codes = (
        retry_status_codes
        if retry_status_codes
        else RestStatusCodes.RETRY_STATUS_CODES.value
    )
    allowed_methods = (
        allowed_methods if allowed_methods else RestMethods.ALLOWED_METHODS.value
    )
    time.sleep(sleep_seconds)
    session = requests.Session()
    retries = Retry(
        total=total_retries,
        backoff_factor=backoff_factor,
        status_forcelist=retry_status_codes,
        allowed_methods=allowed_methods,
    )
    # Only URLs starting with `protocol` go through the retrying adapter.
    session.mount(protocol, HTTPAdapter(max_retries=retries))
    return session


def execute_api_request(
    method: str,
    url: str,
    headers: dict = None,
    basic_auth_dict: dict = None,
    json: dict = None,
    files: dict = None,
    sleep_seconds: float = 0.2,
) -> requests.Response:
    """Execute a REST API request.

    Args:
        method: REST method (e.g., POST or PUT).
        url: url of the api.
        headers: request headers.
        basic_auth_dict: basic http authentication details
            (e.g., {"username": "x", "password": "y"}).
        json: json payload to send in the request.
        files: files payload to send in the request.
        sleep_seconds: for how many seconds to sleep to avoid error 429.

    Returns:
        response from the HTTP request.
    """
    basic_auth: requests.auth.HTTPBasicAuth = None
    if basic_auth_dict:
        basic_auth = get_basic_auth(
            basic_auth_dict["username"], basic_auth_dict["password"]
        )

    # Use the session as a context manager so it is closed after the call
    # instead of leaking one session (and its connection pool) per request.
    # The request is not streamed, so the response remains usable afterwards.
    with get_configured_session(sleep_seconds=sleep_seconds) as session:
        return session.request(
            method=method,
            url=url,
            headers=headers,
            auth=basic_auth,
            json=json,
            files=files,
        )
LOG =
<Logger lakehouse_engine.utils.rest_api (DEBUG)>
DEFAULT_CONTENT_TYPE =
'application/json'
class
RestMethods(enum.Enum):
class RestMethods(Enum):
    """HTTP methods available for REST API calls."""

    POST = "POST"
    PUT = "PUT"
    # Inside the class body the names above still hold their plain string
    # values, so this member's value is the list ["POST", "PUT"].
    ALLOWED_METHODS = [POST, PUT]
Methods for REST API calls.
POST =
<RestMethods.POST: 'POST'>
PUT =
<RestMethods.PUT: 'PUT'>
ALLOWED_METHODS =
<RestMethods.ALLOWED_METHODS: ['POST', 'PUT']>
Inherited Members
- enum.Enum
- name
- value
class
RestStatusCodes(enum.Enum):
class RestStatusCodes(Enum):
    """Named groups of HTTP status codes."""

    # Each member's value is a list of integer status codes.
    RETRY_STATUS_CODES = [
        429,
        500,
        502,
        503,
        504,
    ]
    OK_STATUS_CODES = [
        200,
    ]
REST Status Code.
RETRY_STATUS_CODES =
<RestStatusCodes.RETRY_STATUS_CODES: [429, 500, 502, 503, 504]>
OK_STATUS_CODES =
<RestStatusCodes.OK_STATUS_CODES: [200]>
Inherited Members
- enum.Enum
- name
- value
class
RESTApiException(requests.exceptions.RequestException):
class RESTApiException(requests.RequestException):
    """Exception type for errors raised while handling REST API operations."""

    def __init__(self, message: str) -> None:
        """Create the exception with the given message.

        Args:
            message: text describing the error; it is forwarded unchanged to
                the parent exception constructor.
        """
        super().__init__(message)
Class representing any possible REST API Exception.
RESTApiException(message: str)
def __init__(self, message: str) -> None:
    """Create the exception with the given message.

    Args:
        message: text describing the error; it is forwarded unchanged to
            the parent exception constructor.
    """
    super().__init__(message)
Construct RESTApiException instances.
Arguments:
- message: message to display on exception event.
Inherited Members
- requests.exceptions.RequestException
- response
- request
- builtins.OSError
- errno
- strerror
- filename
- filename2
- characters_written
- builtins.BaseException
- with_traceback
- add_note
- args
def
get_basic_auth(username: str, password: str) -> requests.auth.HTTPBasicAuth:
def get_basic_auth(username: str, password: str) -> requests.auth.HTTPBasicAuth:
    """Build the HTTP basic authentication object used for REST requests.

    Args:
        username: user name to authenticate with.
        password: password for that user.

    Returns:
        requests.auth.HTTPBasicAuth: auth object wrapping both credentials.
    """
    credentials = requests.auth.HTTPBasicAuth(username=username, password=password)
    return credentials
Get the basic authentication object to authenticate REST requests.
Arguments:
- username: username.
- password: password.
Returns:
requests.auth.HTTPBasicAuth: the HTTPBasicAuth object.
def
get_configured_session( sleep_seconds: float = 0.2, total_retries: int = 5, backoff_factor: int = 2, retry_status_codes: list = None, allowed_methods: list = None, protocol: str = 'https://') -> requests.sessions.Session:
def get_configured_session(
    sleep_seconds: float = 0.2,
    total_retries: int = 5,
    backoff_factor: int = 2,
    retry_status_codes: list = None,
    allowed_methods: list = None,
    protocol: str = "https://",
) -> requests.Session:
    """Get a configured requests Session with exponential backoff.

    Args:
        sleep_seconds: seconds to sleep before the session is created, to avoid
            rate limits.
        total_retries: number of times to retry.
        backoff_factor: factor for the exponential backoff.
        retry_status_codes: list of status code that triggers a retry. When
            empty or None, RestStatusCodes.RETRY_STATUS_CODES is used.
        allowed_methods: http methods that are allowed for retry. When empty
            or None, RestMethods.ALLOWED_METHODS is used.
        protocol: http:// or https://.

    Returns:
        requests.Session: the configured session.
    """
    # Any falsy argument (None or an empty list) falls back to the defaults.
    status_forcelist = retry_status_codes or RestStatusCodes.RETRY_STATUS_CODES.value
    retry_methods = allowed_methods or RestMethods.ALLOWED_METHODS.value

    time.sleep(sleep_seconds)

    session = requests.Session()
    retry_policy = Retry(
        total=total_retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=retry_methods,
    )
    # Only URLs starting with `protocol` go through the retrying adapter.
    session.mount(protocol, HTTPAdapter(max_retries=retry_policy))
    return session
Get a configured requests Session with exponential backoff.
Arguments:
- sleep_seconds: seconds to sleep before each request to avoid rate limits.
- total_retries: number of times to retry.
- backoff_factor: factor for the exponential backoff.
- retry_status_codes: list of status code that triggers a retry.
- allowed_methods: http methods that are allowed for retry.
- protocol: http:// or https://.
Returns:
requests.Session: the configured session.
def
execute_api_request( method: str, url: str, headers: dict = None, basic_auth_dict: dict = None, json: dict = None, files: dict = None, sleep_seconds: float = 0.2) -> requests.models.Response:
def execute_api_request(
    method: str,
    url: str,
    headers: dict = None,
    basic_auth_dict: dict = None,
    json: dict = None,
    files: dict = None,
    sleep_seconds: float = 0.2,
) -> requests.Response:
    """Execute a REST API request.

    The request is sent through a session obtained from
    get_configured_session, and that session is closed before returning.

    Args:
        method: REST method (e.g., POST or PUT).
        url: url of the api.
        headers: request headers.
        basic_auth_dict: basic http authentication details
            (e.g., {"username": "x", "password": "y"}). When empty or None,
            the request is sent without basic authentication.
        json: json payload to send in the request.
        files: files payload to send in the request.
        sleep_seconds: for how many seconds to sleep to avoid error 429.

    Returns:
        response from the HTTP request.

    Raises:
        KeyError: if basic_auth_dict is provided without a "username" or
            "password" key.
    """
    basic_auth = (
        get_basic_auth(basic_auth_dict["username"], basic_auth_dict["password"])
        if basic_auth_dict
        else None
    )

    # Use the session as a context manager so it is closed after the call
    # instead of leaking one session (and its connection pool) per request.
    # The request is not streamed, so the response remains usable afterwards.
    with get_configured_session(sleep_seconds=sleep_seconds) as session:
        return session.request(
            method=method,
            url=url,
            headers=headers,
            auth=basic_auth,
            json=json,
            files=files,
        )
Execute a REST API request.
Arguments:
- method: REST method (e.g., POST or PUT).
- url: url of the api.
- headers: request headers.
- basic_auth_dict: basic http authentication details (e.g., {"username": "x", "password": "y"}).
- json: json payload to send in the request.
- files: files payload to send in the request.
- sleep_seconds: for how many seconds to sleep to avoid error 429.
Returns:
response from the HTTP request.