You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

112 lines
3.4 KiB

  1. from sqlalchemy.orm import Session
  2. from schemas.drivetask import CreateTask
  3. from db.models.drivetask import DriveTask
  4. from db.repository.user import get_user_by_id
  5. def create_new_task(task: CreateTask, db: Session):
  6. driver = get_user_by_id(task.DriverId, "Driver", db)
  7. if driver is None:
  8. return "notdriver"
  9. elif driver.Role != "Driver":
  10. return "notdriver"
  11. task_object = DriveTask(
  12. DriverId=task.DriverId,
  13. Description=task.Description,
  14. Status="Pending",
  15. StartLocation=task.StartLocation,
  16. EndLocation=task.EndLocation,
  17. )
  18. db.add(task_object)
  19. db.commit()
  20. db.refresh(task_object)
  21. return task_object
  22. def change_task_status(task_id: int, status: str, db: Session):
  23. task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
  24. if not task:
  25. return "notaskfound"
  26. if status == "In Progress":
  27. # see if there are any other tasks in progress by this driver, if yes, cancel
  28. tasks = db.query(DriveTask).filter(DriveTask.DriverId == task.DriverId).all()
  29. for task in tasks:
  30. if task.Status == "In Progress":
  31. return "driverhasothertask"
  32. task.Status = status
  33. db.commit()
  34. db.refresh(task)
  35. return task
  36. def get_task_driver(task_id: int, db: Session):
  37. task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
  38. if not task:
  39. return "notaskfound"
  40. driver = get_user_by_id(task.DriverId, "Any", db)
  41. if not driver:
  42. return "notdriver"
  43. return driver
  44. def get_tasks_by_driver(driver_id: int, db: Session):
  45. driver = get_user_by_id(driver_id, "Any", db)
  46. if not driver:
  47. return "notdriver"
  48. if driver.Role != "Driver":
  49. return "notdriver"
  50. tasks = db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
  51. return tasks
  52. def get_task_by_id(task_id: int, db: Session):
  53. task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
  54. task.Driver = task.CreatedBy.__dict__
  55. task.Driver["AssignedVehicle"] = task.CreatedBy.vehicle
  56. if not task:
  57. return "notaskfound"
  58. return task
  59. def get_all_tasks(status: str, db: Session):
  60. if status != "Any":
  61. tasks = db.query(DriveTask).filter(DriveTask.Status == status).all()
  62. else:
  63. tasks = db.query(DriveTask).all()
  64. for task in tasks:
  65. task.Driver = task.CreatedBy.__dict__
  66. task.Driver["AssignedVehicle"] = task.CreatedBy.vehicle
  67. return tasks
  68. def edit_task(task_id: int, task: CreateTask, db: Session):
  69. tasks = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
  70. if not tasks:
  71. return "notaskfound"
  72. tasks.Description = task.Description
  73. tasks.StartLocation = task.StartLocation
  74. tasks.EndLocation = task.EndLocation
  75. db.commit()
  76. db.refresh(tasks)
  77. return tasks
  78. def get_active_route_by_driver(driver_id: int, db: Session):
  79. driver = get_user_by_id(driver_id, "Driver", db)
  80. if not driver:
  81. return "notdriver"
  82. tasks = db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
  83. for task in tasks:
  84. if task.Status == "In Progress":
  85. return task
  86. return "noroute"
  87. def get_my_routes(driver_id: int, db: Session):
  88. driver = get_user_by_id(driver_id, "Driver", db)
  89. if not driver:
  90. return "notdriver"
  91. tasks = db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
  92. return tasks