术→技巧, 研发

FastAPI学习之接口认证

钱魏Way · · 92 次浏览

OAuth2 与 JWT

OAuth2 和 JWT 是两种常用于身份验证与授权的技术,但它们的核心目标和应用场景不同。

本质区别

特性 OAuth2 JWT
定位 授权框架(定义资源访问的流程) 令牌格式(安全传输信息的标准)
核心目标 允许第三方应用安全访问用户资源 安全传递声明信息(如用户身份)
协议/标准 协议(RFC 6749) 数据格式(RFC 7519)
依赖关系 不依赖特定令牌格式 可作为 OAuth2 的令牌实现方式

核心功能对比

OAuth2:授权委托

  • 场景:用户允许第三方应用(如 GitHub 登录的网站)访问其资源(如用户的基本信息)。
  • 流程
    • 用户授权 → 颁发访问令牌 → 3. 第三方用令牌访问资源服务器。
  • 关键角色
    • 资源所有者(用户)
    • 客户端(第三方应用)
    • 授权服务器(颁发令牌)
    • 资源服务器(存储用户数据)

JWT:信息传递

  • 场景:服务端生成包含用户信息的令牌,客户端携带令牌证明身份。
  • 结构:Payload.Signature(Base64 编码的 JSON)。
    • Payload:携带声明(如用户 ID、过期时间)。
    • Signature:防篡改验证(使用密钥签名)。

典型应用场景

技术 适用场景
OAuth2 – 第三方登录(如“用微信登录”)<br>- API 访问授权(如允许应用读取用户数据)
JWT – 无状态用户认证(如前后端分离架构)<br>- 服务间安全通信(如微服务调用)

协同工作示例

OAuth2 和 JWT 可以结合使用

  • OAuth2 颁发 JWT 作为访问令牌
    • 用户授权后,授权服务器生成 JWT 格式的访问令牌。
    • 客户端携带 JWT 访问资源服务器,资源服务器验证 JWT 有效性。
  • 优势
    • 减少对授权服务器的频繁验证请求(JWT 可自解析)。
    • 支持无状态架构。

关键区别点

对比维度 OAuth2 JWT
令牌类型 支持多种令牌(如 Bearer Token、JWT) 固定为 JWT 格式
信息携带 令牌本身不携带用户信息(需查询授权服务器) 令牌直接包含用户信息(自包含)
状态管理 通常需存储令牌状态(如刷新令牌) 无状态(验证签名即可)
安全焦点 控制资源访问权限 确保信息传输的完整性和真实性

常见误解澄清

  • 误区 1:OAuth2 是认证协议。事实:OAuth2 是授权框架,认证需结合其他协议(如 OpenID Connect)。
  • 误区 2:JWT 必须与 OAuth2 一起使用。事实:JWT 可独立用于认证(如生成登录令牌),不依赖 OAuth2。

技术选型建议

  • 选择 OAuth2 的场景
    • 需要第三方应用访问用户资源(如社交媒体登录)。
    • 复杂授权流程(如不同权限级别的 API 访问)。
  • 选择 JWT 的场景
    • 无状态认证(如 RESTful API)。
    • 服务间安全通信(避免频繁查询数据库)。

API接口身份认证

在API接口开发中,选择合适的身份认证方案需根据应用场景、安全需求及用户体验综合考量。以下是常见方案的对比及适用场景指南:

API Key(API密钥)

  • 实现方式:客户端在请求头或参数中携带唯一密钥(如X-API-Key: your_key)。
  • 适用场景
    • 内部服务间通信。
    • 低安全需求的开放API(如天气查询接口)。
  • 优点:简单易实现,无复杂流程。
  • 缺点
    • 密钥易泄露(需配合HTTPS)。
    • 无动态权限控制。

FastAPI示例:

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import APIKeyHeader

app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")

def validate_api_key(api_key: str = Depends(api_key_header)):
    if api_key != "your_secret_key":
        raise HTTPException(status_code=403, detail="Invalid API Key")
    return api_key

@app.get("/data/")
async def get_data(api_key: str = Depends(validate_api_key)):
    return {"data": "protected_data"}

OAuth2 + JWT

  • 实现方式
    • 用户登录后,授权服务器颁发JWT格式的访问令牌(Access Token)。
    • 客户端在请求头携带令牌(如Authorization: Bearer <token>)。
  • 适用场景
    • 第三方应用访问用户资源(如社交媒体API)。
    • 需要细粒度权限控制(如读写分离)。
  • 优点
    • 标准化协议,支持动态权限和令牌吊销。
    • JWT自包含用户信息,减少数据库查询。
  • 缺点
    • 需维护授权服务器。
    • JWT令牌一旦泄露,有效期内容易被滥用。

