"""Database helpers for creating, querying, updating and deleting users."""
from math import ceil

from sqlalchemy.orm import Session

from schemas.user import OutputUser, UserCreate, DriverCreate
from db.models.user import User
from core.hashing import Hasher
from db.models.drivetask import DriveTask


def _profile_values(data):
    """Return the User column values shared by every create/replace path.

    The plain-text ``data.Password`` is hashed with
    ``Hasher.get_password_hash`` and stored under ``HashedPassword``.
    ``Role`` is deliberately left out because callers choose it themselves.
    """
    return {
        "Email": data.Email,
        "Name": data.Name,
        "MiddleName": data.MiddleName,
        "LastName": data.LastName,
        "GovernmentId": data.GovernmentId,
        "Address": data.Address,
        "ContactNumber": data.ContactNumber,
        "HashedPassword": Hasher.get_password_hash(data.Password),
    }


def _persist(instance, db: Session):
    """Add ``instance`` to the session, commit, refresh it and return it."""
    db.add(instance)
    db.commit()
    db.refresh(instance)
    return instance


def _fetch_page(query, page: int, per_page: int):
    """Return one page of ``query`` as a list.

    Rows are ordered by ``User.Id`` so that consecutive pages are stable;
    OFFSET/LIMIT without an ORDER BY has no guaranteed row order.
    ``page`` is clamped to at least 1 and ``per_page`` to at least 0 so a
    negative OFFSET/LIMIT is never sent to the database.
    """
    page = max(page, 1)
    per_page = max(per_page, 0)
    return (
        query.order_by(User.Id)
        .offset((page - 1) * per_page)
        .limit(per_page)
        .all()
    )


def create_new_user(user: UserCreate, db: Session):
    """Create a user from ``user`` and return the stored ``User`` row.

    Returns the string ``"userExists"`` instead when a user with the same
    email is already present.
    """
    if get_user_by_email(user.Email, db):
        return "userExists"
    user_object = User(Role=user.Role, **_profile_values(user))
    return _persist(user_object, db)


def create_new_driver(driver: DriverCreate, db: Session):
    """Create a user with role ``"Driver"`` and return the stored row.

    The role is always ``"Driver"`` regardless of the payload, and the
    driving license number is stored alongside the common profile fields.
    Returns the string ``"userExists"`` when the email is already taken.
    """
    if get_user_by_email(driver.Email, db):
        return "userExists"
    driver_object = User(
        Role="Driver",
        DrivingLicenseNumber=driver.DrivingLicenseNumber,
        **_profile_values(driver),
    )
    return _persist(driver_object, db)


def get_user_by_id(user_id: int, role: str, db: Session):
    """Return the user with ``user_id`` if it has the requested ``role``.

    Passing ``role="Any"`` skips the role check. Returns ``False`` when no
    such user exists or the role does not match.
    """
    user = db.query(User).filter(User.Id == user_id).first()
    if not user:
        return False
    if role != "Any" and user.Role != role:
        return False
    return user


def get_user_by_email(email: str, db: Session):
    """Return the first user whose email equals ``email``, or ``None``."""
    return db.query(User).filter(User.Email == email).first()


def get_user_by_phone(phone: str, db: Session):
    """Return the first user whose contact number equals ``phone``, or ``None``."""
    return db.query(User).filter(User.ContactNumber == phone).first()


def verify_driver_exists(driver_id: int, db: Session) -> bool:
    """Return ``True`` if a user with ``driver_id`` exists and is a driver."""
    # Same lookup and role check as get_user_by_id, reduced to a boolean.
    return bool(get_user_by_id(driver_id, "Driver", db))


def get_car_driver(vehicle_id: int, db: Session):
    """Return the first user assigned to ``vehicle_id``, or ``False`` if none."""
    driver = db.query(User).filter(User.AssignedVehicle == vehicle_id).first()
    if not driver:
        return False
    return driver


def list_users(db: Session, role: str = "Any"):
    """Return all users, optionally restricted to a single ``role``.

    ``role="Any"`` (the default) returns every user.
    """
    query = db.query(User)
    # Decide in Python whether to filter instead of OR-ing a column
    # expression with a plain bool inside the SQL statement.
    if role != "Any":
        query = query.filter(User.Role == role)
    return query.all()


def get_users_by_name(
    db: Session, name: str = "", role: str = None, page: int = 1, per_page: int = 20
):
    """Return one page of users whose name starts with ``name``.

    * ``role == "Admin"``: returns ``None`` without querying.
    * ``role is None``: users of every role except ``"Admin"``.
    * otherwise: only users with exactly that role.
    """
    if role == "Admin":
        return None
    query = db.query(User).filter(User.Name.like(f"{name}%"))
    if role is None:
        query = query.filter(User.Role != "Admin")
    else:
        query = query.filter(User.Role == role)
    return _fetch_page(query, page, per_page)


def replace_user_data(user_id: int, user_data: UserCreate, db: Session):
    """Overwrite every profile field of user ``user_id`` with ``user_data``.

    The password is re-hashed. Returns the refreshed ``User`` row, or the
    string ``"userNotFound"`` when no user has that id.
    """
    user = db.query(User).filter(User.Id == user_id).first()
    if not user:
        return "userNotFound"
    # NOTE(review): unlike the create functions, this does not check whether
    # the new email is already used by another user -- confirm that is intended.
    for column, value in _profile_values(user_data).items():
        setattr(user, column, value)
    user.Role = user_data.Role
    db.commit()
    db.refresh(user)
    return user


def delete_user_data(id: int, db: Session) -> str:
    """Delete user ``id`` together with the drive tasks assigned to them.

    Returns ``"userDeleted"`` on success or ``"userNotFound"`` when no user
    has that id. Tasks and user are removed in a single commit.
    """
    user = db.query(User).filter(User.Id == id).first()
    if not user:
        return "userNotFound"
    # Delete all tasks assigned to this user before deleting the user itself.
    for task in db.query(DriveTask).filter(DriveTask.DriverId == id).all():
        db.delete(task)
    db.delete(user)
    db.commit()
    return "userDeleted"


def user_search_query(
    db: Session, name: str = "", role: str = None, page: int = 1, per_page: int = 20
) -> dict:
    """Search users by name prefix and role, returning one page of results.

    An empty or ``None`` ``name`` matches every user. The role filter is
    applied only when ``role`` is neither ``None`` nor ``"Admin"``.

    Returns a dict with ``"users"`` (a list of ``OutputUser``, each with
    ``AssignedVehicle`` set to ``None``) and ``"total_pages"``.
    """
    query = db.query(User)
    if name:
        query = query.filter(User.Name.like(f"{name}%"))
    if role is not None and role != "Admin":
        query = query.filter(User.Role == role)

    total_users = query.count()
    # A non-positive page size yields no pages rather than a ZeroDivisionError.
    total_pages = ceil(total_users / per_page) if per_page > 0 else 0

    output_users = [
        OutputUser(
            id=user.Id,
            Name=user.Name,
            MiddleName=user.MiddleName,
            LastName=user.LastName,
            ContactNumber=user.ContactNumber,
            Address=user.Address,
            Email=user.Email,
            Role=user.Role,
            AssignedVehicle=None,
        )
        for user in _fetch_page(query, page, per_page)
    ]
    return {"users": output_users, "total_pages": total_pages}