"""Database helpers for creating, querying, updating and deleting users."""
from typing import List, Optional, Union

from sqlalchemy.orm import Session

from schemas.user import UserCreate, DriverCreate
from db.models.user import User
from core.hashing import Hasher
from db.models.drivetask import DriveTask


def _persist(instance: User, db: Session) -> User:
    """Add *instance* to the session, commit, refresh it and return it."""
    db.add(instance)
    db.commit()
    db.refresh(instance)
    return instance


def create_new_user(user: UserCreate, db: Session) -> User:
    """Create a ``User`` row from *user* and return the refreshed object.

    The plain ``user.Password`` is never stored; the value returned by
    ``Hasher.get_password_hash`` is saved as ``HashedPassword`` instead.
    """
    user_object = User(
        Email=user.Email,
        Name=user.Name,
        MiddleName=user.MiddleName,
        LastName=user.LastName,
        GovernmentId=user.GovernmentId,
        Address=user.Address,
        ContactNumber=user.ContactNumber,
        Role=user.Role,
        HashedPassword=Hasher.get_password_hash(user.Password),
    )
    return _persist(user_object, db)


def create_new_driver(driver: DriverCreate, db: Session) -> User:
    """Create a ``User`` row with ``Role`` fixed to ``"Driver"``.

    Besides the common user fields, ``DrivingLicenseNumber`` is copied from
    *driver*. Returns the refreshed object.
    """
    driver_object = User(
        Email=driver.Email,
        Name=driver.Name,
        MiddleName=driver.MiddleName,
        LastName=driver.LastName,
        GovernmentId=driver.GovernmentId,
        Address=driver.Address,
        ContactNumber=driver.ContactNumber,
        Role="Driver",
        HashedPassword=Hasher.get_password_hash(driver.Password),
        DrivingLicenseNumber=driver.DrivingLicenseNumber,
    )
    return _persist(driver_object, db)


def get_user_by_id(user_id: int, role: str, db: Session) -> Union[User, bool]:
    """Return the user with ``Id == user_id`` if it matches *role*.

    Passing ``role="Any"`` disables the role check. Returns ``False`` (not
    ``None``) when no user is found or the role does not match.
    """
    user = db.query(User).filter(User.Id == user_id).first()
    if not user:
        return False
    if role != "Any" and user.Role != role:
        return False
    return user


def get_user_by_email(email: str, db: Session) -> Optional[User]:
    """Return the first user whose ``Email`` equals *email*, or ``None``."""
    return db.query(User).filter(User.Email == email).first()


def get_user_by_phone(phone: str, db: Session) -> Optional[User]:
    """Return the first user whose ``ContactNumber`` equals *phone*, or ``None``."""
    return db.query(User).filter(User.ContactNumber == phone).first()


def verify_driver_exists(driver_id: int, db: Session) -> bool:
    """Return ``True`` only if a user with this id exists and has role ``"Driver"``."""
    # Same lookup and role check as get_user_by_id; only the truth value matters.
    return bool(get_user_by_id(driver_id, "Driver", db))


def get_car_driver(vehicle_id: int, db: Session) -> Union[User, bool]:
    """Return the first user whose ``AssignedVehicle`` equals *vehicle_id*.

    Returns ``False`` when no such user exists.
    """
    driver = db.query(User).filter(User.AssignedVehicle == vehicle_id).first()
    if not driver:
        return False
    return driver


def list_users(db: Session, role: str = "Any") -> List[User]:
    """Return all users, restricted to *role* unless it is ``"Any"``."""
    query = db.query(User)
    # Decide in Python whether to filter instead of OR-ing a Python boolean
    # into the SQL expression.
    if role != "Any":
        query = query.filter(User.Role == role)
    return query.all()


def replace_user_data(
    user_id: int, user_data: UserCreate, db: Session
) -> Union[User, str]:
    """Overwrite the stored fields of user *user_id* with *user_data*.

    The password is re-hashed with ``Hasher.get_password_hash``. Returns the
    refreshed user, or the string ``"userNotFound"`` if the id does not exist.
    """
    user = db.query(User).filter(User.Id == user_id).first()
    if not user:
        return "userNotFound"

    user.Email = user_data.Email
    user.Name = user_data.Name
    user.MiddleName = user_data.MiddleName
    user.LastName = user_data.LastName
    user.GovernmentId = user_data.GovernmentId
    user.Address = user_data.Address
    user.ContactNumber = user_data.ContactNumber
    user.Role = user_data.Role
    user.HashedPassword = Hasher.get_password_hash(user_data.Password)

    db.commit()
    db.refresh(user)
    return user


def delete_user_data(id: int, db: Session) -> str:
    """Delete user *id* together with every ``DriveTask`` assigned to them.

    Returns ``"userDeleted"`` on success, or ``"userNotFound"`` if the id does
    not exist. The parameter name ``id`` shadows the builtin; it is kept so
    existing keyword callers keep working.
    """
    user = db.query(User).filter(User.Id == id).first()
    if not user:
        return "userNotFound"

    # Remove all tasks assigned to this user before removing the user itself;
    # both deletions are committed together.
    drivetasks = db.query(DriveTask).filter(DriveTask.DriverId == id).all()
    for task in drivetasks:
        db.delete(task)
    db.delete(user)
    db.commit()
    return "userDeleted"