111 lines
3.6 KiB
Python
111 lines
3.6 KiB
Python
"""This module handles CRUD operations for users in the database, based on pydanctic schemas."""
|
|
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from todo.models import users as usermodel
|
|
from todo.schemas import users as userschema
|
|
from todo.utils.exceptions import NotFoundException, InvalidFilterParameterException
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
"""This is a placeholder for a secure password hashing algorithm.
|
|
|
|
It will convert a plaintext password into a secure, salted hash, for storage
|
|
in the database.
|
|
"""
|
|
|
|
# TODO actually hash the password!
|
|
return password
|
|
|
|
|
|
def create_user(db: Session, user: userschema.UserCreate) -> userschema.User:
|
|
"""Creates the specified user in the database."""
|
|
|
|
db_user = usermodel.User(
|
|
email=user.email,
|
|
first_name=user.first_name,
|
|
last_name=user.last_name,
|
|
password=hash_password(user.password),
|
|
)
|
|
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return userschema.User.from_orm(db_user)
|
|
|
|
|
|
def read_user(db: Session, id: int) -> userschema.User:
|
|
"""Queries the db for a user with the specified id and returns them."""
|
|
|
|
db_user = db.query(usermodel.User).filter(usermodel.User.id == id).first()
|
|
if not db_user:
|
|
raise NotFoundException(f"User with id '{id}' not found.")
|
|
|
|
return userschema.User.from_orm(db_user)
|
|
|
|
|
|
def read_user_by_email(db: Session, email: str) -> userschema.User:
|
|
"""Queries the db for a user with the specified email and returns them."""
|
|
|
|
db_user = db.query(usermodel.User).filter(usermodel.User.email == email).first()
|
|
if not db_user:
|
|
raise NotFoundException(f"User with email '{email}' not found.")
|
|
|
|
return userschema.User.from_orm(db_user)
|
|
|
|
|
|
def read_users(db: Session, skip: int = 0, limit: int = 100) -> list[userschema.User]:
|
|
"""Returns an range of users from the database."""
|
|
|
|
for parameter in [skip, limit]:
|
|
if not isinstance(parameter, int):
|
|
raise InvalidFilterParameterException(f"Parameter '{parameter}' must be an integer.")
|
|
if parameter < 0:
|
|
raise InvalidFilterParameterException(f"Parameter '{parameter}' cannot be smaller than zero.")
|
|
|
|
db_users = db.query(usermodel.User).offset(skip).limit(limit).all()
|
|
|
|
return [userschema.User.from_orm(db_user) for db_user in db_users]
|
|
|
|
|
|
def read_users_count(db: Session) -> int:
|
|
"""Returns the total number of users currently the database."""
|
|
|
|
return db.query(usermodel.User).count()
|
|
|
|
|
|
def update_user(db: Session, user: userschema.UserUpdate, id: int) -> userschema.User:
|
|
"""Updates the user with the provided id with all non-None fields from the input user."""
|
|
|
|
db_user = db.query(usermodel.User).filter(usermodel.User.id == id).first()
|
|
if not db_user:
|
|
raise NotFoundException(f"User with id '{id}' not found.")
|
|
|
|
for key in ['email', 'first_name', 'last_name']:
|
|
value = getattr(user, key)
|
|
if value is not None:
|
|
setattr(db_user, key, value)
|
|
if user.password is not None:
|
|
setattr(db_user, "password", hash_password(user.password))
|
|
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return userschema.User.from_orm(db_user)
|
|
|
|
|
|
def delete_user(db: Session, id: int) -> userschema.User:
|
|
"""Deletes the user with the provided id from the db."""
|
|
|
|
db_user = db.query(usermodel.User).filter(usermodel.User.id == id).first()
|
|
if not db_user:
|
|
raise NotFoundException(f"User with id '{id}' not found.")
|
|
user_copy = userschema.User.from_orm(db_user)
|
|
|
|
db.delete(db_user)
|
|
db.commit()
|
|
|
|
user_copy.updated = datetime.now(user_copy.updated.tzinfo)
|
|
return user_copy
|