本次的程式碼與目錄結構可以參考 FastAPI Tutorial : Day19 branch
我們在 Day17 完成 hash password 的實作
在 Day18 完成 JWT token 的實作
今天我們會完成整體 OAuth2 password login 的實作
包括 Authorize Dependency 和 權限管理
因為我們在註冊 User 時,都是以 Email 作為 username
所以在 Login 時,也要以 Email 作為 username
並且我們需要在 crud/user.py
中新增 get_user_in_db
讓我們只取出 User
中的 username
和 password
crud/user.py
# ...
class UserCrudManager:
# ...
async def get_user_in_db(self,email: str,db_session:AsyncSession=None) -> UserSchema.UserInDB :
stmt = select(UserModel.id,UserModel.name,UserModel.password).where(UserModel.email == email)
result = await db_session.execute(stmt)
user = result.first()
if user:
return user
return None
# ...
也可以順便加上 UserInDB
的 schema
讓我們在 API Endpoint 知道 get_user_in_db
回傳的資料格式
schemas/user.py
# ...
class UserInDB(BaseModel):
id: int
name: str
password: str
接著我們就可以在 api/auth.py
中實作 Login Endpoint
先透過 get_user_in_db
取得 User
的 username
和 password
如果沒有該 User
,就先丟出 HTTPException
api/auth.py
@router.post("/login",response_model=Token)
async def login(form_data: login_form_schema):
user_in_db:UserInDB = await UserCrud.get_user_in_db(form_data.username)
if user_in_db is None:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"}
)
# ...
再接著檢查密碼是否正確
如果也沒問題,就回傳一組新的 JWT token
api/auth.py
async def login(form_data: login_form_schema):
# ...
if not verify_password(form_data.password,user_in_db.password):
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"}
)
return await create_token_pair({"username": user_in_db.name},{"username": user_in_db.name})
先創建一個測試 User
以錯誤帳號來測試
針對一些需要登入才能使用的 API Endpoint
我們應該要帶入 JWT token 來驗證使用者身份
這邊我們使用 fastapi.security
中的 OAuth2PasswordBearer
來作為 Authorize Dependency
這邊先以 update_user
API Endpoint 為例
api/user.py
@router.put("/users/{user_id}" , response_model=UserSchema.UserUpdateResponse )
async def update_user(
newUser: UserSchema.UserUpdate,
user_id:int=Depends(check_user_id),
token:str = Depends(OAuth2PasswordBearer(tokenUrl="api/auth/login")) # <----- 新增
):
# ...
在 Swagger UI 中,我們可以看到 update_user
API Endpoint 被加上了鎖頭
並且我們按下鎖頭後,就可以輸入 Username 和 Password
Swagger UI 會幫我們帶入 tokenUrl
中的 URL
並在 Authorization
header 中帶入 JWT token
當我們輸入正確的帳號密碼後,就可以成功打 update_user
API Endpoint
但我們還遇到一個嚴重的問題
就是任何有登入的 User都可以使用 update_user
API Endpoint
所以我們應該要在 update_user
API Endpoint 中檢查 JWT token 中的 User 與 user_id
是否相同
又因為我們的 User.name
可以重複
所以我們可以在 JWT token 中加入 User.id
api/auth.py
@router.post("/login",response_model=Token)
async def login(form_data: login_form_schema):
# ...
return await create_token_pair(
{"username": user_in_db.name, "id": user_in_db.id},
{"username": user_in_db.name, "id": user_in_db.id},
)
@router.post("/refresh",response_model=Token)
async def refresh(refersh_data: RefreshRequest):
# ...
u_id:int = payload.get("id")
if username is None or u_id is None:
raise exception_invalid_token
return await create_token_pair(
{"username": username , "id": u_id},
{"username": username , "id": u_id}
)
回到 update_user
中
我們可以透過 payload
來取得 username
和 id
並且檢查 id
是否與 user_id
相同
如果不相同,就回傳 403 Permission Denied
api/user.py
@router.put("/users/{user_id}" , response_model=UserSchema.UserUpdateResponse )
# ...
payload = await verify_access_token(token)
if payload.get("id") != user_id:
raise HTTPException(status_code=403, detail="Permission denied")
# ...
當我們以 test
用戶的 Token 去打 PUT /api/users/1
( user1
用戶) 時
就會回傳 403 Permission Denied
改為打自己 ( test
用戶) 的 PUT /api/users/2
時
就可以成功更新
對於需要 Authorize 的 API Endpoint
我們可以透過 get_current_user
來簡化取得當前 User 的過程
讓我們不用再所有的 Route 都寫上 token:str = Depends(OAuth2PasswordBearer(tokenUrl="api/auth/login"))
和 payload = await verify_access_token(token)
auth/utils.py
from fastapi import HTTPException
from crud.users import UserCrudManager
from schemas.auth import oauth2_token_scheme
from auth.jwt import verify_access_token
UserCrud = UserCrudManager()
async def get_current_user(token = oauth2_token_scheme ):
payload = await verify_access_token(token)
user_id = int(payload.get("id"))
user = await UserCrud.get_user_by_id(user_id)
if user is None:
raise HTTPException(
status_code=401,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"}
)
return user
接下來需要 Authorize 的 API Endpoint
都可以直接使注入get_current_user
Dependency
來取得當前登入的 User
api/user.py
@router.put("/users/{user_id}" , response_model=UserSchema.UserUpdateResponse )
async def update_user(
newUser: UserSchema.UserUpdate,
user_id:int=Depends(check_user_id),
user = Depends(get_current_user)
):
if user.id != user_id:
raise HTTPException(status_code=403, detail="Permission denied")
# ...
就只需要 user = Depends(get_current_user)
就可以取得當前登入的 User
再判斷 user.id
是否與 user_id
相同即可
今天我們完成了 OAuth2 password login 的實作
並且實作了 Authorize Dependency 和 權限管理
也完成 JWT token 的 Refresh 換發機制
明天我們把到目前的專案整理一下
為目前 OAuth2 password login 的實作做一個總結
FastAPI : OAuth2 with Password (and hashing), Bearer with JWT tokens