Source code for auth0.v3.management.rest

import base64
import json
import platform
import sys
import requests

from ..exceptions import Auth0Error, RateLimitError
from time import sleep
from random import randint

UNKNOWN_ERROR = 'a0.sdk.internal.unknown'

[docs]class RestClientOptions(object): """Configuration object for RestClient. Used for configuring additional RestClient options, such as rate-limit retries. Args: telemetry (bool, optional): Enable or disable Telemetry (defaults to True) timeout (float or tuple, optional): Change the requests connect and read timeout. Pass a tuple to specify both values separately or a float to set both to it. (defaults to 5.0 for both) retries (integer): In the event an API request returns a 429 response header (indicating rate-limit has been hit), the RestClient will retry the request this many times using an exponential backoff strategy, before raising a RateLimitError exception. 10 retries max. (defaults to 3) """ def __init__(self, telemetry=None, timeout=None, retries=None): self.telemetry = True self.timeout = 5.0 self.retries = 3 if telemetry is not None: self.telemetry = telemetry if timeout is not None: self.timeout = timeout if retries is not None: self.retries = retries
[docs]class RestClient(object): """Provides simple methods for handling all RESTful api endpoints. Args: telemetry (bool, optional): Enable or disable Telemetry (defaults to True) timeout (float or tuple, optional): Change the requests connect and read timeout. Pass a tuple to specify both values separately or a float to set both to it. (defaults to 5.0 for both) options (RestClientOptions): Pass an instance of RestClientOptions to configure additional RestClient options, such as rate-limit retries. Overrides matching options passed to the constructor. (defaults to 3) """ def __init__(self, jwt, telemetry=True, timeout=5.0, options=None): if options is None: options = RestClientOptions(telemetry=telemetry, timeout=timeout) self.options = options self.jwt = jwt self._metrics = {'retries': 0, 'backoff': []} self._skip_sleep = False self.base_headers = { 'Authorization': 'Bearer {}'.format(self.jwt), 'Content-Type': 'application/json', } if options.telemetry: py_version = platform.python_version() version = sys.modules['auth0'].__version__ auth0_client = json.dumps({ 'name': 'auth0-python', 'version': version, 'env': { 'python': py_version, } }).encode('utf-8') self.base_headers.update({ 'User-Agent': 'Python/{}'.format(py_version), 'Auth0-Client': base64.b64encode(auth0_client), }) # For backwards compatibility reasons only # TODO: Deprecate in the next major so we can prune these arguments. Guidance should be to use RestClient.options.* self.telemetry = options.telemetry self.timeout = options.timeout # Returns a hard cap for the maximum number of retries allowed (10)
[docs] def MAX_REQUEST_RETRIES(self): return 10
# Returns the maximum amount of jitter to introduce in milliseconds (100ms)
[docs] def MAX_REQUEST_RETRY_JITTER(self): return 100
# Returns the maximum delay window allowed (1000ms)
[docs] def MAX_REQUEST_RETRY_DELAY(self): return 1000
# Returns the minimum delay window allowed (100ms)
[docs] def MIN_REQUEST_RETRY_DELAY(self): return 100
[docs] def get(self, url, params=None): headers = self.base_headers.copy() # Track the API request attempt number attempt = 0 # Reset the metrics tracker self._metrics = {'retries': 0, 'backoff': []} # Cap the maximum number of retries to 10 or fewer. Floor the retries at 0. retries = min(self.MAX_REQUEST_RETRIES(), max(0, self.options.retries)) while True: # Increment attempt number attempt += 1 # Issue the request response = requests.get(url, params=params, headers=headers, timeout=self.options.timeout); # If the response did not have a 429 header, or the retries were configured at 0, or the attempt number is equal to or greater than the configured retries, break if response.status_code != 429 or retries <= 0 or attempt > retries: break # Retry the request. Apply a exponential backoff for subsequent attempts, using this formula: # max(MIN_REQUEST_RETRY_DELAY, min(MAX_REQUEST_RETRY_DELAY, (100ms * (2 ** attempt - 1)) + random_between(1, MAX_REQUEST_RETRY_JITTER))) # Increases base delay by (100ms * (2 ** attempt - 1)) wait = 100 * 2 ** (attempt - 1) # Introduces jitter to the base delay; increases delay between 1ms to MAX_REQUEST_RETRY_JITTER (100ms) wait += randint(1, self.MAX_REQUEST_RETRY_JITTER()) # Is never more than MAX_REQUEST_RETRY_DELAY (1s) wait = min(self.MAX_REQUEST_RETRY_DELAY(), wait) # Is never less than MIN_REQUEST_RETRY_DELAY (100ms) wait = max(self.MIN_REQUEST_RETRY_DELAY(), wait) self._metrics['retries'] = attempt self._metrics['backoff'].append(wait) # Skip calling sleep() when running unit tests if self._skip_sleep is False: # sleep() functions in seconds, so convert the milliseconds formula above accordingly sleep(wait / 1000) # Return the final Response return self._process_response(response)
[docs] def post(self, url, data=None): headers = self.base_headers.copy() response = requests.post(url, json=data, headers=headers, timeout=self.options.timeout) return self._process_response(response)
[docs] def file_post(self, url, data=None, files=None): headers = self.base_headers.copy() headers.pop('Content-Type', None) response = requests.post(url, data=data, files=files, headers=headers, timeout=self.options.timeout) return self._process_response(response)
[docs] def patch(self, url, data=None): headers = self.base_headers.copy() response = requests.patch(url, json=data, headers=headers, timeout=self.options.timeout) return self._process_response(response)
[docs] def put(self, url, data=None): headers = self.base_headers.copy() response = requests.put(url, json=data, headers=headers, timeout=self.options.timeout) return self._process_response(response)
[docs] def delete(self, url, params=None, data=None): headers = self.base_headers.copy() response = requests.delete(url, headers=headers, params=params or {}, json=data, timeout=self.options.timeout) return self._process_response(response)
def _process_response(self, response): return self._parse(response).content() def _parse(self, response): if not response.text: return EmptyResponse(response.status_code) try: return JsonResponse(response) except ValueError: return PlainResponse(response)
[docs]class Response(object): def __init__(self, status_code, content, headers): self._status_code = status_code self._content = content self._headers = headers
[docs] def content(self): if self._is_error(): if self._status_code == 429: reset_at = int(self._headers.get('x-ratelimit-reset', '-1')) raise RateLimitError(error_code=self._error_code(), message=self._error_message(), reset_at=reset_at) raise Auth0Error(status_code=self._status_code, error_code=self._error_code(), message=self._error_message()) else: return self._content
def _is_error(self): return self._status_code is None or self._status_code >= 400 # Adding these methods to force implementation in subclasses because they are references in this parent class def _error_code(self): raise NotImplementedError def _error_message(self): raise NotImplementedError
[docs]class JsonResponse(Response): def __init__(self, response): content = json.loads(response.text) super(JsonResponse, self).__init__(response.status_code, content, response.headers) def _error_code(self): if 'errorCode' in self._content: return self._content.get('errorCode') elif 'error' in self._content: return self._content.get('error') else: return UNKNOWN_ERROR def _error_message(self): message = self._content.get('message', '') if message is not None and message != '': return message return self._content.get('error', '')
[docs]class PlainResponse(Response): def __init__(self, response): super(PlainResponse, self).__init__(response.status_code, response.text, response.headers) def _error_code(self): return UNKNOWN_ERROR def _error_message(self): return self._content
[docs]class EmptyResponse(Response): def __init__(self, status_code): super(EmptyResponse, self).__init__(status_code, '', {}) def _error_code(self): return UNKNOWN_ERROR def _error_message(self): return ''