"""This module handles CRUD operations for users in the database, based on pydanctic schemas.""" from datetime import datetime from sqlalchemy.orm import Session from todo.models import users as usermodel from todo.schemas import users as userschema from todo.models.common import SortOrder from todo.utils.exceptions import NotFoundException, InvalidFilterParameterException def hash_password(password: str) -> str: """This is a placeholder for a secure password hashing algorithm. It will convert a plaintext password into a secure, salted hash, for storage in the database. """ # TODO actually hash the password! return password def create_user(db: Session, user: userschema.UserCreate) -> userschema.User: """Creates the specified user in the database.""" db_user = usermodel.User( email=user.email, first_name=user.first_name, last_name=user.last_name, password=hash_password(user.password), ) db.add(db_user) db.commit() db.refresh(db_user) return userschema.User.from_orm(db_user) def read_user(db: Session, id: int) -> userschema.User: """Queries the db for a user with the specified id and returns them.""" db_user = db.query(usermodel.User).filter(usermodel.User.id == id).first() if not db_user: raise NotFoundException(f"User with id '{id}' not found.") return userschema.User.from_orm(db_user) def read_user_by_email(db: Session, email: str) -> userschema.User: """Queries the db for a user with the specified email and returns them.""" db_user = db.query(usermodel.User).filter(usermodel.User.email == email).first() if not db_user: raise NotFoundException(f"User with email '{email}' not found.") return userschema.User.from_orm(db_user) def read_users( db: Session, skip: int = 0, limit: int = 100, sortby: usermodel.SortableUserField = usermodel.SortableUserField('id'), sortorder: SortOrder = SortOrder['asc'], ) -> list[userschema.User]: """Returns an range of users from the database.""" for parameter in [skip, limit]: if not isinstance(parameter, int): raise InvalidFilterParameterException(f"Parameter '{parameter}' must be an integer.") if parameter < 0: raise InvalidFilterParameterException(f"Parameter '{parameter}' cannot be smaller than zero.") db_users = db.query(usermodel.User).order_by(sortorder.call(sortby.field)).offset(skip).limit(limit).all() return [userschema.User.from_orm(db_user) for db_user in db_users] def read_users_count(db: Session) -> int: """Returns the total number of users currently the database.""" return db.query(usermodel.User).count() def update_user(db: Session, user: userschema.UserUpdate, id: int) -> userschema.User: """Updates the user with the provided id with all non-None fields from the input user.""" db_user = db.query(usermodel.User).filter(usermodel.User.id == id).first() if not db_user: raise NotFoundException(f"User with id '{id}' not found.") for key in ['email', 'first_name', 'last_name']: value = getattr(user, key) if value is not None: setattr(db_user, key, value) if user.password is not None: setattr(db_user, "password", hash_password(user.password)) db.commit() db.refresh(db_user) return userschema.User.from_orm(db_user) def delete_user(db: Session, id: int) -> userschema.User: """Deletes the user with the provided id from the db.""" db_user = db.query(usermodel.User).filter(usermodel.User.id == id).first() if not db_user: raise NotFoundException(f"User with id '{id}' not found.") user_copy = userschema.User.from_orm(db_user) db.delete(db_user) db.commit() user_copy.updated = datetime.now(user_copy.updated.tzinfo) return user_copy