# Creating, querying, updating and deleting users in the database
from math import ceil

from sqlalchemy.orm import Session

from schemas.user import OutputUser, UserCreate, DriverCreate
from db.models.user import User
from core.hashing import Hasher
from db.models.drivetask import DriveTask


def _save_new_user(user_object: User, db: Session):
    """Add ``user_object`` to the session, commit, refresh it and return it."""
    db.add(user_object)
    db.commit()
    db.refresh(user_object)
    return user_object


def _paginate(query, page: int, per_page: int):
    """Return ``query`` restricted to one page, ordered by ``User.Id``.

    ``page`` and ``per_page`` are clamped to a minimum of 1 so that a zero or
    negative value cannot produce a negative OFFSET/LIMIT. The explicit
    ordering keeps pages stable between calls; OFFSET/LIMIT without ORDER BY
    has no guaranteed row order.
    """
    page = max(page, 1)
    per_page = max(per_page, 1)
    return query.order_by(User.Id).offset((page - 1) * per_page).limit(per_page)


def create_new_user(user: UserCreate, db: Session):
    """Create a user from ``user`` and persist it.

    Returns the string ``"userExists"`` when a user with the same email is
    already stored, otherwise the refreshed ``User`` row. The password is
    stored hashed via ``Hasher.get_password_hash``.
    """
    # Log only the email: the payload carries the plaintext password and
    # must not be written to the output.
    print("Creating new user: " + str(user.Email))
    if get_user_by_email(user.Email, db):
        return "userExists"
    user_object = User(
        Email=user.Email,
        Name=user.Name,
        MiddleName=user.MiddleName,
        LastName=user.LastName,
        GovernmentId=user.GovernmentId,
        Address=user.Address,
        ContactNumber=user.ContactNumber,
        Role=user.Role,
        HashedPassword=Hasher.get_password_hash(user.Password),
    )
    return _save_new_user(user_object, db)


def create_new_driver(driver: DriverCreate, db: Session):
    """Create a user with role ``"Driver"`` from ``driver`` and persist it.

    Returns the string ``"userExists"`` when a user with the same email is
    already stored, otherwise the refreshed ``User`` row. The role is always
    set to ``"Driver"`` and the driving license number is stored as well.
    """
    # Log only the email: the payload carries the plaintext password.
    print("Creating new driver: " + str(driver.Email))
    if get_user_by_email(driver.Email, db):
        return "userExists"
    driver_object = User(
        Email=driver.Email,
        Name=driver.Name,
        MiddleName=driver.MiddleName,
        LastName=driver.LastName,
        GovernmentId=driver.GovernmentId,
        Address=driver.Address,
        ContactNumber=driver.ContactNumber,
        Role="Driver",
        HashedPassword=Hasher.get_password_hash(driver.Password),
        DrivingLicenseNumber=driver.DrivingLicenseNumber,
    )
    return _save_new_user(driver_object, db)


def get_user_by_id(user_id: int, role: str, db: Session):
    """Return the user with ``user_id`` if its role matches ``role``.

    ``role == "Any"`` accepts every role. Returns ``False`` when no such user
    exists or when the role does not match.
    """
    print("Retrieving user by id: " + str(user_id))
    user = db.query(User).filter(User.Id == user_id).first()
    if not user:
        return False
    if role != "Any" and user.Role != role:
        return False
    return user


def get_user_by_email(email: str, db: Session):
    """Return the first user whose ``Email`` equals ``email``, or ``None``."""
    print("Retrieving user by email: " + str(email))
    return db.query(User).filter(User.Email == email).first()


def get_user_by_phone(phone: str, db: Session):
    """Return the first user whose ``ContactNumber`` equals ``phone``, or ``None``."""
    print("Retrieving user by phone: " + str(phone))
    return db.query(User).filter(User.ContactNumber == phone).first()


def verify_driver_exists(driver_id: int, db: Session):
    """Return ``True`` when a user with ``driver_id`` exists and has role ``"Driver"``."""
    print("Verifying driver exists: " + str(driver_id))
    driver = db.query(User).filter(User.Id == driver_id).first()
    if not driver:
        return False
    return driver.Role == "Driver"


def get_car_driver(vehicle_id: int, db: Session):
    """Return the first user whose ``AssignedVehicle`` equals ``vehicle_id``.

    Returns ``False`` when no user is assigned to that vehicle.
    """
    print("Retrieving driver for vehicle: " + str(vehicle_id))
    driver = db.query(User).filter(User.AssignedVehicle == vehicle_id).first()
    if not driver:
        return False
    return driver


