You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

164 lines
4.8 KiB

  1. # Creating a new user in the database
  2. from math import ceil
  3. from sqlalchemy.orm import Session
  4. from schemas.user import OutputUser, UserCreate, DriverCreate
  5. from db.models.user import User
  6. from core.hashing import Hasher
  7. from db.models.drivetask import DriveTask
  8. def create_new_user(user: UserCreate, db: Session):
  9. if get_user_by_email(user.Email, db):
  10. return "userExists"
  11. user_object = User(
  12. Email=user.Email,
  13. Name=user.Name,
  14. MiddleName=user.MiddleName,
  15. LastName=user.LastName,
  16. GovernmentId=user.GovernmentId,
  17. Address=user.Address,
  18. ContactNumber=user.ContactNumber,
  19. Role=user.Role,
  20. HashedPassword=Hasher.get_password_hash(user.Password),
  21. )
  22. db.add(user_object)
  23. db.commit()
  24. db.refresh(user_object)
  25. return user_object
  26. def create_new_driver(driver: DriverCreate, db: Session):
  27. if get_user_by_email(driver.Email, db):
  28. return "userExists"
  29. driver_object = User(
  30. Email=driver.Email,
  31. Name=driver.Name,
  32. MiddleName=driver.MiddleName,
  33. LastName=driver.LastName,
  34. GovernmentId=driver.GovernmentId,
  35. Address=driver.Address,
  36. ContactNumber=driver.ContactNumber,
  37. Role="Driver",
  38. HashedPassword=Hasher.get_password_hash(driver.Password),
  39. DrivingLicenseNumber=driver.DrivingLicenseNumber,
  40. )
  41. db.add(driver_object)
  42. db.commit()
  43. db.refresh(driver_object)
  44. return driver_object
  45. def get_user_by_id(user_id: int, role: str, db: Session):
  46. user = db.query(User).filter(User.Id == user_id).first()
  47. if not user:
  48. return False
  49. if user.Role != role and role != "Any":
  50. return False
  51. return user
  52. def get_user_by_email(email: str, db: Session):
  53. user = db.query(User).filter(User.Email == email).first()
  54. return user
  55. def get_user_by_phone(phone: str, db: Session):
  56. user = db.query(User).filter(User.ContactNumber == phone).first()
  57. return user
  58. def verify_driver_exists(driver_id: int, db: Session):
  59. driver = db.query(User).filter(User.Id == driver_id).first()
  60. if not driver:
  61. return False
  62. if driver.Role != "Driver":
  63. return False
  64. return True
  65. def get_car_driver(vehicle_id: int, db: Session):
  66. driver = db.query(User).filter(User.AssignedVehicle == vehicle_id).first()
  67. if not driver:
  68. return False
  69. return driver
  70. def list_users(db: Session, role: str = "Any"):
  71. users = db.query(User).filter((User.Role == role) | (role == "Any")).all()
  72. return users
  73. def get_users_by_name(
  74. db: Session, name: str = "", role: str = None, page: int = 1, per_page: int = 20
  75. ):
  76. if role == "Admin":
  77. return None
  78. if role is None:
  79. users = db.query(User).filter(User.Name.like(f"{name}%"), User.Role != "Admin")
  80. else:
  81. users = db.query(User).filter(User.Name.like(f"{name}%"), User.Role == role)
  82. users = users.offset((page - 1) * per_page).limit(per_page).all()
  83. return users
  84. def replace_user_data(user_id: int, user_data: UserCreate, db: Session):
  85. user = db.query(User).filter(User.Id == user_id).first()
  86. if not user:
  87. return "userNotFound"
  88. user.Email = user_data.Email
  89. user.Name = user_data.Name
  90. user.MiddleName = user_data.MiddleName
  91. user.LastName = user_data.LastName
  92. user.GovernmentId = user_data.GovernmentId
  93. user.Address = user_data.Address
  94. user.ContactNumber = user_data.ContactNumber
  95. user.Role = user_data.Role
  96. user.HashedPassword = Hasher.get_password_hash(user_data.Password)
  97. db.commit()
  98. db.refresh(user)
  99. return user
  100. def delete_user_data(id: int, db: Session):
  101. user = db.query(User).filter(User.Id == id).first()
  102. if not user:
  103. return "userNotFound"
  104. drivetasks = db.query(DriveTask).filter(DriveTask.DriverId == id).all()
  105. for task in drivetasks: # delete all tasks assigned to this user
  106. db.delete(task)
  107. db.delete(user)
  108. db.commit()
  109. return "userDeleted"
  110. def user_search_query(
  111. db: Session, name: str = "", role: str = None, page: int = 1, per_page: int = 20
  112. ):
  113. if name == "" or name is None:
  114. query = db.query(User)
  115. else:
  116. query = db.query(User).filter(User.Name.like(f"{name}%"))
  117. if role is not None and role != "Admin":
  118. query = query.filter(User.Role == role)
  119. total_users = query.count()
  120. total_pages = ceil(total_users / per_page)
  121. users = query.offset((page - 1) * per_page).limit(per_page).all()
  122. output_users = [
  123. OutputUser(
  124. id=user.Id,
  125. Name=user.Name,
  126. MiddleName=user.MiddleName,
  127. LastName=user.LastName,
  128. ContactNumber=user.ContactNumber,
  129. Address=user.Address,
  130. Email=user.Email,
  131. Role=user.Role,
  132. AssignedVehicle=None,
  133. )
  134. for user in users
  135. ]
  136. return {"users": output_users, "total_pages": total_pages}