# Routes for user. MAIN PART OF THE API from math import ceil from fastapi import APIRouter, HTTPException, status, Query from sqlalchemy.orm import Session from sqlalchemy import or_ from fastapi import Depends from typing import List, Annotated from apis.v1.route_auth import get_current_user from core.config import settings from db.models.user import User from schemas.user import UserCreate, ShowUser, ShowDriver, DriverCreate, OutputUser, UsersPage from db.session import get_db from db.repository.user import ( create_new_user, list_users, get_user_by_id, replace_user_data, create_new_driver, delete_user_data, get_users_by_name ) router = APIRouter() @router.post("/", response_model=ShowUser, status_code=status.HTTP_201_CREATED) def create_user( user: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): if user.Role not in settings.ALLOWED_ROLES: raise HTTPException( status_code=400, detail=f"Role {user.Role} is not allowed. Allowed roles are {settings.ALLOWED_ROLES}", ) if current_user.Role != "Admin": raise HTTPException( status_code=403, detail="You are not authorized to perform this action" ) # if current_user.Role != "Admin": # raise HTTPException(status_code=403, detail="You are not authorized to perform this action") user = create_new_user(user=user, db=db) return user @router.post("/driver", response_model=ShowDriver, status_code=status.HTTP_201_CREATED) def create_driver( driver: DriverCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): if current_user.Role != "Admin": raise HTTPException( status_code=403, detail="You are not authorized to perform this action" ) driver = create_new_driver(driver=driver, db=db) return driver @router.get("/", response_model=List[ShowUser], status_code=status.HTTP_200_OK) def get_all_users(db: Session = Depends(get_db), role: str = None): if role is None: users = list_users(db=db) return users users = list_users(db=db, role=role) return users @router.put("/{user_id}", response_model=ShowUser, status_code=status.HTTP_202_ACCEPTED) def update_user( user_id: int, user: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): if current_user.Role != "Admin": raise HTTPException( status_code=403, detail="You are not authorized to perform this action" ) user = replace_user_data(user_id=user_id, user_data=user, db=db) if user == "userNotFound": raise HTTPException(status_code=404, detail="User not found") return user @router.get("/me", response_model=ShowUser, status_code=status.HTTP_200_OK) def get_user_me( current_user: Annotated[User, Depends(get_current_user)], db: Annotated[Session, Depends(get_db)], ): print("Getting current user...") return current_user @router.get("/{user_id}", response_model=ShowUser, status_code=status.HTTP_200_OK) def get_user(user_id: int, db: Session = Depends(get_db)): user = get_user_by_id(user_id=user_id, role="Any", db=db) if not user: raise HTTPException(status_code=404, detail="User not found") return user @router.get( "/driver/{driver_id}", response_model=ShowDriver, status_code=status.HTTP_200_OK ) def get_driver(driver_id: int, db: Session = Depends(get_db)): driver = get_user_by_id(user_id=driver_id, role="Driver", db=db) if not driver: raise HTTPException(status_code=404, detail="Driver not found") res = driver.__dict__ res["AssignedVehicle"] = driver.vehicle return driver @router.delete("/{user_id}", status_code=status.HTTP_200_OK) def delete_user( user_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): if current_user.Role != "Admin": raise HTTPException( status_code=403, detail="You are not authorized to perform this action" ) result = delete_user_data(id=user_id, db=db) if result == "userNotFound": raise HTTPException(status_code=404, detail="User not found") return result @router.get("/search/", response_model=UsersPage) def search_users(db: Session = Depends(get_db), name: str = None, role: str = None, page: int = 1, per_page: int = 20): query = db.query(User).filter(User.Name.like(f"{name}%")) if role is not None and role != "Admin": query = query.filter(User.Role == role) total_users = query.count() total_pages = ceil(total_users / per_page) users = query.offset((page - 1) * per_page).limit(per_page).all() output_users = [OutputUser( id= user.Id, Name=user.Name, MiddleName = user.MiddleName, LastName=user.LastName, ContactNumber=user.ContactNumber, Address=user.Address, Email=user.Email, Role=user.Role, AssignedVehicle= None ) for user in users] return {"users": output_users, "total_pages": total_pages}