FastAPI示例:

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "your_secret_key"

async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload.get("sub")
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.get("/user/")
async def get_user(current_user: str = Depends(get_current_user)):
    return {"user": current_user}

HMAC签名

  • 实现方式
    • 客户端用密钥对请求参数生成签名(如SHA256)。
    • 服务端验证签名一致性。
  • 适用场景
    • 高安全性要求的API(如支付接口)。
    • 防止请求篡改。
  • 优点
    • 防重放攻击(需加入时间戳)。
    • 无需传输密钥本身。
  • 缺点
    • 实现较复杂。
    • 客户端需安全存储密钥。

签名示例:

import hmac
import hashlib
import time

def generate_signature(secret: str, params: dict) -> str:
    sorted_params = "&".join(f"{k}={v}" for k, v in sorted(params.items()))
    return hmac.new(secret.encode(), sorted_params.encode(), hashlib.sha256).hexdigest()

# 客户端调用
params = {"user_id": 123, "timestamp": int(time.time())}
signature = generate_signature("your_secret", params)
headers = {"X-Signature": signature}

双向TLS(mTLS)

  • 实现方式:客户端和服务端通过509证书互相验证身份。
  • 适用场景
    • 企业级内部API(如银行系统)。
    • 高安全要求的服务间通信。
  • 优点
    • 最高级别的安全性。
    • 防中间人攻击。
  • 缺点
    • 证书管理复杂。
    • 不适用于浏览器客户端。

方案选型决策树

  • 是否需要第三方授权访问用户数据?
    • → 选 OAuth2
    • → 进入下一步。
  • 是否需要无状态且快速验证?
    • → 选 JWT
    • → 进入下一步。
  • 是否需防范请求篡改?
    • → 选 HMAC
    • → 进入下一步。
  • 是否为内部服务通信?
    • → 选 API Key 或 mTLS
    • → 考虑 OAuth2 或 JWT

安全实践建议

  • 强制HTTPS:所有认证数据必须加密传输。
  • 令牌短期有效:JWT设置短过期时间(如15分钟),并提供Refresh Token。
  • 密钥安全管理:使用环境变量或密钥管理服务(如Vault)。
  • 限流与监控:防止暴力破解,记录异常请求。

根据业务需求选择最简方案,并遵循最小权限原则,避免过度设计。

FastAPI接入OAuth2

在 FastAPI 中接入 OAuth2 认证可以通过内置的 OAuth2PasswordBearer 和 OAuth2PasswordRequestForm 实现。以下是完整的实现流程(以 密码授权模式 为例):

安装依赖

pip install fastapi uvicorn python-jose[cryptography] passlib python-multipart
  • python-jose:用于生成/验证 JWT 令牌
  • passlib:密码哈希加密
  • python-multipart:处理表单数据

配置 OAuth2 核心逻辑

文件结构

.
├── main.py
├── auth.py
└── schemas.py

定义 Pydantic 模型 (schemas.py)

from pydantic import BaseModel

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

class User(BaseModel):
    username: str
    email: str | None = None
    disabled: bool | None = None

class UserInDB(User):
    hashed_password: str

实现认证工具 (auth.py)

from datetime import datetime, timedelta
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from schemas import TokenData, UserInDB

# 安全配置
SECRET_KEY = "your-secret-key-keep-it-secret"  # 生产环境使用环境变量存储
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 密码哈希上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 令牌获取端点
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模拟数据库(替换为真实数据库)
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False
    }
}

def verify_password(plain_password: str, hashed_password: str):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭证",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="用户未激活")
    return current_user

主应用逻辑 (main.py)

from fastapi import FastAPI, Depends, HTTPException
from auth import (
    authenticate_user,
    create_access_token,
    get_current_active_user,
    fake_users_db,
    ACCESS_TOKEN_EXPIRE_MINUTES
)
from schemas import User, Token
from datetime import timedelta

app = FastAPI()

@app.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

测试流程

获取令牌

curl -X POST "http://localhost:8000/token" \
     -H "Content-Type: application/x-www-form-urlencoded" \
     -d "username=johndoe&password=secret"

响应示例:

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer"
}

访问受保护端点

curl -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." http://localhost:8000/users/me

响应示例:

{
  "username": "johndoe",
  "email": null,
  "disabled": false
}

高级配置

使用环境变量

通过 pydantic-settings 管理敏感配置:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    class Config:
        env_file = ".env"

settings = Settings()

