您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

drivetask.py 3.4 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. from datetime import datetime
  2. from sqlalchemy.orm import Session
  3. from schemas.drivetask import CreateTask
  4. from db.models.drivetask import DriveTask
  5. from db.repository.user import get_user_by_id
  6. def create_new_task(task: CreateTask, db: Session):
  7. driver = get_user_by_id(task.DriverId, "Driver", db)
  8. if driver is None:
  9. return "notdriver"
  10. elif driver.Role != "Driver":
  11. return "notdriver"
  12. task_object = DriveTask(
  13. DriverId=task.DriverId,
  14. Description=task.Description,
  15. Status="Pending",
  16. StartLocation=task.StartLocation,
  17. EndLocation=task.EndLocation,
  18. )
  19. db.add(task_object)
  20. db.commit()
  21. db.refresh(task_object)
  22. return task_object
  23. def change_task_status(task_id: int, status: str, distance: int, db: Session):
  24. task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
  25. if not task:
  26. return "notaskfound"
  27. if status == "In Progress":
  28. # see if there are any other tasks in progress by this driver, if yes, cancel
  29. task.StartDateTime = datetime.now()
  30. if status == "Completed":
  31. task.DistanceCovered = distance
  32. task.EndDateTime = datetime.now()
  33. task.Status = status
  34. db.commit()
  35. db.refresh(task)
  36. return task
  37. def get_task_driver(task_id: int, db: Session):
  38. task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
  39. if not task:
  40. return "notaskfound"
  41. driver = get_user_by_id(task.DriverId, "Any", db)
  42. if not driver:
  43. return "notdriver"
  44. return driver
  45. def get_tasks_by_driver(driver_id: int, db: Session):
  46. driver = get_user_by_id(driver_id, "Any", db)
  47. if not driver:
  48. return "notdriver"
  49. if driver.Role != "Driver":
  50. return "notdriver"
  51. tasks = db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
  52. return tasks
  53. def get_task_by_id(task_id: int, db: Session):
  54. task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
  55. if not task:
  56. return "notaskfound"
  57. task.Driver = task.CreatedBy.__dict__
  58. task.Driver["AssignedVehicle"] = task.CreatedBy.vehicle
  59. return task
  60. def get_all_tasks(status: str, db: Session):
  61. if status != "Any":
  62. tasks = db.query(DriveTask).filter(DriveTask.Status == status).all()
  63. else:
  64. tasks = db.query(DriveTask).all()
  65. for task in tasks:
  66. task.Driver = task.CreatedBy.__dict__
  67. task.Driver["AssignedVehicle"] = task.CreatedBy.vehicle
  68. return tasks
  69. def edit_task(task_id: int, task: CreateTask, db: Session):
  70. tasks = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
  71. if not tasks:
  72. return "notaskfound"
  73. tasks.Description = task.Description
  74. tasks.StartLocation = task.StartLocation
  75. tasks.EndLocation = task.EndLocation
  76. db.commit()
  77. db.refresh(tasks)
  78. return tasks
  79. def get_active_route_by_driver(driver_id: int, db: Session):
  80. driver = get_user_by_id(driver_id, "Driver", db)
  81. if not driver:
  82. return "notdriver"
  83. tasks = db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
  84. for task in tasks:
  85. if task.Status == "In Progress":
  86. return task
  87. return "noroute"
  88. def get_my_routes(driver_id: int, db: Session):
  89. driver = get_user_by_id(driver_id, "Driver", db)
  90. if not driver:
  91. return "notdriver"
  92. tasks = db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
  93. return tasks