术→技巧, 研发

FastAPI学习之中间件

钱魏Way · · 134 次浏览
!文章内容如有错误或排版问题,请提交反馈,非常感谢!

在 FastAPI 中接入中间件(Middleware)非常简单,因为 FastAPI 基于 Starlette 框架,可以直接使用 Starlette 的中间件系统。

中间件基础概念

中间件是 FastAPI 中用于拦截HTTP 请求和响应的组件,它在请求到达路由处理函数之前和响应返回客户端之前执行。

  • 类比:类似于”洋葱模型”,中间件层层包裹路由处理函数,可修改请求或响应。
  • 核心作用:处理全局性或跨路由的通用逻辑(如日志、鉴权、限流)。

中间件的执行流程

中间件按添加顺序 逆序执行(类似洋葱模型):

app.add_middleware(MiddlewareA) # 外层
app.add_middleware(MiddlewareB) # 内层

# 执行顺序:
# MiddlewareA 请求前 → MiddlewareB 请求前 → 路由处理
# MiddlewareB 响应后 → MiddlewareA 响应后

关键特性

  • 中间件按添加顺序反向嵌套 执行(先添加的中间件在外层)。
  • 可修改请求对象(如添加 Headers)或响应对象(如修改状态码)。

中间件的核心用途

场景 示例
日志记录 记录请求的 URL、方法、耗时,响应的状态码。
身份验证 验证 API 密钥或 JWT Token,拦截非法请求。
跨域资源共享(CORS) 添加 Access-Control-Allow-Origin 头,支持前端跨域请求。
请求限流 限制单个 IP 的请求频率,防止滥用。
异常处理 捕获全局异常,返回统一格式的错误响应。
数据预处理 修改请求 Body(如解密数据)或添加全局请求头。

常用内置中间件及使用

CORS 跨域中间件

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # 允许的源列表
    allow_credentials=True,
    allow_methods=["*"], # 允许的 HTTP 方法
    allow_headers=["*"], # 允许的请求头
)

GZip压缩中间件

from fastapi.middleware.gzip import GZipMiddleware

app.add_middleware(
    GZipMiddleware,
    minimum_size=1000 # 仅压缩大于 1000 字节的响应
)

HTTPS重定向中间件

from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

app.add_middleware(HTTPSRedirectMiddleware) # 强制所有请求使用 HTTPS

信任代理头部中间件

from fastapi.middleware.trustedhost import TrustedHostMiddleware

app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["example.com", "*.example.com"] # 允许的 Host 头
)

自定义中间件开发

基础模板

from fastapi import Request
from fastapi.responses import Response

async def custom_middleware(request: Request, call_next):
    # 请求处理前逻辑
    print(f"Request started: {request.url}")

    response = await call_next(request)

    # 响应返回前逻辑
    response.headers["X-Custom-Header"] = "FastAPI"
    return response

app.middleware("http")(custom_middleware)

示例:请求耗时统计

import time

async def timing_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

app.middleware("http")(timing_middleware)

示例:API密钥认证

