浏览代码

Added token expiration

main
Madiwka3 1年前
父节点
当前提交
4cac217ce4
共有 6 个文件被更改,包括 68 次插入27 次删除
  1. +49
    -16
      app/apis/v1/route_auth.py
  2. +1
    -1
      app/apis/v1/route_vehicle.py
  3. +3
    -2
      app/core/auth.py
  4. +6
    -7
      app/core/config.py
  5. +4
    -1
      app/main.py
  6. +5
    -0
      app/schemas/token.py

+ 49
- 16
app/apis/v1/route_auth.py 查看文件

@@ -1,3 +1,4 @@
from datetime import datetime
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi import Depends, APIRouter
from sqlalchemy.orm import Session
@@ -7,20 +8,23 @@ from db.session import get_db
from core.hashing import Hasher
from core.config import settings
from jose import JWTError, jwt
from schemas.token import Token
from schemas.token import Token, TokenPayload
from db.repository.user import get_user_by_email, get_user_by_phone
from core.auth import create_access_token


router = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/token",
)


def authenticate_user(login: str, password: str, db: Session):
print("Trying to auth...")
user = None
if ("@" in login):
if "@" in login:
user = get_user_by_email(email=login, db=db)
elif ("+" in login):
elif "+" in login:
user = get_user_by_phone(phone=login, db=db)
else:
return False
@@ -31,32 +35,61 @@ def authenticate_user(login: str, password: str, db: Session):
return user


def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: Annotated[Session, Depends(get_db)]):
def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[Session, Depends(get_db)],
):
print("Getting current user...")
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)

if datetime.fromtimestamp(token_data.exp) < datetime.now():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Session expired. Please login again.",
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
if ("@" in username):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
if "@" in username:
user = get_user_by_email(email=username, db=db)
elif ("+" in username):
elif "+" in username:
user = get_user_by_phone(phone=username, db=db)
else:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
return user


@router.post("/token", response_model=Token)
def access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
def access_token(
form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)
):
print("Getting token...")
user = authenticate_user(form_data.username, form_data.password, db)
print(user)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password",
)
access_token = create_access_token(data={"sub": user.Email})
return {"access_token": access_token, "token_type": "bearer"}


print("TOKENS ARE: ")
print(access_token)
return {
"access_token": access_token,
"token_type": "bearer",
}

+ 1
- 1
app/apis/v1/route_vehicle.py 查看文件

@@ -1,7 +1,7 @@
from fastapi import APIRouter, status, HTTPException
from sqlalchemy.orm import Session
from fastapi import Depends
from typing import Annotated, List
from typing import List
from db.session import get_db
from schemas.vehicle import OutputVehicle, CreateVehicle, UpdateVehicle, VehicleLocation
from db.repository.vehicle import (


+ 3
- 2
app/core/auth.py 查看文件

@@ -12,6 +12,7 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
encoded_jwt = jwt.encode(
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
)
return encoded_jwt


+ 6
- 7
app/core/config.py 查看文件

@@ -1,5 +1,3 @@


class Settings:
PROJECT_NAME: str = "VMS"
PROJECT_VERSION: str = "1.0.0"
@@ -9,7 +7,7 @@ class Settings:
POSTGRES_PORT: str = "5432"
POSTGRES_DB: str = "VMSData"
DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
ACCESS_TOKEN_EXPIRE: int = 30
ACCESS_TOKEN_EXPIRE: int = 60 * 24 * 7 # 7 days
SECRET_KEY: str = "tH357aC6oA7ofCaN3yTffYkRh"
ALGORITHM: str = "HS256"

@@ -21,8 +19,8 @@ def createAdminAcc():
from db.session import SessionLocal
from db.repository.user import create_new_user
from schemas.user import UserCreate
from core.hashing import Hasher
from db.models.user import User

db = SessionLocal()
user = UserCreate(
Email="madi.turgunov@nu.edu.kz",
@@ -31,8 +29,9 @@ def createAdminAcc():
LastName="Turgunov",
ContactNumber="+77071234567",
Role="Admin",
BirthDate="2000-01-01T00:00:00+06:00")
if (db.query(User).filter(User.Email == user.Email).first()):
BirthDate="2000-01-01T00:00:00+06:00",
)
if db.query(User).filter(User.Email == user.Email).first():
return False
create_new_user(user=user, db=db)
return True
return True

+ 4
- 1
app/main.py 查看文件

@@ -10,7 +10,10 @@ def include_routes(app): # include all routes from our api/v1/


def startup(): # start the project, and create the tables
app = FastAPI(title=settings.PROJECT_NAME, version=settings.PROJECT_VERSION)
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.PROJECT_VERSION,
)
Base.metadata.create_all(bind=engine)
createAdminAcc()
include_routes(app)


+ 5
- 0
app/schemas/token.py 查看文件

@@ -4,3 +4,8 @@ from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str


class TokenPayload(BaseModel):
sub: str = None
exp: int = None

正在加载...
取消
保存