术→技巧, 研发

FastAPI学习之异步实现

钱魏Way · · 208 次浏览
!文章内容如有错误或排版问题,请提交反馈,非常感谢!

FastAPI的异步实现基于Python的asyncio库和ASGI(异步服务器网关接口)规范,能够高效处理I/O密集型和高并发请求。

FastAPI异步简介

基础概念

  • 协程(Coroutine):通过async def 定义的函数,返回一个协程对象,需通过事件循环执行。
  • 事件循环(Event Loop):管理异步任务的调度和执行(由ASGI服务器如Uvicorn处理)。
  • 非阻塞I/O:在等待I/O操作(如数据库查询、HTTP请求)时释放CPU,处理其他任务。

基础架构

  • 依赖的库:FastAPI构建于Starlette(异步Web框架)和Pydantic(数据验证),天然支持异步。
  • ASGI服务器:需使用Uvicorn或Hypercorn等ASGI服务器运行,这些服务器基于asyncio 事件循环。

性能优势

  • 高并发:单线程可处理数千个并发I/O操作,适合微服务、API网关等场景。
  • 低延迟:通过非阻塞I/O减少等待时间。

注意事项

  • 避免阻塞操作:在异步函数中不要直接调用同步I/O或CPU密集型代码。
  • 正确使用await:确保所有异步调用都使用 await,否则会导致阻塞。
  • 选择合适的库:使用支持异步的数据库驱动和HTTP客户端。

通过合理利用异步特性,FastAPI能够显著提升应用的吞吐量和响应速度。

FastAPI异步的使用

在FastAPI中,async/await 是处理异步操作的核心语法,基于Python的协程(coroutines)实现非阻塞I/O。

定义异步路由

基本异步路由

使用 async def 定义异步处理函数:

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def read_root():
    return {"message": "Hello World"}

异步操作示例

在路由中调用异步函数(如数据库查询):

async def fetch_data_from_db():
    await asyncio.sleep(1) #模拟异步I/O操作
    return "Data from DB"

@app.get("/data")
async def get_data():
    data = await fetch_data_from_db()
    return {"data": data}

混合使用同步与异步代码

同步函数中的异步调用

错误方式:直接调用异步函数会抛出错误。

def sync_function():
    data = fetch_data_from_db() #错误!返回协程对象而非实际数据
    return data

正确方式:在同步函数中运行事件循环(不推荐,可能阻塞主线程)。

import asyncio

def sync_function():
    loop = asyncio.new_event_loop()
    data = loop.run_until_complete(fetch_data_from_db())
    return data

异步函数中的同步调用

使用 asyncio.to_thread() 将同步代码放入线程池执行,避免阻塞事件循环:

import time
import asyncio

def blocking_io():
    time.sleep(2) #同步阻塞操作
    return "Result"

@app.get("/blocking")
async def non_blocking_endpoint():
    result = await asyncio.to_thread(blocking_io)
    return {"result": result}

异步依赖注入

依赖函数可以是异步的:

async def get_async_db():
    db = AsyncDBSession()
    try:
        yield db
    finally:
        await db.close()

@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncDBSession = Depends(get_async_db)):
    user = await db.get_user(user_id)
    return user

后台任务

在响应返回后执行异步后台任务(如发送邮件、数据处理)。

from fastapi import BackgroundTasks

async def send_email(email: str):
    #模拟异步发送邮件
    await email_service.send(email)

@app.post("/register/")
async def register_user(email: str, tasks: BackgroundTasks):
    tasks.add_task(send_email, email)
    return {"message": "Registration successful"}

调用异步库

与异步数据库驱动(如asyncpg、databases)或HTTP客户端(如httpx)配合使用。

import httpx

@app.get("/external-api")
async def call_external_api():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

并发执行异步任务

使用 asyncio.gather() 并行执行多个异步操作:

async def fetch_user(user_id: int):
    await asyncio.sleep(1)
    return f"User {user_id}"

async def fetch_order(order_id: int):
    await asyncio.sleep(1)
    return f"Order {order_id}"

@app.get("/concurrent")
async def concurrent_requests():
    user, order = await asyncio.gather(
        fetch_user(1),
        fetch_order(1)
    )
    return {"user": user, "order": order}

异步流式响应

使用 StreamingResponse 处理大文件或实时数据流:

from fastapi.responses import StreamingResponse

async def generate_large_data():
    for i in range(10):
        await asyncio.sleep(0.1)
        yield f"Data chunk {i}\n"

@app.get("/stream")
async def stream_data():
    return StreamingResponse(generate_large_data())

错误处理与中间件

异步异常处理

from fastapi import HTTPException

@app.get("/error")
async def raise_error():
    try:
        await fetch_data_from_db()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

异步中间件

通过 app.middleware(“http”) 或直接使用 Starlette 的中间件接口编写异步中间件。

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

常见错误

忘记 await

@app.get("/error")
async def error_example():
    data = fetch_data_from_db() # 错误!未使用 await
    return {"data": data} # 实际返回协程对象,非数据

混合异步与同步代码

@app.get("/blocking-error")
async def blocking_error():
    time.sleep(5) # 同步阻塞,导致整个事件循环停滞
    return {"message": "Done"}

性能对比

场景 同步代码 异步代码
100 个并发 I/O 操作 100 秒(串行) 1 秒(并行)
CPU 密集型任务 快(无切换开销) 慢(事件循环开销)

最佳实践

  • I/O 密集型任务:优先使用异步(如数据库查询、HTTP 请求)。
  • CPU 密集型任务:避免在异步函数中直接运行,改用线程池(to_thread())。
  • 避免阻塞:不要在异步函数中调用同步阻塞代码(如 sleep())。
  • 依赖管理:确保所有依赖(如数据库驱动)支持异步。
  • 错误处理:使用 try/except 捕获异步操作中的异常。

参考链接:

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注