```html
from fastapi import HTTPException, status

@app.middleware("http")
async def verify_api_key(request: Request, call_next):
    api_key = request.headers.get("X-API-Key")
    if api_key != "SECRET_KEY":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid API Key"
        )
    return await call_next(request)

第三方中间件集成

Prometheus监控中间件

pip install starlette-prometheus
from starlette_prometheus import PrometheusMiddleware

app.add_middleware(PrometheusMiddleware)

Sentry错误监控

pip install sentry-sdk
import sentry_sdk
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware

sentry_sdk.init(dsn="your-sentry-dsn")
app.add_middleware(SentryAsgiMiddleware)

同步 vs 异步中间件

核心对比速览

特性 同步中间件 异步中间件
定义方式 普通函数(def) 协程函数(async def)
I/O操作兼容性 阻塞性操作(如同步数据库查询) 非阻塞异步操作(如 asyncpg、httpx)
执行线程 在主线程中运行,可能阻塞事件循环 在事件循环中异步执行,不阻塞其他任务
性能影响 高并发场景下性能较差 高并发下性能更优
适用场景 简单逻辑、无I/O或CPU密集型任务 涉及网络请求、数据库访问等异步操作

代码定义对比

同步中间件

from fastapi import Request

@app.middleware("http")
def sync_middleware(request: Request, call_next):
    # 同步操作(可能阻塞事件循环)
    print("Sync Middleware: Request received")
    response = call_next(request)
    return response

异步中间件

from fastapi import Request

@app.middleware("http")
async def async_middleware(request: Request, call_next):
    # 异步操作(非阻塞)
    print("Async Middleware: Request received")
    response = await call_next(request)
    return response

执行流程差异

同步中间件

┌───────────────────────┐
│ 同步中间件 │→阻塞事件循环,直到处理完成
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 路由处理函数 │
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 同步中间件 │
└───────────────────────┘

异步中间件

┌───────────────────────┐
│ 异步中间件 │→挂起协程,释放事件循环处理其他请求
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 路由处理函数 │
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 异步中间件 │
└───────────────────────┘

性能关键点

场景 同步中间件 异步中间件
无I/O操作 无显著差异 无显著差异
涉及网络/数据库请求 阻塞事件循环,导致请求堆积,延迟增加 非阻塞,高并发下吞吐量更高
CPU密集型任务 可能拖慢整体性能(如图像处理) 同样会阻塞事件循环,需用多进程优化

使用场景示例

适合同步中间件的场景

简单日志记录

@app.middleware("http")
def log_request(request: Request, call_next):
    print(f"Request: {request.method} {request.url}")
    response = call_next(request)
    return response

同步数据处理(如CPU计算):

def heavy_calculation():
    # 模拟CPU密集型任务
    return sum(i*i for i in range(10**6))

@app.middleware("http")
def cpu_middleware(request: Request, call_next):
    result = heavy_calculation() # 同步阻塞
    response = call_next(request)
    response.headers["X-Calculation"] = str(result)
    return response

适合异步中间件的场景

异步数据库查询
“`

@app.middleware("http")
async def db_middleware(request: Request, call_next):
    async with AsyncSession() as session:
        request.state.db = session
        response = await call_next(request)
        return response

调用外部API

import httpx

@app.middleware("http")
async def external_api_middleware(request: Request, call_next):
    async with httpx.AsyncClient() as client:
        # 非阻塞请求外部服务
        resp = await client.get("https://api.example.com/data")
        request.state.external_data = resp.json()
        response = await call_next(request)
        return response

混合使用的注意事项

若同时存在同步和异步中间件,需注意执行顺序对性能的影响:

# 添加顺序决定执行顺序(外层到内层)

app.add_middleware(sync_middleware) # 外层同步中间件

app.add_middleware(async_middleware) # 内层异步中间件

  • 潜在问题:外层的同步中间件若包含阻塞操作,会降低内层异步中间件的性能优势。
  • 最佳实践:尽量统一中间件类型(全同步或全异步),或确保外层中间件无阻塞。

异常处理差异

异常类型 同步中间件 异步中间件
同步异常 直接抛出,中断请求流程 需用 try/except 捕获
异步异常 无法正确处理(需手动处理) 通过 await 自动传播异常

总结:如何选择?

决策因素 选择同步中间件 选择异步中间件
是否涉及I/O操作 ❌避免使用(除非逻辑极简单) ✅优先使用
代码库整体异步程度 项目以同步代码为主 项目主要使用异步库(如 asyncpg)
高并发需求 ❌不适用 ✅必要选择
维护成本 简单易读 需熟悉异步编程概念

 

最终建议

  • 默认使用异步中间件,尤其是在涉及网络、数据库等I/O操作时。
  • 仅在无I/O且逻辑简单时使用同步中间件,避免阻塞事件循环。
  • 通过性能测试(如locust)验证中间件对吞吐量的影响。
  • 避免在中间件中进行耗时同步操作
  • 使用BackgroundTasks 处理非即时需求(如日志写入)

中间件与依赖注入

核心概念对比

