Ver a proveniência

Added more assignment/task functions

main
Madiwka3 há 1 ano
ascendente
cometimento
03082929c6
3 ficheiros alterados com 135 adições e 1 eliminações
  1. +92
    -1
      app/apis/v1/route_task.py
  2. +1
    -0
      app/core/config.py
  3. +42
    -0
      app/db/repository/drivetask.py

+ 92
- 1
app/apis/v1/route_task.py Ver ficheiro

@@ -6,16 +6,22 @@ from db.session import get_db
from core.config import settings
from db.repository.drivetask import (
create_new_task,
get_task_driver,
change_task_status,
get_all_tasks,
get_task_by_id,
get_tasks_by_driver,
)
from schemas.drivetask import CreateTask
from db.models.user import User
from apis.v1.route_auth import get_current_user
from db.models.drivetask import DriveTask

router = APIRouter()


@router.post("/", status_code=status.HTTP_201_CREATED)
def create_tasK(
def create_task(
task: CreateTask,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
@@ -30,3 +36,88 @@ def create_tasK(
status_code=404, detail=f"Driver with id {task.DriverId} not found"
)
return task


@router.patch("/", status_code=status.HTTP_200_OK)
def changeStatus(
task_id: int,
status: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if current_user.Role == "Admin" or current_user.Role == "Driver":
if status not in settings.ALLOWED_TASK_STATUS:
raise HTTPException(
status_code=400,
detail=f"Status {status} is not allowed. Allowed status are {settings.ALLOWED_TASK_STATUS}",
)
if current_user.Role == "Driver":
verification = get_task_driver(task_id, db)
if verification.Id != current_user.Id:
raise HTTPException(
status_code=403,
detail="You are not authorized to perform this action",
)
task = change_task_status(task_id, status, db)
if task == "notaskfound":
raise HTTPException(
status_code=404, detail=f"Task with id {task_id} not found"
)
return task
else:
raise HTTPException(
status_code=403, detail="You are not authorized to perform this action"
)


@router.get("/", status_code=status.HTTP_200_OK)
def getAllTasks(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if current_user.Role == "Admin":
tasks = get_all_tasks(db)
return tasks
else:
raise HTTPException(
status_code=403, detail="You are not authorized to perform this action"
)


@router.get("/{task_id}", status_code=status.HTTP_200_OK)
def getTaskById(
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if current_user.Role != "Admin":
raise HTTPException(
status_code=403, detail="You are not authorized to perform this action"
)
task = get_task_by_id(task_id, db)
if task == "notaskfound":
raise HTTPException(status_code=404, detail=f"Task with id {task_id} not found")
return task


@router.get("/driver/{driver_id}", status_code=status.HTTP_200_OK)
def getTasksByDriver(
driver_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if current_user.Role != "Admin" and current_user.Role != "Driver":
raise HTTPException(
status_code=403, detail="You are not authorized to perform this action"
)
if current_user.Role == "Driver":
if current_user.Id != driver_id:
raise HTTPException(
status_code=403, detail="You are not authorized to perform this action"
)
tasks = get_tasks_by_driver(driver_id, db)
if tasks == "notdriver":
raise HTTPException(
status_code=404, detail=f"Driver with id {driver_id} not found"
)
return tasks

+ 1
- 0
app/core/config.py Ver ficheiro

@@ -10,6 +10,7 @@ class Settings:
ACCESS_TOKEN_EXPIRE: int = 60 * 24 * 7 # 7 days
SECRET_KEY: str = "tH357aC6oA7ofCaN3yTffYkRh"
ALGORITHM: str = "HS256"
ALLOWED_TASK_STATUS: list = ["Pending", "In Progress", "Completed", "Cancelled"]


settings = Settings()


+ 42
- 0
app/db/repository/drivetask.py Ver ficheiro

@@ -22,3 +22,45 @@ def create_new_task(task: CreateTask, db: Session):
db.commit()
db.refresh(task_object)
return task_object


def change_task_status(task_id: int, status: str, db: Session):
task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
if not task:
return "notaskfound"
task.Status = status
db.commit()
db.refresh(task)
return task


def get_task_driver(task_id: int, db: Session):
task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
if not task:
return "notaskfound"
driver = get_user_by_id(task.DriverId, db)
if not driver:
return "notdriver"
return driver


def get_tasks_by_driver(driver_id: int, db: Session):
driver = get_user_by_id(driver_id, db)
if not driver:
return "notdriver"
if driver.Role != "Driver":
return "notdriver"
tasks = db.query(DriveTask).filter(DriveTask.DriverId == driver_id).all()
return tasks


def get_task_by_id(task_id: int, db: Session):
task = db.query(DriveTask).filter(DriveTask.Id == task_id).first()
if not task:
return "notaskfound"
return task


def get_all_tasks(db: Session):
tasks = db.query(DriveTask).all()
return tasks

Carregando…
Cancelar
Guardar