Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

user.py 4.7 KiB

il y a 1 an
il y a 1 an
il y a 1 an
il y a 1 an
il y a 1 an
il y a 1 an
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # Creating a new user in the database
  2. from math import ceil
  3. from sqlalchemy.orm import Session
  4. from schemas.user import OutputUser, UserCreate, DriverCreate
  5. from db.models.user import User
  6. from core.hashing import Hasher
  7. from db.models.drivetask import DriveTask
  8. def create_new_user(user: UserCreate, db: Session):
  9. user_object = User(
  10. Email=user.Email,
  11. Name=user.Name,
  12. MiddleName=user.MiddleName,
  13. LastName=user.LastName,
  14. GovernmentId=user.GovernmentId,
  15. Address=user.Address,
  16. ContactNumber=user.ContactNumber,
  17. Role=user.Role,
  18. HashedPassword=Hasher.get_password_hash(user.Password),
  19. )
  20. db.add(user_object)
  21. db.commit()
  22. db.refresh(user_object)
  23. return user_object
  24. def create_new_driver(driver: DriverCreate, db: Session):
  25. driver_object = User(
  26. Email=driver.Email,
  27. Name=driver.Name,
  28. MiddleName=driver.MiddleName,
  29. LastName=driver.LastName,
  30. GovernmentId=driver.GovernmentId,
  31. Address=driver.Address,
  32. ContactNumber=driver.ContactNumber,
  33. Role="Driver",
  34. HashedPassword=Hasher.get_password_hash(driver.Password),
  35. DrivingLicenseNumber=driver.DrivingLicenseNumber,
  36. )
  37. db.add(driver_object)
  38. db.commit()
  39. db.refresh(driver_object)
  40. return driver_object
  41. def get_user_by_id(user_id: int, role: str, db: Session):
  42. user = db.query(User).filter(User.Id == user_id).first()
  43. if not user:
  44. return False
  45. if user.Role != role and role != "Any":
  46. return False
  47. return user
  48. def get_user_by_email(email: str, db: Session):
  49. user = db.query(User).filter(User.Email == email).first()
  50. return user
  51. def get_user_by_phone(phone: str, db: Session):
  52. user = db.query(User).filter(User.ContactNumber == phone).first()
  53. return user
  54. def verify_driver_exists(driver_id: int, db: Session):
  55. driver = db.query(User).filter(User.Id == driver_id).first()
  56. if not driver:
  57. return False
  58. if driver.Role != "Driver":
  59. return False
  60. return True
  61. def get_car_driver(vehicle_id: int, db: Session):
  62. driver = db.query(User).filter(User.AssignedVehicle == vehicle_id).first()
  63. if not driver:
  64. return False
  65. return driver
  66. def list_users(db: Session, role: str = "Any"):
  67. users = db.query(User).filter((User.Role == role) | (role == "Any")).all()
  68. return users
  69. def get_users_by_name(
  70. db: Session, name: str = "", role: str = None, page: int = 1, per_page: int = 20
  71. ):
  72. if role == "Admin":
  73. return None
  74. if role is None:
  75. users = db.query(User).filter(User.Name.like(f"{name}%"), User.Role != "Admin")
  76. else:
  77. users = db.query(User).filter(User.Name.like(f"{name}%"), User.Role == role)
  78. users = users.offset((page - 1) * per_page).limit(per_page).all()
  79. return users
  80. def replace_user_data(user_id: int, user_data: UserCreate, db: Session):
  81. user = db.query(User).filter(User.Id == user_id).first()
  82. if not user:
  83. return "userNotFound"
  84. user.Email = user_data.Email
  85. user.Name = user_data.Name
  86. user.MiddleName = user_data.MiddleName
  87. user.LastName = user_data.LastName
  88. user.GovernmentId = user_data.GovernmentId
  89. user.Address = user_data.Address
  90. user.ContactNumber = user_data.ContactNumber
  91. user.Role = user_data.Role
  92. user.HashedPassword = Hasher.get_password_hash(user_data.Password)
  93. db.commit()
  94. db.refresh(user)
  95. return user
  96. def delete_user_data(id: int, db: Session):
  97. user = db.query(User).filter(User.Id == id).first()
  98. if not user:
  99. return "userNotFound"
  100. drivetasks = db.query(DriveTask).filter(DriveTask.DriverId == id).all()
  101. for task in drivetasks: # delete all tasks assigned to this user
  102. db.delete(task)
  103. db.delete(user)
  104. db.commit()
  105. return "userDeleted"
  106. def user_search_query(
  107. db: Session, name: str = "", role: str = None, page: int = 1, per_page: int = 20
  108. ):
  109. if name == "" or name is None:
  110. query = db.query(User)
  111. else:
  112. query = db.query(User).filter(User.Name.like(f"{name}%"))
  113. if role is not None and role != "Admin":
  114. query = query.filter(User.Role == role)
  115. total_users = query.count()
  116. total_pages = ceil(total_users / per_page)
  117. users = query.offset((page - 1) * per_page).limit(per_page).all()
  118. output_users = [
  119. OutputUser(
  120. id=user.Id,
  121. Name=user.Name,
  122. MiddleName=user.MiddleName,
  123. LastName=user.LastName,
  124. ContactNumber=user.ContactNumber,
  125. Address=user.Address,
  126. Email=user.Email,
  127. Role=user.Role,
  128. AssignedVehicle=None,
  129. )
  130. for user in users
  131. ]
  132. return {"users": output_users, "total_pages": total_pages}