OAuth2 与 JWT
OAuth2 和 JWT 是两种常用于身份验证与授权的技术,但它们的核心目标和应用场景不同。
本质区别
特性 | OAuth2 | JWT |
定位 | 授权框架(定义资源访问的流程) | 令牌格式(安全传输信息的标准) |
核心目标 | 允许第三方应用安全访问用户资源 | 安全传递声明信息(如用户身份) |
协议/标准 | 协议(RFC 6749) | 数据格式(RFC 7519) |
依赖关系 | 不依赖特定令牌格式 | 可作为 OAuth2 的令牌实现方式 |
核心功能对比
OAuth2:授权委托
- 场景:用户允许第三方应用(如 GitHub 登录的网站)访问其资源(如用户的基本信息)。
- 流程:
- 用户授权 → 颁发访问令牌 → 3. 第三方用令牌访问资源服务器。
- 关键角色:
- 资源所有者(用户)
- 客户端(第三方应用)
- 授权服务器(颁发令牌)
- 资源服务器(存储用户数据)
JWT:信息传递
- 场景:服务端生成包含用户信息的令牌,客户端携带令牌证明身份。
- 结构:Payload.Signature(Base64 编码的 JSON)。
- Payload:携带声明(如用户 ID、过期时间)。
- Signature:防篡改验证(使用密钥签名)。
典型应用场景
技术 | 适用场景 |
OAuth2 | – 第三方登录(如“用微信登录”)<br>- API 访问授权(如允许应用读取用户数据) |
JWT | – 无状态用户认证(如前后端分离架构)<br>- 服务间安全通信(如微服务调用) |
协同工作示例
OAuth2 和 JWT 可以结合使用:
- OAuth2 颁发 JWT 作为访问令牌:
- 用户授权后,授权服务器生成 JWT 格式的访问令牌。
- 客户端携带 JWT 访问资源服务器,资源服务器验证 JWT 有效性。
- 优势:
- 减少对授权服务器的频繁验证请求(JWT 可自解析)。
- 支持无状态架构。
关键区别点
对比维度 | OAuth2 | JWT |
令牌类型 | 支持多种令牌(如 Bearer Token、JWT) | 固定为 JWT 格式 |
信息携带 | 令牌本身不携带用户信息(需查询授权服务器) | 令牌直接包含用户信息(自包含) |
状态管理 | 通常需存储令牌状态(如刷新令牌) | 无状态(验证签名即可) |
安全焦点 | 控制资源访问权限 | 确保信息传输的完整性和真实性 |
常见误解澄清
- 误区 1:OAuth2 是认证协议。事实:OAuth2 是授权框架,认证需结合其他协议(如 OpenID Connect)。
- 误区 2:JWT 必须与 OAuth2 一起使用。事实:JWT 可独立用于认证(如生成登录令牌),不依赖 OAuth2。
技术选型建议
- 选择 OAuth2 的场景:
- 需要第三方应用访问用户资源(如社交媒体登录)。
- 复杂授权流程(如不同权限级别的 API 访问)。
- 选择 JWT 的场景:
- 无状态认证(如 RESTful API)。
- 服务间安全通信(避免频繁查询数据库)。
API接口身份认证
在API接口开发中,选择合适的身份认证方案需根据应用场景、安全需求及用户体验综合考量。以下是常见方案的对比及适用场景指南:
API Key(API密钥)
- 实现方式:客户端在请求头或参数中携带唯一密钥(如X-API-Key: your_key)。
- 适用场景:
- 内部服务间通信。
- 低安全需求的开放API(如天气查询接口)。
- 优点:简单易实现,无复杂流程。
- 缺点:
- 密钥易泄露(需配合HTTPS)。
- 无动态权限控制。
FastAPI示例:
from fastapi import Depends, FastAPI, HTTPException from fastapi.security import APIKeyHeader app = FastAPI() api_key_header = APIKeyHeader(name="X-API-Key") def validate_api_key(api_key: str = Depends(api_key_header)): if api_key != "your_secret_key": raise HTTPException(status_code=403, detail="Invalid API Key") return api_key @app.get("/data/") async def get_data(api_key: str = Depends(validate_api_key)): return {"data": "protected_data"}
OAuth2 + JWT
- 实现方式:
- 用户登录后,授权服务器颁发JWT格式的访问令牌(Access Token)。
- 客户端在请求头携带令牌(如Authorization: Bearer <token>)。
- 适用场景:
- 第三方应用访问用户资源(如社交媒体API)。
- 需要细粒度权限控制(如读写分离)。
- 优点:
- 标准化协议,支持动态权限和令牌吊销。
- JWT自包含用户信息,减少数据库查询。
- 缺点:
- 需维护授权服务器。
- JWT令牌一旦泄露,有效期内容易被滥用。
FastAPI示例:
from fastapi import Depends, FastAPI from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") SECRET_KEY = "your_secret_key" async def get_current_user(token: str = Depends(oauth2_scheme)): try: payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) return payload.get("sub") except JWTError: raise HTTPException(status_code=401, detail="Invalid token") @app.get("/user/") async def get_user(current_user: str = Depends(get_current_user)): return {"user": current_user}
HMAC签名
- 实现方式:
- 客户端用密钥对请求参数生成签名(如SHA256)。
- 服务端验证签名一致性。
- 适用场景:
- 高安全性要求的API(如支付接口)。
- 防止请求篡改。
- 优点:
- 防重放攻击(需加入时间戳)。
- 无需传输密钥本身。
- 缺点:
- 实现较复杂。
- 客户端需安全存储密钥。
签名示例:
import hmac import hashlib import time def generate_signature(secret: str, params: dict) -> str: sorted_params = "&".join(f"{k}={v}" for k, v in sorted(params.items())) return hmac.new(secret.encode(), sorted_params.encode(), hashlib.sha256).hexdigest() # 客户端调用 params = {"user_id": 123, "timestamp": int(time.time())} signature = generate_signature("your_secret", params) headers = {"X-Signature": signature}
双向TLS(mTLS)
- 实现方式:客户端和服务端通过509证书互相验证身份。
- 适用场景:
- 企业级内部API(如银行系统)。
- 高安全要求的服务间通信。
- 优点:
- 最高级别的安全性。
- 防中间人攻击。
- 缺点:
- 证书管理复杂。
- 不适用于浏览器客户端。
方案选型决策树
- 是否需要第三方授权访问用户数据?
- 是→ 选 OAuth2。
- 否→ 进入下一步。
- 是否需要无状态且快速验证?
- 是→ 选 JWT。
- 否→ 进入下一步。
- 是否需防范请求篡改?
- 是→ 选 HMAC。
- 否→ 进入下一步。
- 是否为内部服务通信?
- 是→ 选 API Key 或 mTLS。
- 否→ 考虑 OAuth2 或 JWT。
安全实践建议
- 强制HTTPS:所有认证数据必须加密传输。
- 令牌短期有效:JWT设置短过期时间(如15分钟),并提供Refresh Token。
- 密钥安全管理:使用环境变量或密钥管理服务(如Vault)。
- 限流与监控:防止暴力破解,记录异常请求。
根据业务需求选择最简方案,并遵循最小权限原则,避免过度设计。
FastAPI接入OAuth2
在 FastAPI 中接入 OAuth2 认证可以通过内置的 OAuth2PasswordBearer 和 OAuth2PasswordRequestForm 实现。以下是完整的实现流程(以 密码授权模式 为例):
安装依赖
pip install fastapi uvicorn python-jose[cryptography] passlib python-multipart
- python-jose:用于生成/验证 JWT 令牌
- passlib:密码哈希加密
- python-multipart:处理表单数据
配置 OAuth2 核心逻辑
文件结构
. ├── main.py ├── auth.py └── schemas.py
定义 Pydantic 模型 (schemas.py)
from pydantic import BaseModel class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: str | None = None class User(BaseModel): username: str email: str | None = None disabled: bool | None = None class UserInDB(User): hashed_password: str
实现认证工具 (auth.py)
from datetime import datetime, timedelta from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from passlib.context import CryptContext from schemas import TokenData, UserInDB # 安全配置 SECRET_KEY = "your-secret-key-keep-it-secret" # 生产环境使用环境变量存储 ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 密码哈希上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # OAuth2 令牌获取端点 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # 模拟数据库(替换为真实数据库) fake_users_db = { "johndoe": { "username": "johndoe", "hashed_password": pwd_context.hash("secret"), "disabled": False } } def verify_password(plain_password: str, hashed_password: str): return pwd_context.verify(plain_password, hashed_password) def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def authenticate_user(db, username: str, password: str): user = get_user(db, username) if not user or not verify_password(password, user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无法验证凭证", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user( current_user: Annotated[User, Depends(get_current_user)] ): if current_user.disabled: raise HTTPException(status_code=400, detail="用户未激活") return current_user
主应用逻辑 (main.py)
from fastapi import FastAPI, Depends, HTTPException from auth import ( authenticate_user, create_access_token, get_current_active_user, fake_users_db, ACCESS_TOKEN_EXPIRE_MINUTES ) from schemas import User, Token from datetime import timedelta app = FastAPI() @app.post("/token", response_model=Token) async def login_for_access_token( form_data: Annotated[OAuth2PasswordRequestForm, Depends()] ): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users/me", response_model=User) async def read_users_me( current_user: Annotated[User, Depends(get_current_active_user)] ): return current_user if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
测试流程
获取令牌
curl -X POST "http://localhost:8000/token" \ -H "Content-Type: application/x-www-form-urlencoded" \ -d "username=johndoe&password=secret"
响应示例:
{ "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "token_type": "bearer" }
访问受保护端点
curl -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." http://localhost:8000/users/me
响应示例:
{ "username": "johndoe", "email": null, "disabled": false }
高级配置
使用环境变量
通过 pydantic-settings 管理敏感配置:
from pydantic_settings import BaseSettings class Settings(BaseSettings): secret_key: str algorithm: str = "HS256" access_token_expire_minutes: int = 30 class Config: env_file = ".env" settings = Settings()
集成第三方 OAuth2 提供商(如 Google)
使用 authlib 库:
from authlib.integrations.starlette_client import OAuth oauth = OAuth() oauth.register( name='google', client_id=GOOGLE_CLIENT_ID, client_secret=GOOGLE_CLIENT_SECRET, server_metadata_url='https://accounts.google.com/.well-known/openid-configuration', client_kwargs={'scope': 'openid email profile'} ) @app.get("/login/google") async def login_google(request: Request): redirect_uri = request.url_for('auth_google') return await oauth.google.authorize_redirect(request, redirect_uri)
安全注意事项
- 生产环境必须使用 HTTPS
- SECRET_KEY 必须保密(不要硬编码在代码中)
- 合理设置令牌过期时间
- 使用强密码哈希算法(如 bcrypt)
- 定期轮换密钥(尤其是发生泄露时)
通过以上步骤,即可在 FastAPI 中实现完整的 OAuth2 认证流程。根据业务需求,可以扩展支持更多授权模式(如 Authorization Code Flow)。
FastAPI接入JWT
在 FastAPI 中接入 JWT(JSON Web Token)认证可以通过以下步骤实现,这是完整的实现指南:
安装依赖
pip install fastapi uvicorn python-jose[cryptography] passlib python-multipart
- python-jose:JWT 编码/解码
- passlib:密码哈希加密
- python-multipart:处理表单数据
核心代码结构
. ├── main.py # 主应用 ├── auth.py # JWT 认证逻辑 └── schemas.py # 数据模型
定义数据模型 (schemas.py)
from pydantic import BaseModel class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: str | None = None class User(BaseModel): username: str disabled: bool = False class UserInDB(User): hashed_password: str
实现 JWT 工具类 (auth.py)
from datetime import datetime, timedelta from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from passlib.context import CryptContext from schemas import TokenData, UserInDB # 安全配置(生产环境应使用环境变量) SECRET_KEY = "your-secret-key-keep-it-secret" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 密码哈希上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 定义令牌获取端点 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # 模拟用户数据库 fake_users_db = { "johndoe": { "username": "johndoe", "hashed_password": pwd_context.hash("secret"), "disabled": False } } # ---------- 工具函数 ---------- def verify_password(plain_password: str, hashed_password: str): return pwd_context.verify(plain_password, hashed_password) def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def authenticate_user(db, username: str, password: str): user = get_user(db, username) if not user or not verify_password(password, user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15)) to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无法验证凭证", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if not username: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if not user: raise credentials_exception return user async def get_current_active_user( current_user: Annotated[User, Depends(get_current_user)] ): if current_user.disabled: raise HTTPException(status_code=400, detail="用户未激活") return current_user
主应用逻辑 (main.py)
from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from auth import ( authenticate_user, create_access_token, get_current_active_user, fake_users_db, ACCESS_TOKEN_EXPIRE_MINUTES ) from schemas import Token, User from datetime import timedelta app = FastAPI() # ---------- 认证端点 ---------- @app.post("/token", response_model=Token) async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends() ): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} # ---------- 受保护端点 ---------- @app.get("/users/me", response_model=User) async def read_users_me( current_user: User = Depends(get_current_active_user) ): return current_user if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
测试流程
获取 JWT 令牌
curl -X POST "http://localhost:8000/token" \ -H "Content-Type: application/x-www-form-urlencoded" \ -d "username=johndoe&password=secret"
响应示例:
{ "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "token_type": "bearer" }
访问受保护端点
curl -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." http://localhost:8000/users/me
响应示例:
{ "username": "johndoe", "disabled": false }
高级配置
环境变量配置
创建 .env 文件:
SECRET_KEY=your_secure_key_here ALGORITHM=HS256 ACCESS_TOKEN_EXPIRE_MINUTES=30
使用 pydantic-settings 管理配置:
from pydantic_settings import BaseSettings class Settings(BaseSettings): secret_key: str algorithm: str = "HS256" access_token_expire_minutes: int = 30 class Config: env_file = ".env" settings = Settings()
刷新令牌机制
def create_refresh_token(data: dict): return jwt.encode( data, settings.secret_key, algorithm=settings.algorithm ) @app.post("/refresh-token") async def refresh_token(refresh_token: str): try: payload = jwt.decode( refresh_token, settings.secret_key, algorithms=[settings.algorithm] ) new_access_token = create_access_token({"sub": payload["sub"]}) return {"access_token": new_access_token} except JWTError: raise HTTPException(status_code=401, detail="无效的刷新令牌")
安全最佳实践
- 永远不要硬编码密钥:使用环境变量或密钥管理服务
- 使用强加密算法:推荐 HS256 或 RS256
- 设置合理过期时间:访问令牌建议 15-60 分钟,刷新令牌可设置数天
- 启用 HTTPS:生产环境必须使用 HTTPS
- 令牌存储安全:前端应使用 HttpOnly cookies 或安全存储机制
通过以上步骤,即可在 FastAPI 中实现完整的 JWT 认证系统。根据业务需求可扩展以下功能:
- 角色权限管理
- 令牌黑名单
- 多设备登录管理
- 第三方登录集成