HTTP Client¶
Uvicore comes with the async aiohttp
HTTP client so you can talk to other APIs.
Note
Uvicore also comes with httpx
, another async HTTP client that is used when running pytest. However, Uvicore does not provide a standard way to access it like it does for aiohttp
, so you are on your own there :)
Dependencies¶
The HTTP client is available to all levels of Uvicore, regardless of the options you chose when running the Uvicore Installer.
You will notice in your config/dependencies.py
file that the HTTP client is enabled by default:
dependencies = OrderedDict({
# ...
# HTTP async client based on aiohttp.
'uvicore.http_client': {
'provider': 'uvicore.http_client.package.provider.HttpClient',
},
# ...
Basic Usage¶
Basic usage is to make the HTTP client from the IoC and use it like so:
import uvicore
from uvicore.support.dumper import dump, dd
async def some_async_method():
    """Fetch the available pets from the Swagger Petstore demo API and dump the result.

    The response object, its raw text and its decoded JSON are each passed
    to ``dump``. Responses with a status other than 200 are ignored.
    """
    url = 'https://petstore3.swagger.io/api/v3/pet/findByStatus?status=available'

    # The aiohttp client is resolved from the IoC container
    session = uvicore.ioc.make('aiohttp')

    async with session.get(url) as response:
        # Nothing to show unless the call succeeded
        if response.status != 200:
            return
        dump(response)
        dump(await response.text())
        dump(await response.json())
Example Basic Auth GET Helper¶
import json
import uvicore
import aiohttp
async def http_get(url: str, *, json: bool = True, basic_auth: str = None) -> tuple:
    """HTTP GET call using AIOHTTP.

    Args:
        url: URL to request.
        json: When True (default) the response body is decoded as JSON,
            otherwise the body is returned as text.
        basic_auth: Optional credentials as a single ``'user:password'`` string.

    Returns:
        A ``(status_code, results)`` tuple.

    Raises:
        ValueError: If ``basic_auth`` has no ``:`` separator, or if ``json``
            is True and the body cannot be decoded as JSON.
    """
    # The ``json`` parameter shadows the json module inside this function,
    # so the decoder is imported locally under its own name.
    from json import loads as json_loads

    # Async HTTP Client (aiohttp)
    http = uvicore.ioc.make('aiohttp')

    # Optional Basic Auth
    auth = None
    if basic_auth:
        # Split on the first colon only, the password itself may contain ':'
        user, password = basic_auth.split(':', 1)
        auth = aiohttp.BasicAuth(user, password)

    # Make the call
    async with http.get(url, auth=auth) as response:
        code = response.status
        if json:
            # json() will fail if the content type is not json.
            # Some APIs return JSON strings but with content type text/plain,
            # so on a content type mismatch decode the raw text instead.
            try:
                results = await response.json()
            except aiohttp.ContentTypeError:
                results = json_loads(await response.text())
        else:
            results = await response.text()
    return (code, results)
Example Simple SugarCRM Client¶
Just a basic example of building your own SugarCRM client class to log in and interact with the Sugar API. This example shows the GET
method only, but you "get" the point :) This code saves the login token to a temp file to reuse it. If the token expires, the GET
request will attempt to log in again once before failing.
import hashlib
import json
import os
from typing import Dict, List

import uvicore
@uvicore.service('acme.wiki.sugar.client.SugarClient', aliases=['sugar-client'], singleton=True)
class SugarClient:
    """Generic Async AIOHTTP Rest client for Sugar.

    Logs in through ``/oauth2/token``, caches the access token in a file
    under ``/tmp`` and sends it as the ``oauth-token`` header on GET requests.
    """

    @property
    def config(self):
        """Sugar settings read from the package config."""
        # Your config/package.py file might have configs like this
        # 'sugar': {
        #     'url': env('SUGAR_URL', 'https://crm.example.com/rest/v10'),
        #     'client': env('SUGAR_CLIENT', 'my-python-client'),
        #     'secret': env('SUGAR_SECRET', 'xyz'),
        #     'platform': env('SUGAR_PLATFORM', 'base'),
        #     'username': env('SUGAR_USERNAME', 'me@example.com'),
        #     'password': env('SUGAR_PASSWORD', 'xyz'),
        # },
        return uvicore.config.acme.wiki.sugar

    @property
    def http(self):
        """The aiohttp client resolved from the IoC."""
        return uvicore.ioc.make('aiohttp')

    async def login(self, refresh=False):
        """Login to Sugar /oauth2/token and get an access token.

        Args:
            refresh: When True, discard any cached token and log in again.

        Returns:
            The access token string.

        Raises:
            Exception: If Sugar does not answer the login with status 200.
        """
        # Payload for /oauth2/token
        url = self.config.url
        payload = {
            'grant_type': 'password',
            'client_id': self.config.client,
            'client_secret': self.config.secret,
            'username': self.config.username,
            'password': self.config.password,
            'platform': self.config.platform,
        }
        headers = {
            'Content-Type': 'application/json',
        }

        # Cache login token in a tmp file named by a hash of the payload.
        # The dict is serialized with sorted keys so the hash is stable.
        digest = hashlib.sha256(json.dumps(payload, sort_keys=True).encode('utf-8')).hexdigest()
        token_file = '/tmp/sugar.token.' + digest

        # If force re-login, remove token_file (it may not exist yet)
        if refresh:
            try:
                os.remove(token_file)
            except FileNotFoundError:
                pass

        # If token_file exists, use that token instead of hitting Sugar again
        if os.path.exists(token_file):
            with open(token_file, 'r') as f:
                token = f.readline().strip()
            # An empty cache file falls through to a fresh login
            if token:
                return token

        # Login to sugar via /oauth2/token
        async with self.http.post(f"{url}/oauth2/token", json=payload, headers=headers) as r:
            if r.status != 200:
                raise Exception(await r.text() + ' - Status ' + str(r.status))
            token = (await r.json())['access_token']

        # Cache token in token_file, created readable by the owner only
        fd = os.open(token_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            f.write(token)

        # Return token
        return token

    async def get_enum(self, module, field):
        """HTTP GET a modules fields enum values"""
        return await self._http_get(f"{self.config.url}/{module}/enum/{field}")

    async def get(self, module,
        *,
        fields: List = None,
        filter: Dict = None,
        max_num: int = 25,
    ) -> dict:
        """HTTP GET a module with parameters from Sugar API.

        Args:
            module: Sugar module name, appended to the configured base URL.
            fields: Optional list sent as the ``fields`` payload entry.
            filter: Optional dict sent as the ``filter`` payload entry.
            max_num: Sent as the ``max_num`` payload entry when truthy.

        Returns:
            The decoded JSON response.
        """
        # Build API Payload, leaving out options that were not supplied
        payload = {}
        if fields: payload['fields'] = fields
        if filter: payload['filter'] = filter
        if max_num: payload['max_num'] = max_num

        # HTTP GET
        return await self._http_get(f"{self.config.url}/{module}", payload)

    async def _http_get(self, url, payload=None):
        """GET ``url`` with the oauth token, logging in again once on 400/401.

        Raises:
            Exception: With the response text and status when the request
                fails, including when the retry after re-login fails.
        """
        # Get sugar oauth2 token (cached when available)
        token = await self.login()

        for is_retry in (False, True):
            headers = {
                'Content-Type': 'application/json',
                'oauth-token': token,
            }
            async with self.http.get(url, json=payload, headers=headers) as r:
                if r.status == 200:
                    return await r.json()
                # Only a 400/401 on the first attempt is worth a re-login
                if is_retry or r.status not in (400, 401):
                    raise Exception(await r.text() + ' - Status ' + str(r.status))

            # Relogin and try again, token may have expired
            token = await self.login(True)
To use this SugarClient
import uvicore
from acme.wiki.sugar.client import SugarClient
# or SugarClient = uvicore.ioc.make('sugar-client')
# NOTE: `await` is only valid inside a coroutine (async def), run this from async code.
# get() returns the decoded JSON response; this keeps only its 'records' entry.
accounts = (await SugarClient.get(
module='Accounts',
max_num=999,
fields=['id', 'name']
))['records']
Example FusionAuth Client¶
import uvicore
from uvicore.configuration import env
from uvicore.support.dumper import dump, dd
from uvicore.typing import Dict
from uvicore.exceptions import SmartException
@uvicore.service('mreschke.fusionauth.client.Client',
    aliases=['fusionauth'],
    singleton=True
)
class Client:
    """Generic Async AIOHTTP Rest client for FusionAuth.

    Each request resolves the aiohttp client from the IoC, picks the API key
    for the tenant (or the master key) and sends it as the ``Authorization``
    header. Any response status other than 200 raises a SmartException.
    """

    @property
    def config(self):
        """FusionAuth settings read from the package config."""
        return uvicore.config.mreschke.fusionauth

    @property
    def allowed_tenants(self):
        """Names of the tenants that have a non-empty key in ``tenant_keys``."""
        return [k for (k, v) in self.config.tenant_keys.items() if v]

    async def verify_tenant(self, tenant: str = None):
        """Return the lower-cased tenant, defaulting to the configured default tenant.

        Raises the not-found error when the tenant is not in ``allowed_tenants``.
        """
        if tenant is None:
            tenant = self.config.default_tenant or 'default'
        tenant = tenant.lower()
        if tenant not in self.allowed_tenants:
            await self.not_found('Invalid tenant. Must be one of {}'.format(str(self.allowed_tenants)))
        return tenant

    async def api_key(self, tenant: str, master_key: bool = False) -> str:
        """Return the master key or the key configured for ``tenant``.

        Raises the not-found error when no key is configured.
        """
        if master_key:
            key = self.config.master_key
        else:
            # .get() so an unknown (or None) tenant reaches not_found below
            # instead of surfacing as a raw KeyError
            key = self.config.tenant_keys.get(tenant)
        if key:
            return key
        await self.not_found('API key not found in config for tenant {}'.format(tenant))

    async def url(self, path: str) -> str:
        """Join the configured base URL and ``path`` with exactly one slash added between them.

        Raises the not-found error when no base URL is configured.
        """
        # Treat a missing URL as empty so the checks below cannot fail on None
        auth_url = self.config.url or ''
        if auth_url.endswith('/'): auth_url = auth_url[:-1]  # Remove trailing / from base
        if path.startswith('/'): path = path[1:]  # Remove leading / from path
        if auth_url:
            return auth_url + '/' + path
        await self.not_found('FusionAuth URL not found in config')

    async def _request(self, method: str, path: str, tenant: str, master_key: bool, *,
        json: Dict = None,
        text: bool = False,
        fallback: str = None,
    ):
        """Shared implementation behind get/post/put/patch/delete.

        Args:
            method: Name of the aiohttp client method to call ('get', 'post', ...).
            path: API path, joined to the configured base URL.
            tenant: Tenant whose API key is used.
            master_key: Use the master key instead of the tenant key.
            json: Optional JSON body.
            text: Return the body as text instead of decoded JSON on success.
            fallback: Error message used when the failed response has no
                body; when None the status code is used instead.
        """
        # Get aiohttp client session from IoC singleton
        http = uvicore.ioc.make('aiohttp')

        # Get proper API key
        key = await self.api_key(tenant, master_key)

        # Get full URL
        url = await self.url(path)

        send = getattr(http, method)
        async with send(url, json=json, headers={'Authorization': key}) as r:
            if r.status == 200:
                return await (r.text() if text else r.json())

            # Error details are usually JSON, fall back to the raw text when
            # the body cannot be decoded
            try:
                detail = await r.json()
            except Exception:
                detail = await r.text()
            await self.exception(detail or fallback or str(r.status), status_code=r.status)

    async def get(self, path: str, tenant: str = None, master_key: bool = False):
        """HTTP GET ``path`` and return the decoded JSON response."""
        return await self._request('get', path, tenant, master_key)

    async def post(self, path: str, tenant: str = None, master_key: bool = False, json: Dict = None):
        """HTTP POST ``json`` to ``path`` and return the decoded JSON response."""
        return await self._request('post', path, tenant, master_key, json=json, fallback='Not Found')

    async def put(self, path: str, tenant: str = None, master_key: bool = False, json: Dict = None):
        """HTTP PUT ``json`` to ``path`` and return the decoded JSON response."""
        return await self._request('put', path, tenant, master_key, json=json, fallback='Not Found')

    async def patch(self, path: str, tenant: str = None, master_key: bool = False, json: Dict = None):
        """HTTP PATCH ``json`` to ``path`` and return the decoded JSON response."""
        return await self._request('patch', path, tenant, master_key, json=json, fallback='Not Found')

    async def delete(self, path: str, tenant: str = None, master_key: bool = False):
        """HTTP DELETE ``path`` and return the response body as text."""
        return await self._request('delete', path, tenant, master_key, text=True, fallback='Not Found')

    async def not_found(self, message: str):
        """Raise NotFound when running as an HTTP app, otherwise close the client and raise Exception."""
        if uvicore.app.is_http:
            from uvicore.http.exceptions import NotFound
            raise NotFound(message)
        else:
            await uvicore.ioc.make('aiohttp').close()
            raise Exception(message)

    async def exception(self, message: str, *, status_code=503):
        """Raise a SmartException carrying ``message`` and ``status_code``."""
        raise SmartException(message, status_code)
An example repository/app.py
that adds a higher-level "app" endpoint abstraction:
import uvicore
from mreschke.fusionauth.client import Client as fa
from uvicore.support.dumper import dump, dd
from uvicore.typing import Dict, List, Optional
from uvicore.exceptions import SmartException
URL = 'api/application'
async def find(id_or_name: str, tenant: Optional[str] = None) -> Dict:
    """Get one application by ID or name.

    A value that is 36 characters long with four dashes is treated as an ID
    and queried directly (through the cache). Anything else is treated as a
    name and matched against the full application list; None is returned
    when no application has that name.
    """
    tenant = await fa.verify_tenant(tenant)

    looks_like_guid = id_or_name.count('-') == 4 and len(id_or_name) == 36

    if looks_like_guid:
        url = URL + '/' + id_or_name

        async def query():
            response = await fa.get(url, tenant)
            return Dict(response['application'])

        try:
            return await uvicore.cache.remember(tenant + '/' + url, query)
        except SmartException as e:
            raise SmartException(e.detail, message='Cannot query ' + url)

    # Get application by name
    # FusionAuth does not have an endpoint to get by name, so we'll get all and filter
    for candidate in await list(tenant):
        if candidate.name == id_or_name:
            return candidate
    return None
async def list(tenant: Optional[str] = None) -> List[Dict]:
    """Get all applications.

    Returns the ``applications`` entry of the FusionAuth response as a list
    of Dict, or an empty list when the response is empty or has no such
    entry. The result is stored through ``uvicore.cache.remember``.

    NOTE: this function shadows the builtin ``list`` within this module.

    Raises:
        SmartException: If the query fails.
    """
    tenant = await fa.verify_tenant(tenant)
    url = URL

    async def query():
        response = await fa.get(url, tenant)
        # Tolerate an empty response or one without an 'applications' key
        applications = (response or {}).get('applications') or []
        return [Dict(x) for x in applications]

    try:
        return await uvicore.cache.remember(tenant + '/' + url, query)
    except SmartException as e:
        raise SmartException(e.detail, message='Cannot query ' + url) from e
async def upsert(params: Dict, tenant: Optional[str] = None) -> None:
    """Create the application, or update it when it already exists.

    Args:
        params: Application fields. ``params.id`` is required; an optional
            ``params.roles`` list is synchronized separately on update.
        tenant: Tenant to operate on, verified through the client.

    Raises:
        SmartException: If any FusionAuth call fails.
    """
    tenant = await fa.verify_tenant(tenant)
    params = Dict(params)
    url = URL + '/' + params.id
    role_url = url + '/role'

    # Check if app exists by ID, in the same tenant being written to.
    # A failed lookup is deliberately treated as "does not exist".
    existing = None
    try:
        existing = await find(params.id, tenant)
    except SmartException:
        pass

    try:
        if existing is None:
            # Insert new App
            await fa.post(url, tenant, json={'application': params})
            return

        # Update existing App
        # Updating application.roles does not work. It works for initial POST,
        # but not for updates. Instead we need to handle it manually
        roles = None
        if params.roles:
            roles = params.roles
            del params.roles

        # Patch the app
        await fa.patch(url, tenant, json={'application': params})

        # No roles supplied, leave the existing roles untouched
        if roles is None:
            return

        # NOTE(review): roles are assumed to be objects with .name and .id,
        # matching how they are posted below. Confirm against callers.
        existing_roles = existing.roles or []
        existing_names = {r.name.lower() for r in existing_roles}
        wanted_names = {r.name.lower() for r in roles}

        # Add roles that do not already exist
        for role in roles:
            if role.name.lower() not in existing_names:
                await fa.post(role_url + '/' + role.id, tenant, json={'role': role})

        # Remove roles that should no longer exist
        for existing_role in existing_roles:
            if existing_role.name.lower() not in wanted_names:
                await fa.delete(role_url + '/' + existing_role.id, tenant)
    except SmartException as e:
        raise SmartException(e.detail, message='Cannot query ' + url) from e
Usage of this app
level endpoint abstraction
from mreschke.fusionauth.repository import app
apps = await app.find('my-app', tenant='my-tenant')