You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

113 lines
3.4 KiB

  1. from datetime import datetime
  2. from sqlalchemy.orm import Session
  3. from schemas.drivetask import CreateTask
  4. from db.models.drivetask import DriveTask
  5. from db.repository.user import get_user_by_id
  def create_new_task(task: CreateTask, db: Session):
      """Create and persist a new drive task for the driver named in ``task``.

      The driver is looked up with ``get_user_by_id(task.DriverId, "Driver", db)``.

      Returns:
          The string ``"notdriver"`` when the lookup yields ``None`` or a user
          whose ``Role`` is not ``"Driver"``; otherwise the newly created
          ``DriveTask`` (status ``"Pending"``) after it has been added,
          committed and refreshed.
      """
      assignee = get_user_by_id(task.DriverId, "Driver", db)
      if assignee is None or assignee.Role != "Driver":
          return "notdriver"

      # Every new task starts out as "Pending"; the remaining fields are
      # copied from the incoming payload.
      task_fields = {
          "DriverId": task.DriverId,
          "Description": task.Description,
          "Status": "Pending",
          "StartLocation": task.StartLocation,
          "EndLocation": task.EndLocation,
      }
      new_task = DriveTask(**task_fields)

      db.add(new_task)
      db.commit()
      db.refresh(new_task)
      return new_task
  def change_task_status(task_id: int, status: str, distance: int, db: Session):
      """Set the status of the task with id ``task_id`` and stamp related fields.

      * ``"In Progress"`` records ``StartDateTime`` as the current time.
      * ``"Completed"`` stores ``distance`` in ``DistanceCovered`` and records
        ``EndDateTime`` as the current time.
      * Any other status only updates ``Status``.

      Returns:
          The string ``"notaskfound"`` when no task has that id; otherwise the
          updated task after commit and refresh.
      """
      record = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
      if not record:
          return "notaskfound"

      if status == "In Progress":
          # TODO: check whether this driver already has another task in
          # progress and cancel if so -- not implemented yet.
          record.StartDateTime = datetime.now()
      elif status == "Completed":
          record.DistanceCovered = distance
          record.EndDateTime = datetime.now()

      record.Status = status
      db.commit()
      db.refresh(record)
      return record
  def get_task_driver(task_id: int, db: Session):
      """Return the user referenced by the ``DriverId`` of task ``task_id``.

      Returns:
          ``"notaskfound"`` when no task has that id, ``"notdriver"`` when
          ``get_user_by_id(task.DriverId, "Any", db)`` yields a falsy result,
          otherwise the user object returned by that lookup.
      """
      record = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
      if not record:
          return "notaskfound"

      owner = get_user_by_id(record.DriverId, "Any", db)
      return owner if owner else "notdriver"
  def get_tasks_by_driver(driver_id: int, db: Session):
      """List every task whose ``DriverId`` equals ``driver_id``.

      Returns:
          ``"notdriver"`` when ``get_user_by_id(driver_id, "Any", db)`` finds
          nobody or the user's ``Role`` is not ``"Driver"``; otherwise the list
          of matching ``DriveTask`` rows.
      """
      user = get_user_by_id(driver_id, "Any", db)
      if not user or user.Role != "Driver":
          return "notdriver"

      return db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
  def get_task_by_id(task_id: int, db: Session):
      """Fetch one task by id and attach its driver details as ``task.Driver``.

      ``task.Driver`` is set to the ``__dict__`` of ``task.CreatedBy`` with an
      extra ``"AssignedVehicle"`` key holding ``task.CreatedBy.vehicle``.

      Returns:
          ``"notaskfound"`` when no task has that id; otherwise the task.
      """
      task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
      # Guard first: ``first()`` returns None for an unknown id, and reading
      # ``task.CreatedBy`` before this check raised AttributeError instead of
      # reaching the "notaskfound" return.
      if not task:
          return "notaskfound"

      # NOTE(review): ``__dict__`` is the related object's own attribute dict,
      # not a copy, so the key added below is also written onto that object.
      task.Driver = task.CreatedBy.__dict__
      task.Driver["AssignedVehicle"] = task.CreatedBy.vehicle
      return task
  def get_all_tasks(status: str, db: Session):
      """Return all tasks, optionally restricted to one status.

      Passing ``"Any"`` as ``status`` returns every task; any other value
      filters on ``DriveTask.Status == status``. Each returned task gets a
      ``Driver`` attribute: the ``__dict__`` of its ``CreatedBy`` object plus an
      ``"AssignedVehicle"`` key holding ``CreatedBy.vehicle``.
      """
      query = db.query(DriveTask)
      if status != "Any":
          query = query.filter(DriveTask.Status == status)
      found = query.all()

      for item in found:
          item.Driver = item.CreatedBy.__dict__
          item.Driver["AssignedVehicle"] = item.CreatedBy.vehicle
      return found
  def edit_task(task_id: int, task: CreateTask, db: Session):
      """Overwrite the editable fields of task ``task_id`` from ``task``.

      Only ``Description``, ``StartLocation`` and ``EndLocation`` are copied.

      Returns:
          ``"notaskfound"`` when no task has that id; otherwise the updated
          task after commit and refresh.
      """
      existing = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
      if not existing:
          return "notaskfound"

      for field in ("Description", "StartLocation", "EndLocation"):
          setattr(existing, field, getattr(task, field))

      db.commit()
      db.refresh(existing)
      return existing
  def get_active_route_by_driver(driver_id: int, db: Session):
      """Return the first task of ``driver_id`` whose status is "In Progress".

      Returns:
          ``"notdriver"`` when ``get_user_by_id(driver_id, "Driver", db)`` yields
          a falsy result, ``"noroute"`` when none of the driver's tasks is in
          progress, otherwise the first in-progress task in query order.
      """
      if not get_user_by_id(driver_id, "Driver", db):
          return "notdriver"

      driver_tasks = db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
      in_progress = (item for item in driver_tasks if item.Status == "In Progress")
      return next(in_progress, "noroute")
  def get_my_routes(driver_id: int, db: Session):
      """List every task whose ``DriverId`` equals ``driver_id``.

      Returns:
          ``"notdriver"`` when ``get_user_by_id(driver_id, "Driver", db)`` yields
          a falsy result; otherwise the list of matching ``DriveTask`` rows.
      """
      if not get_user_by_id(driver_id, "Driver", db):
          return "notdriver"

      return db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()