ba-thesis/app/withings/api.py

117 lines
4.1 KiB
Python

from datetime import datetime, timedelta
from random import randint
import requests
import pytz
from django.conf import settings
from django.utils import timezone
from django.contrib.auth.models import User
from urllib.parse import urlencode
from medwings import models as mm
# Upper bound, in seconds, on how long the token request may block. Without a
# timeout ``requests`` waits indefinitely on an unresponsive server.
_WITHINGS_REQUEST_TIMEOUT_SECONDS = 10


def fetch_initial_tokens(authorization_code, redirect_uri):
    """Exchange an OAuth2 authorization code for an initial set of tokens.

    Sends a ``requesttoken`` action with the ``authorization_code`` grant type
    to the URL configured in
    ``settings.WITHINGS_CONFIG['ENDPOINT_URL_OAUTH2']``, authenticating with
    the configured client id and secret.

    Args:
        authorization_code: Value sent as the ``code`` field of the request.
        redirect_uri: Value sent as the ``redirect_uri`` field of the request.

    Returns:
        The decoded JSON payload of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status code.
        requests.Timeout: If the server does not answer within
            ``_WITHINGS_REQUEST_TIMEOUT_SECONDS`` seconds.
    """
    payload = {
        'action': 'requesttoken',
        'client_id': settings.WITHINGS_CONFIG['CLIENT_ID'],
        'client_secret': settings.WITHINGS_CONFIG['CLIENT_SECRET'],
        'grant_type': 'authorization_code',
        'code': authorization_code,
        'redirect_uri': redirect_uri,
    }

    response = requests.post(
        url=settings.WITHINGS_CONFIG['ENDPOINT_URL_OAUTH2'],
        json=payload,
        timeout=_WITHINGS_REQUEST_TIMEOUT_SECONDS,
    )
    # ``requests.post`` either returns a Response or raises, so no ``None``
    # check is needed before validating the HTTP status.
    response.raise_for_status()

    return response.json()
def mock_fetch_initial_tokens(authorization_code, redirect_uri):
    """Return a canned token response without making any network request.

    Both arguments are accepted so the signature matches
    ``fetch_initial_tokens``; neither is used. Every field is fixed except
    ``userid``, which is a random integer between 1 and 5000 (inclusive)
    rendered as a string.

    Returns:
        A dict with a ``status`` of 0 and a ``body`` dict holding the fake
        token fields.
    """
    fake_user_id = str(randint(1, 5000))

    token_body = {
        "userid": fake_user_id,
        "access_token": "a075f8c14fb8df40b08ebc8508533dc332a6910a",
        "refresh_token": "f631236f02b991810feb774765b6ae8e6c6839ca",
        "expires_in": 10800,
        "scope": "user.info,user.metrics",
        "csrf_token": "PACnnxwHTaBQOzF7bQqwFUUotIuvtzSM",
        "token_type": "Bearer",
    }

    return {"status": 0, "body": token_body}
def save_tokens_to_session(request, response_data):
    """Copy the token fields of a token response into the request's session.

    Stores the user id, access token and refresh token from
    ``response_data['body']`` under ``withings_*`` session keys, together
    with two ISO 8601 expiry timestamps computed from the current time: the
    access token expiry is ``expires_in`` seconds from now, and the refresh
    token expiry is 365 days from now.

    Args:
        request: Object exposing a mapping-like ``session`` attribute.
        response_data: Token response whose ``body`` holds ``userid``,
            ``access_token``, ``refresh_token`` and ``expires_in``.
    """
    token_body = response_data['body']
    session = request.session

    # Session key -> field of the response body that is copied verbatim.
    copied_fields = (
        ('withings_userid', 'userid'),
        ('withings_access_token', 'access_token'),
        ('withings_refresh_token', 'refresh_token'),
    )
    for session_key, body_key in copied_fields:
        session[session_key] = token_body[body_key]

    # Both expiry timestamps are measured from the same instant.
    issued_at = timezone.now()

    access_lifetime = timedelta(seconds=token_body['expires_in'])
    session['withings_access_token_expiry'] = (issued_at + access_lifetime).isoformat()

    refresh_lifetime = timedelta(days=365)
    session['withings_refresh_token_expiry'] = (issued_at + refresh_lifetime).isoformat()
# Numeric measure type codes, as they appear in the ``type`` field of each
# measure, that this parser turns into records.
_MEASURE_TYPE_BP_DIASTOLIC = 9
_MEASURE_TYPE_BP_SYSTOLIC = 10
_MEASURE_TYPE_HEART_RATE = 11
_MEASURE_TYPE_SPO2 = 54
_MEASURE_TYPE_BODY_TEMP = 71


def parse_getmeas_response(response_body: dict, user: User) -> list:
    """Build record objects from the payload of a ``getmeas`` response.

    Each measure group yields at most one record per kind (blood pressure,
    body temperature, heart rate, SpO2). All records from a group share the
    group's ``date``, expressed in the time zone named by the response's
    ``timezone`` field. The record objects are only instantiated here, not
    saved.

    Args:
        response_body: Decoded response; ``response_body['body']`` must
            contain ``timezone`` (a time zone name) and ``measuregrps``.
        user: The user the created records are attributed to.

    Returns:
        A list of ``BloodPressureRecord``, ``BodyTempRecord``,
        ``HeartRateRecord`` and ``Spo2LevelRecord`` instances, in the order
        the measure groups appear.
    """
    body = response_body['body']
    # Named ``user_tz`` so the module-level ``timezone`` import is not
    # shadowed inside this function.
    user_tz = pytz.timezone(body['timezone'])

    records = []
    for measure_group in body['measuregrps']:
        # ``date`` is an epoch timestamp. Convert it as UTC and then move it
        # into the user's zone; building a naive datetime in the server's
        # local zone and re-labelling it with the user's zone would shift
        # the instant whenever the two zones differ.
        recorded = datetime.fromtimestamp(
            measure_group['date'], tz=pytz.utc
        ).astimezone(user_tz)

        # Measure type -> scaled value. If a type occurs more than once in
        # a group, the last occurrence wins.
        values = {}
        for measure in measure_group['measures']:
            # The real value is ``value * 10 ** unit``.
            values[measure['type']] = measure['value'] * (10 ** measure['unit'])

        systolic = values.get(_MEASURE_TYPE_BP_SYSTOLIC)
        diastolic = values.get(_MEASURE_TYPE_BP_DIASTOLIC)
        body_temperature = values.get(_MEASURE_TYPE_BODY_TEMP)
        heart_rate = values.get(_MEASURE_TYPE_HEART_RATE)
        spo2_level = values.get(_MEASURE_TYPE_SPO2)

        # The checks below are truthiness checks: a missing value and a
        # value of zero are both skipped.

        # A blood pressure record needs both components.
        if systolic and diastolic:
            records.append(mm.BloodPressureRecord(
                user=user,
                recorded=recorded,
                value_systolic_mmhg=systolic,
                value_diastolic_mmhg=diastolic,
            ))
        if body_temperature:
            records.append(mm.BodyTempRecord(
                user=user,
                recorded=recorded,
                value_celsius=body_temperature,
            ))
        if heart_rate:
            records.append(mm.HeartRateRecord(
                user=user,
                recorded=recorded,
                value_bpm=heart_rate,
            ))
        if spo2_level:
            records.append(mm.Spo2LevelRecord(
                user=user,
                recorded=recorded,
                value_percent=spo2_level,
            ))

    return records