特性 中间件(Middleware) 依赖注入(Dependency Injection)
定位 全局或路由级别的请求/响应拦截器 面向路径操作函数的参数注入机制
执行时机 在请求到达路由前和响应返回前执行(包裹整个处理流程) 在路径操作函数执行前解析依赖,并注入到函数参数中
主要用途 处理与HTTP协议相关的横切关注点(如日志、认证头) 解耦业务逻辑、复用代码(如获取数据库连接、用户鉴权)
作用范围 全局或路由组级别 精确到单个路径操作函数或子路由
数据传递 通过请求状态(request.state)传递数据 直接通过函数参数传递数据
修改请求/响应能力 ✅可修改请求或响应对象(如添加Headers) ❌仅提供数据,不直接操作请求/响应对象

使用场景对比

中间件适用场景

全局性处理:

from fastapi import Request

@app.middleware("http")
async def log_requests(request: Request, call_next):
    print(f"Request: {request.method} {request.url}")
    response = await call_next(request)
    print(f"Response: {response.status_code}")
    return response
  • 日志记录、异常捕获、跨域(CORS)、限流、IP白名单等。
  • 修改请求/响应头(如添加X-Request-ID)。

路由组中间件:

sub_app = FastAPI()
sub_app.middleware("http")(auth_middleware)

依赖注入适用场景

复用逻辑

from fastapi import Depends

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/items/")
async def read_items(db: Session = Depends(get_db)):
    return db.query(Item).all()
  • 数据库连接、用户身份验证(如get_current_user)。
  • 请求参数预处理(如分页参数解析)。

分层依赖

def get_query_params(q: str = None, page: int = 1):
    return {"q": q, "page": page}

@app.get("/search/")
async def search(params: dict = Depends(get_query_params)):
    return params

执行顺序与数据流

中间件与依赖注入的协作流程

┌───────────────────────┐
│ 中间件1 │
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 中间件2 │
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 依赖注入解析(如数据库) │
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 路径操作函数(路由) │
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 中间件2 │
└──────────┬────────────┘
           │
┌──────────▼────────────┐
│ 中间件1 │
└───────────────────────┘

关键点

  • 中间件在依赖注入之前执行(如认证中间件先验证Token,依赖注入再获取用户信息)。
  • 依赖注入的参数在路由函数执行前解析完成。

代码示例对比

用户认证实现对比

中间件方式(处理全局认证):

@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    token = request.headers.get("Authorization")
    if not validate_token(token):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})
    request.state.user = get_user_from_token(token)
    return await call_next(request)

依赖注入方式(精确到路由):

def get_current_user(token: str = Header(...)):
    if not validate_token(token):
        raise HTTPException(status_code=403, detail="Forbidden")
    return get_user_from_token(token)

@app.get("/profile/")
async def profile(user: User = Depends(get_current_user)):
    return {"user": user.name}

数据传递对比

中间件传递数据

request.state.user_id = 123  # 中间件中设置

依赖注入传递数据

user_id: int = Depends(get_user_id)  # 直接作为参数注入

性能与灵活性

维度 中间件 依赖注入
性能影响 全局生效,可能影响所有请求 按需注入,仅影响使用该依赖的路由
可测试性 需模拟请求对象 直接调用依赖函数,易于单元测试
代码耦合度 与HTTP层耦合度高 业务逻辑解耦,依赖项可独立替换
复用性 适用于横跨多个路由的通用逻辑 支持细粒度复用(如单个参数解析)

总结:何时使用哪种?

场景 选择中间件 选择依赖注入
需要修改请求/响应头或Body
全局性日志或异常处理
路由级别的用户鉴权 ❌(除非路由组统一处理)
数据库连接管理
复杂业务逻辑解耦
跨域(CORS)配置 ✅(使用 CORSMiddleware)

最终结论

  • 中间件:处理HTTP协议层面、全局性的横切关注点。
  • 依赖注入:管理业务逻辑的依赖关系,实现代码解耦和复用。

两者可结合使用(如中间件处理Token验证,依赖注入获取用户对象),但需明确职责边界。

参考链接:

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注