def list_users(db: Session, role: str = "Any"):
    """Return all users, optionally restricted to ``role``.

    The default ``"Any"`` applies no role filter.
    """
    print("Listing users")
    query = db.query(User)
    # Decide in Python whether to filter instead of OR-ing a Python boolean
    # into the SQL expression.
    if role != "Any":
        query = query.filter(User.Role == role)
    return query.all()


def get_users_by_name(
    db: Session, name: str = "", role: str = None, page: int = 1, per_page: int = 20
):
    """Return one page of users whose ``Name`` starts with ``name``.

    * ``role == "Admin"`` returns ``None`` (admins are never listed here).
    * ``role is None`` lists every role except ``"Admin"``.
    * any other ``role`` lists only users with that role.

    Results are ordered by ``User.Id``; ``page`` and ``per_page`` are clamped
    to at least 1.
    """
    print("Listing users by name")
    if role == "Admin":
        return None
    # Treat a missing name like an empty prefix instead of matching "None%".
    name = name or ""
    # NOTE(review): "%" and "_" inside ``name`` act as LIKE wildcards; escape
    # them if the prefix comes from untrusted input.
    name_filter = User.Name.like(f"{name}%")
    if role is None:
        users = db.query(User).filter(name_filter, User.Role != "Admin")
    else:
        users = db.query(User).filter(name_filter, User.Role == role)
    return _paginate(users, page, per_page).all()


def replace_user_data(user_id: int, user_data: UserCreate, db: Session):
    """Overwrite every stored field of user ``user_id`` with ``user_data``.

    Returns the string ``"userNotFound"`` when no such user exists, otherwise
    the refreshed ``User`` row. The password is re-hashed before storing.
    """
    # Log only the id: the payload carries the plaintext password.
    print("Replacing user data for user id: " + str(user_id))
    user = db.query(User).filter(User.Id == user_id).first()
    if not user:
        return "userNotFound"
    user.Email = user_data.Email
    user.Name = user_data.Name
    user.MiddleName = user_data.MiddleName
    user.LastName = user_data.LastName
    user.GovernmentId = user_data.GovernmentId
    user.Address = user_data.Address
    user.ContactNumber = user_data.ContactNumber
    user.Role = user_data.Role
    user.HashedPassword = Hasher.get_password_hash(user_data.Password)
    db.commit()
    db.refresh(user)
    return user


def delete_user_data(id: int, db: Session):
    """Delete user ``id`` together with every ``DriveTask`` assigned to it.

    Returns ``"userNotFound"`` when no such user exists, otherwise
    ``"userDeleted"`` after the commit. The parameter name ``id`` shadows the
    builtin but is kept so keyword callers keep working.
    """
    print("Deleting user by id: " + str(id))
    user = db.query(User).filter(User.Id == id).first()
    if not user:
        return "userNotFound"
    drivetasks = db.query(DriveTask).filter(DriveTask.DriverId == id).all()
    for task in drivetasks:
        # delete all tasks assigned to this user
        db.delete(task)
    db.delete(user)
    db.commit()
    return "userDeleted"


def user_search_query(
    db: Session, name: str = "", role: str = None, page: int = 1, per_page: int = 20
):
    """Search users by name prefix and role, returning one page plus a page count.

    An empty or ``None`` ``name`` applies no name filter. A ``role`` other
    than ``None`` / ``"Admin"`` restricts the result to that role.

    Returns a dict with ``"users"`` (a list of ``OutputUser``, each built with
    ``AssignedVehicle=None``) and ``"total_pages"``. Results are ordered by
    ``User.Id``; ``page`` and ``per_page`` are clamped to at least 1, which
    also avoids a division by zero when computing ``total_pages``.
    """
    query = db.query(User)
    if name:
        # NOTE(review): "%" and "_" inside ``name`` act as LIKE wildcards.
        query = query.filter(User.Name.like(f"{name}%"))
    # NOTE(review): role == "Admin" applies no role filter at all here, so
    # every role (admins included) is returned - confirm this is intended.
    if role is not None and role != "Admin":
        query = query.filter(User.Role == role)

    per_page = max(per_page, 1)
    total_users = query.count()
    total_pages = ceil(total_users / per_page)
    users = _paginate(query, page, per_page).all()

    output_users = [
        OutputUser(
            id=user.Id,
            Name=user.Name,
            MiddleName=user.MiddleName,
            LastName=user.LastName,
            ContactNumber=user.ContactNumber,
            Address=user.Address,
            Email=user.Email,
            Role=user.Role,
            AssignedVehicle=None,
        )
        for user in users
    ]
    return {"users": output_users, "total_pages": total_pages}