ba-thesis/app/withings/api.py

117 lines
4.1 KiB
Python
Raw Normal View History

from datetime import datetime, timedelta
2023-07-28 17:23:49 +01:00
from random import randint
import requests
import pytz
2023-07-28 17:23:49 +01:00
from django.conf import settings
2023-07-28 22:27:12 +01:00
from django.utils import timezone
from django.contrib.auth.models import User
2023-07-28 17:23:49 +01:00
from urllib.parse import urlencode
from medwings import models as mm
2023-07-29 02:41:45 +01:00
def fetch_initial_tokens(authorization_code, redirect_uri):
data = {
2023-07-28 17:23:49 +01:00
'action': 'requesttoken',
'client_id': settings.WITHINGS_CONFIG['CLIENT_ID'],
'client_secret': settings.WITHINGS_CONFIG['CLIENT_SECRET'],
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': redirect_uri
}
2023-07-29 02:41:45 +01:00
response = requests.post(
url=settings.WITHINGS_CONFIG['ENDPOINT_URL_OAUTH2'],
json=data
)
if response is not None:
response.raise_for_status()
2023-07-28 17:23:49 +01:00
return response.json()
2023-07-29 02:41:45 +01:00
def mock_fetch_initial_tokens(authorization_code, redirect_uri):
2023-07-28 17:23:49 +01:00
response = {
"status": 0,
"body": {
"userid": f"{randint(1, 5000)}",
"access_token": "a075f8c14fb8df40b08ebc8508533dc332a6910a",
"refresh_token": "f631236f02b991810feb774765b6ae8e6c6839ca",
"expires_in": 10800,
"scope": "user.info,user.metrics",
"csrf_token": "PACnnxwHTaBQOzF7bQqwFUUotIuvtzSM",
"token_type": "Bearer"
}
}
return response
def save_tokens_to_session(request, response_data):
request.session['withings_userid'] = response_data['body']['userid']
request.session['withings_access_token'] = response_data['body']['access_token']
request.session['withings_refresh_token'] = response_data['body']['refresh_token']
2023-07-28 22:27:12 +01:00
now = timezone.now()
request.session['withings_access_token_expiry'] = (now + timedelta(seconds=response_data['body']['expires_in'])).isoformat()
request.session['withings_refresh_token_expiry'] = (now + timedelta(days=365)).isoformat()
def parse_getmeas_response(response_body: dict, user: User) -> list:
body = response_body['body']
records = []
timezone = pytz.timezone(body['timezone'])
for measure_group in body['measuregrps']:
recorded = timezone.localize(datetime.fromtimestamp(measure_group['date']))
blood_pressure_systolic_value = None
blood_pressure_diastolic_value = None
body_temperature_value = None
heart_rate_value = None
spo2_level_value = None
for measure in measure_group['measures']:
measure_type = measure['type']
measure_value = measure['value']
measure_unit = measure['unit']
measure_value_adjusted = measure_value * (10 ** measure_unit)
if measure_type == 9:
blood_pressure_diastolic_value = measure_value_adjusted
elif measure_type == 10:
blood_pressure_systolic_value = measure_value_adjusted
elif measure_type == 11:
heart_rate_value = measure_value_adjusted
elif measure_type == 54:
spo2_level_value = measure_value_adjusted
elif measure_type == 71:
body_temperature_value = measure_value_adjusted
if blood_pressure_systolic_value and blood_pressure_diastolic_value:
records.append(mm.BloodPressureRecord(
user=user,
recorded=recorded,
value_systolic_mmhg=blood_pressure_systolic_value,
value_diastolic_mmhg=blood_pressure_diastolic_value
))
if body_temperature_value:
records.append(mm.BodyTempRecord(
user=user,
recorded=recorded,
value_celsius=body_temperature_value
))
if heart_rate_value:
records.append(mm.HeartRateRecord(
user=user,
recorded=recorded,
value_bpm=heart_rate_value
))
if spo2_level_value:
records.append(mm.Spo2LevelRecord(
user=user,
recorded=recorded,
value_percent=spo2_level_value
))
return records