You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

115 lines
3.3 KiB

  1. # Creating a new user in the database
  2. from sqlalchemy.orm import Session
  3. from schemas.user import UserCreate, DriverCreate
  4. from db.models.user import User
  5. from core.hashing import Hasher
  6. from db.models.drivetask import DriveTask
  7. def create_new_user(user: UserCreate, db: Session):
  8. user_object = User(
  9. Email=user.Email,
  10. Name=user.Name,
  11. MiddleName=user.MiddleName,
  12. LastName=user.LastName,
  13. GovernmentId=user.GovernmentId,
  14. Address=user.Address,
  15. ContactNumber=user.ContactNumber,
  16. Role=user.Role,
  17. HashedPassword=Hasher.get_password_hash(user.Password),
  18. )
  19. db.add(user_object)
  20. db.commit()
  21. db.refresh(user_object)
  22. return user_object
  23. def create_new_driver(driver: DriverCreate, db: Session):
  24. driver_object = User(
  25. Email=driver.Email,
  26. Name=driver.Name,
  27. MiddleName=driver.MiddleName,
  28. LastName=driver.LastName,
  29. GovernmentId=driver.GovernmentId,
  30. Address=driver.Address,
  31. ContactNumber=driver.ContactNumber,
  32. Role="Driver",
  33. HashedPassword=Hasher.get_password_hash(driver.Password),
  34. DrivingLicenseNumber=driver.DrivingLicenseNumber,
  35. )
  36. db.add(driver_object)
  37. db.commit()
  38. db.refresh(driver_object)
  39. return driver_object
  40. def get_user_by_id(user_id: int, role: str, db: Session):
  41. user = db.query(User).filter(User.Id == user_id).first()
  42. if not user:
  43. return False
  44. if user.Role != role and role != "Any":
  45. return False
  46. return user
  47. def get_user_by_email(email: str, db: Session):
  48. user = db.query(User).filter(User.Email == email).first()
  49. return user
  50. def get_user_by_phone(phone: str, db: Session):
  51. user = db.query(User).filter(User.ContactNumber == phone).first()
  52. return user
  53. def verify_driver_exists(driver_id: int, db: Session):
  54. driver = db.query(User).filter(User.Id == driver_id).first()
  55. if not driver:
  56. return False
  57. if driver.Role != "Driver":
  58. return False
  59. return True
  60. def get_car_driver(vehicle_id: int, db: Session):
  61. driver = db.query(User).filter(User.AssignedVehicle == vehicle_id).first()
  62. if not driver:
  63. return False
  64. return driver
  65. def list_users(db: Session, role: str = "Any"):
  66. users = db.query(User).filter((User.Role == role) | (role == "Any")).all()
  67. return users
  68. def replace_user_data(user_id: int, user_data: UserCreate, db: Session):
  69. user = db.query(User).filter(User.Id == user_id).first()
  70. if not user:
  71. return "userNotFound"
  72. user.Email = user_data.Email
  73. user.Name = user_data.Name
  74. user.MiddleName = user_data.MiddleName
  75. user.LastName = user_data.LastName
  76. user.GovernmentId = user_data.GovernmentId
  77. user.Address = user_data.Address
  78. user.ContactNumber = user_data.ContactNumber
  79. user.Role = user_data.Role
  80. user.HashedPassword = Hasher.get_password_hash(user_data.Password)
  81. db.commit()
  82. db.refresh(user)
  83. return user
  84. def delete_user_data(id: int, db: Session):
  85. user = db.query(User).filter(User.Id == id).first()
  86. if not user:
  87. return "userNotFound"
  88. drivetasks = db.query(DriveTask).filter(DriveTask.DriverId == id).all()
  89. for task in drivetasks: # delete all tasks assigned to this user
  90. db.delete(task)
  91. db.delete(user)
  92. db.commit()
  93. return "userDeleted"