You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

61 lines
2.3 KiB

  1. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  2. from fastapi import Depends, APIRouter
  3. from sqlalchemy.orm import Session
  4. from fastapi import status, HTTPException
  5. from typing import Annotated
  6. from db.session import get_db
  7. from core.hashing import Hasher
  8. from core.config import settings
  9. from jose import JWTError, jwt
  10. from schemas.token import Token
  11. from db.repository.user import get_user_by_email, get_user_by_phone
  12. from core.auth import create_access_token
  13. router = APIRouter()
  14. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
  15. def authenticate_user(login: str, password: str, db: Session):
  16. print("Trying to auth...")
  17. user = None
  18. if ("@" in login):
  19. user = get_user_by_email(email=login, db=db)
  20. elif ("+" in login):
  21. user = get_user_by_phone(phone=login, db=db)
  22. else:
  23. return False
  24. if not user:
  25. return False
  26. if not Hasher.verify_password(password, user.HashedPassword):
  27. return False
  28. return user
  29. def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: Annotated[Session, Depends(get_db)]):
  30. print("Getting current user...")
  31. try:
  32. payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  33. username: str = payload.get("sub")
  34. if username is None:
  35. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
  36. except JWTError:
  37. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
  38. if ("@" in username):
  39. user = get_user_by_email(email=username, db=db)
  40. elif ("+" in username):
  41. user = get_user_by_phone(phone=username, db=db)
  42. else:
  43. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
  44. return user
  45. @router.post("/token", response_model=Token)
  46. def access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
  47. print("Getting token...")
  48. user = authenticate_user(form_data.username, form_data.password, db)
  49. print(user)
  50. if not user:
  51. raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password")
  52. access_token = create_access_token(data={"sub": user.Email})
  53. return {"access_token": access_token, "token_type": "bearer"}