|
- from sqlalchemy.orm import Session
-
- from schemas.drivetask import CreateTask
- from db.models.drivetask import DriveTask
- from db.repository.user import get_user_by_id
-
-
def create_new_task(task: CreateTask, db: Session):
    """Store a new drive task for the driver referenced by ``task``.

    The user identified by ``task.DriverId`` is fetched through
    ``get_user_by_id``; the task is only saved when that lookup yields a
    user whose ``Role`` equals ``"Driver"``.

    Args:
        task: Payload supplying ``DriverId``, ``Description``,
            ``StartLocation`` and ``EndLocation``.
        db: Session used for the user lookup and for saving the new row.

    Returns:
        The refreshed ``DriveTask`` instance, or the string
        ``"notdriver"`` when the lookup returns ``None`` or the user's
        role is not ``"Driver"``.
    """
    assignee = get_user_by_id(task.DriverId, db)
    # A missing user and a user with another role are reported identically.
    if assignee is None or assignee.Role != "Driver":
        return "notdriver"

    # Newly created tasks always start with the "Pending" status.
    new_task = DriveTask(
        DriverId=task.DriverId,
        Description=task.Description,
        Status="Pending",
        StartLocation=task.StartLocation,
        EndLocation=task.EndLocation,
    )
    db.add(new_task)
    db.commit()
    # Reload the instance from the session after the commit.
    db.refresh(new_task)
    return new_task
-
-
def change_task_status(task_id: int, status: str, db: Session):
    """Overwrite the ``Status`` of the task whose ``Id`` equals ``task_id``.

    Args:
        task_id: Value matched against ``DriveTask.Id``.
        status: New status value; it is stored as given, without validation.
        db: Session used to query and commit.

    Returns:
        The refreshed task, or the string ``"notaskfound"`` when the query
        yields no row.
    """
    matching = db.query(DriveTask).filter(DriveTask.Id == task_id)
    target = matching.first()
    if target:
        target.Status = status
        db.commit()
        db.refresh(target)
        return target
    return "notaskfound"
-
-
def get_task_driver(task_id: int, db: Session):
    """Return the user referenced by a task's ``DriverId``.

    Args:
        task_id: Value matched against ``DriveTask.Id``.
        db: Session used for the task query and the user lookup.

    Returns:
        Whatever ``get_user_by_id`` returns for the task's ``DriverId``
        when that value is truthy; the string ``"notaskfound"`` when no
        task matches; the string ``"notdriver"`` when the user lookup
        comes back falsy.
    """
    found = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
    if not found:
        return "notaskfound"

    assignee = get_user_by_id(found.DriverId, db)
    return assignee if assignee else "notdriver"
-
-
def get_tasks_by_driver(driver_id: int, db: Session):
    """List every task whose ``DriverId`` equals ``driver_id``.

    Args:
        driver_id: Id passed to ``get_user_by_id`` and matched against
            ``DriveTask.DriverId``.
        db: Session used for the user lookup and the task query.

    Returns:
        The list produced by the query, or the string ``"notdriver"``
        when the user lookup is falsy or the user's ``Role`` is not
        ``"Driver"``.
    """
    user = get_user_by_id(driver_id, db)
    # Short-circuit keeps ``Role`` from being read on a missing user.
    if not user or user.Role != "Driver":
        return "notdriver"

    return db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
-
-
def get_task_by_id(task_id: int, db: Session):
    """Fetch the first task whose ``Id`` equals ``task_id``.

    Args:
        task_id: Value matched against ``DriveTask.Id``.
        db: Session used to run the query.

    Returns:
        The matching task, or the string ``"notaskfound"`` when the query
        yields nothing.
    """
    match = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
    return match if match else "notaskfound"
-
-
def get_all_tasks(db: Session):
    """Return the result of an unfiltered ``DriveTask`` query.

    Args:
        db: Session used to run the query.

    Returns:
        The list produced by ``db.query(DriveTask).all()``.
    """
    return db.query(DriveTask).all()
-
def edit_task(task_id: int, task: CreateTask, db: Session):
    """Copy the editable fields of ``task`` onto an existing task row.

    Only ``Description``, ``StartLocation`` and ``EndLocation`` are
    overwritten; ``DriverId`` and ``Status`` are not assigned here.

    Args:
        task_id: Value matched against ``DriveTask.Id``.
        task: Payload supplying the new field values.
        db: Session used to query and commit.

    Returns:
        The refreshed task, or the string ``"notaskfound"`` when the query
        yields no row.
    """
    existing = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
    if not existing:
        return "notaskfound"

    # Same fields, same order as individual assignments would use.
    for field in ("Description", "StartLocation", "EndLocation"):
        setattr(existing, field, getattr(task, field))

    db.commit()
    db.refresh(existing)
    return existing
|