feat: implement token refresh

This commit is contained in:
Julian Lobbes 2023-07-29 03:41:45 +02:00
parent b43a2ffbdc
commit 1792f3346e
4 changed files with 69 additions and 28 deletions

View File

@ -1,5 +1,6 @@
from urllib.parse import urlencode
from uuid import uuid4
import logging
from django.shortcuts import redirect, render
from django.conf import settings
@ -21,6 +22,7 @@ def register_init(request):
raise PermissionDenied('You are already registered and logged in.')
# Generate a unique token and save it for later
request.session.flush()
spoof_protection_token = str(uuid4())
request.session['spoof_protection_token'] = spoof_protection_token
@ -42,23 +44,17 @@ def register_init(request):
def register_continue(request):
if request.user.is_authenticated:
raise PermissionDenied('You are already registered and logged in.')
authorization_code = request.GET.get('code')
authorization_state = request.GET.get('state')
if not authorization_code:
return HttpResponseBadRequest()
if not authorization_state:
return HttpResponseBadRequest()
# TODO enable this when not mocking
# if not request.session.get('spoof_protection_token', None) == authorization_state:
# return HttpResponseBadRequest()
# Fetch access and refresh tokens and save them to session storage
redirect_uri = request.build_absolute_uri(reverse('register-continue'))
# DEBUG use an API mock
response_data = withings.api.mock_fetch_withings_tokens(authorization_code, redirect_uri)
if response_data['status'] != 0:
if not request.session.get('spoof_protection_token', None) == authorization_state:
return HttpResponseBadRequest()
withings.api.save_tokens_to_session(request, response_data)
if request.method == 'POST':
user_form = UserCreationForm(request.POST)
@ -69,6 +65,13 @@ def register_continue(request):
profile = profile_form.save(commit=False)
profile.user = user
# Fetch access and refresh tokens and save them to session storage
redirect_uri = request.build_absolute_uri(reverse('register-continue'))
response_data = withings.api.fetch_initial_tokens(authorization_code, redirect_uri)
if response_data['status'] != 0:
return HttpResponseBadRequest()
withings.api.save_tokens_to_session(request, response_data)
user_password = request.POST.get('password1')
gotify_user_info = gotify.api.create_user(user.username, user_password)
gotify_app_info = gotify.api.create_application(user.username, user_password)
@ -103,6 +106,7 @@ def register_continue(request):
withings_api_account, withings_access_token, withings_refresh_token
]:
instance.save()
request.session.flush()
# TODO sync withings health data
# TODO redirect user to some other page and ask them to log in
@ -121,6 +125,9 @@ def register_continue(request):
def register_finalize(request):
if request.user.is_authenticated:
raise PermissionDenied('You are already registered and logged in.')
# TODO implement
return render(request, 'authentication/register-finalize.html')

View File

@ -127,7 +127,8 @@ DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
WITHINGS_CONFIG = {
'CLIENT_ID': getenv('WITHINGS_CLIENT_ID'),
'CLIENT_SECRET': getenv('WITHINGS_CLIENT_SECRET')
'CLIENT_SECRET': getenv('WITHINGS_CLIENT_SECRET'),
'ENDPOINT_URL_OAUTH2': 'https://wbsapi.withings.net/v2/oauth2'
}
GOTIFY_CONFIG = {
'USERNAME': getenv('GOTIFY_USER'),

View File

@ -6,9 +6,8 @@ from django.conf import settings
from django.utils import timezone
from urllib.parse import urlencode
def fetch_withings_tokens(authorization_code, redirect_uri):
token_url_base = "https://wbsapi.withings.net/v2/oauth2"
token_url_params = {
def fetch_initial_tokens(authorization_code, redirect_uri):
data = {
'action': 'requesttoken',
'client_id': settings.WITHINGS_CONFIG['CLIENT_ID'],
'client_secret': settings.WITHINGS_CONFIG['CLIENT_SECRET'],
@ -16,14 +15,17 @@ def fetch_withings_tokens(authorization_code, redirect_uri):
'code': authorization_code,
'redirect_uri': redirect_uri
}
token_url = f"{token_url_base}?{urlencode(token_url_params)}"
response = requests.get(token_url)
response.raise_for_status()
response = requests.post(
url=settings.WITHINGS_CONFIG['ENDPOINT_URL_OAUTH2'],
json=data
)
if response is not None:
response.raise_for_status()
return response.json()
def mock_fetch_withings_tokens(authorization_code, redirect_uri):
def mock_fetch_initial_tokens(authorization_code, redirect_uri):
response = {
"status": 0,
"body": {

View File

@ -1,20 +1,51 @@
from django.db import models
from datetime import timedelta
import logging
from django.db import models
from django.contrib.auth.models import User
from django.conf import settings
from django.utils import timezone
import requests
class AccessToken(models.Model):
account = models.OneToOneField("ApiAccount", on_delete=models.CASCADE, primary_key=True)
value = models.CharField(max_length=256, verbose_name="Withings API Access Token")
expires = models.DateTimeField(verbose_name="Time of expiration")
class RefreshToken(models.Model):
account = models.OneToOneField("ApiAccount", on_delete=models.CASCADE, primary_key=True)
value = models.CharField(max_length=256, verbose_name="Withings API Refresh Token")
expires = models.DateTimeField(verbose_name="Time of expiration")
class ApiAccount(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE, primary_key=True)
userid = models.PositiveIntegerField(verbose_name="Withings API User ID")
def refresh_tokens(self):
data = {
'action': 'requesttoken',
'client_id': settings.WITHINGS_CONFIG['CLIENT_ID'],
'client_secret': settings.WITHINGS_CONFIG['CLIENT_SECRET'],
'grant_type': 'refresh_token',
'refresh_token': self.refreshtoken.value
}
response = requests.post(
url=settings.WITHINGS_CONFIG['ENDPOINT_URL_OAUTH2'],
json=data
)
class AccessToken(models.Model):
account = models.OneToOneField(ApiAccount, on_delete=models.CASCADE, primary_key=True)
value = models.CharField(max_length=256, verbose_name="Withings API Access Token")
expires = models.DateTimeField(verbose_name="Time of expiration")
if response is not None:
response.raise_for_status()
response_data = response.json()
class RefreshToken(models.Model):
account = models.OneToOneField(ApiAccount, on_delete=models.CASCADE, primary_key=True)
value = models.CharField(max_length=256, verbose_name="Withings API Refresh Token")
expires = models.DateTimeField(verbose_name="Time of expiration")
now = timezone.now()
self.accesstoken.value = response_data['body']['access_token']
self.accesstoken.expires = now + timedelta(seconds=response_data['body']['expires_in'])
self.refreshtoken.value = response_data['body']['refresh_token']
self.refreshtoken.expires = now + timedelta(days=365)
self.accesstoken.save()
self.refreshtoken.save()