集成第三方 OAuth2 提供商(如 Google)

使用 authlib 库:

from authlib.integrations.starlette_client import OAuth

oauth = OAuth()
oauth.register(
    name='google',
    client_id=GOOGLE_CLIENT_ID,
    client_secret=GOOGLE_CLIENT_SECRET,
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid email profile'}
)

@app.get("/login/google")
async def login_google(request: Request):
    redirect_uri = request.url_for('auth_google')
    return await oauth.google.authorize_redirect(request, redirect_uri)

安全注意事项

  • 生产环境必须使用 HTTPS
  • SECRET_KEY 必须保密(不要硬编码在代码中)
  • 合理设置令牌过期时间
  • 使用强密码哈希算法(如 bcrypt)
  • 定期轮换密钥(尤其是发生泄露时)

通过以上步骤,即可在 FastAPI 中实现完整的 OAuth2 认证流程。根据业务需求,可以扩展支持更多授权模式(如 Authorization Code Flow)。

FastAPI接入JWT

在 FastAPI 中接入 JWT(JSON Web Token)认证可以通过以下步骤实现,这是完整的实现指南:

安装依赖

pip install fastapi uvicorn python-jose[cryptography] passlib python-multipart
  • python-jose:JWT 编码/解码
  • passlib:密码哈希加密
  • python-multipart:处理表单数据

核心代码结构

.
├── main.py         # 主应用
├── auth.py         # JWT 认证逻辑
└── schemas.py      # 数据模型

定义数据模型 (schemas.py)

from pydantic import BaseModel

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

class User(BaseModel):
    username: str
    disabled: bool = False

class UserInDB(User):
    hashed_password: str

实现 JWT 工具类 (auth.py)

from datetime import datetime, timedelta
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from schemas import TokenData, UserInDB

# 安全配置(生产环境应使用环境变量)
SECRET_KEY = "your-secret-key-keep-it-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 密码哈希上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 定义令牌获取端点
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模拟用户数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False
    }
}

# ---------- 工具函数 ----------
def verify_password(plain_password: str, hashed_password: str):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭证",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    
    user = get_user(fake_users_db, username=token_data.username)
    if not user:
        raise credentials_exception
    return user

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="用户未激活")
    return current_user

主应用逻辑 (main.py)

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from auth import (
    authenticate_user,
    create_access_token,
    get_current_active_user,
    fake_users_db,
    ACCESS_TOKEN_EXPIRE_MINUTES
)
from schemas import Token, User
from datetime import timedelta

app = FastAPI()

# ---------- 认证端点 ----------
@app.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends()
):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

# ---------- 受保护端点 ----------
@app.get("/users/me", response_model=User)
async def read_users_me(
    current_user: User = Depends(get_current_active_user)
):
    return current_user

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

测试流程

获取 JWT 令牌

curl -X POST "http://localhost:8000/token" \
     -H "Content-Type: application/x-www-form-urlencoded" \
     -d "username=johndoe&password=secret"

响应示例:

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer"
}

访问受保护端点

curl -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." http://localhost:8000/users/me

响应示例:

{
  "username": "johndoe",
  "disabled": false
}

高级配置

环境变量配置

创建 .env 文件:

SECRET_KEY=your_secure_key_here
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

使用 pydantic-settings 管理配置:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    class Config:
        env_file = ".env"

settings = Settings()

刷新令牌机制

def create_refresh_token(data: dict):
    return jwt.encode(
        data, 
        settings.secret_key, 
        algorithm=settings.algorithm
    )

@app.post("/refresh-token")
async def refresh_token(refresh_token: str):
    try:
        payload = jwt.decode(
            refresh_token,
            settings.secret_key,
            algorithms=[settings.algorithm]
        )
        new_access_token = create_access_token({"sub": payload["sub"]})
        return {"access_token": new_access_token}
    except JWTError:
        raise HTTPException(status_code=401, detail="无效的刷新令牌")

安全最佳实践

  • 永远不要硬编码密钥:使用环境变量或密钥管理服务
  • 使用强加密算法:推荐 HS256 或 RS256
  • 设置合理过期时间:访问令牌建议 15-60 分钟,刷新令牌可设置数天
  • 启用 HTTPS:生产环境必须使用 HTTPS
  • 令牌存储安全:前端应使用 HttpOnly cookies 或安全存储机制

通过以上步骤,即可在 FastAPI 中实现完整的 JWT 认证系统。根据业务需求可扩展以下功能:

  • 角色权限管理
  • 令牌黑名单
  • 多设备登录管理
  • 第三方登录集成

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注