No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

route_auth.py 2.3 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  2. from fastapi import Depends, APIRouter
  3. from sqlalchemy.orm import Session
  4. from fastapi import status, HTTPException
  5. from typing import Annotated
  6. from db.session import get_db
  7. from core.hashing import Hasher
  8. from core.config import settings
  9. from jose import JWTError, jwt
  10. from schemas.token import Token
  11. from db.repository.user import get_user_by_email, get_user_by_phone
  12. from core.auth import create_access_token
  13. router = APIRouter()
  14. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
  15. def authenticate_user(login: str, password: str, db: Session):
  16. print("Trying to auth...")
  17. user = None
  18. if ("@" in login):
  19. user = get_user_by_email(email=login, db=db)
  20. elif ("+" in login):
  21. user = get_user_by_phone(phone=login, db=db)
  22. else:
  23. return False
  24. if not user:
  25. return False
  26. if not Hasher.verify_password(password, user.HashedPassword):
  27. return False
  28. return user
  29. def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: Annotated[Session, Depends(get_db)]):
  30. print("Getting current user...")
  31. try:
  32. payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  33. username: str = payload.get("sub")
  34. if username is None:
  35. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
  36. except JWTError:
  37. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
  38. if ("@" in username):
  39. user = get_user_by_email(email=username, db=db)
  40. elif ("+" in username):
  41. user = get_user_by_phone(phone=username, db=db)
  42. else:
  43. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
  44. return user
  45. @router.post("/token", response_model=Token)
  46. def access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
  47. print("Getting token...")
  48. user = authenticate_user(form_data.username, form_data.password, db)
  49. print(user)
  50. if not user:
  51. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password")
  52. access_token = create_access_token(data={"sub": user.Email})
  53. return {"access_token": access_token, "token_type": "bearer"}