!文章内容如有错误或排版问题,请提交反馈,非常感谢!
FastAPI的异步实现基于Python的asyncio库和ASGI(异步服务器网关接口)规范,能够高效处理I/O密集型和高并发请求。
FastAPI异步简介
基础概念
- 协程(Coroutine):通过async def 定义的函数,返回一个协程对象,需通过事件循环执行。
- 事件循环(Event Loop):管理异步任务的调度和执行(由ASGI服务器如Uvicorn处理)。
- 非阻塞I/O:在等待I/O操作(如数据库查询、HTTP请求)时释放CPU,处理其他任务。
基础架构
- 依赖的库:FastAPI构建于Starlette(异步Web框架)和Pydantic(数据验证),天然支持异步。
- ASGI服务器:需使用Uvicorn或Hypercorn等ASGI服务器运行,这些服务器基于asyncio 事件循环。
性能优势
- 高并发:单线程可处理数千个并发I/O操作,适合微服务、API网关等场景。
- 低延迟:通过非阻塞I/O减少等待时间。
注意事项
- 避免阻塞操作:在异步函数中不要直接调用同步I/O或CPU密集型代码。
- 正确使用await:确保所有异步调用都使用 await,否则会导致阻塞。
- 选择合适的库:使用支持异步的数据库驱动和HTTP客户端。
通过合理利用异步特性,FastAPI能够显著提升应用的吞吐量和响应速度。
FastAPI异步的使用
在FastAPI中,async/await 是处理异步操作的核心语法,基于Python的协程(coroutines)实现非阻塞I/O。
定义异步路由
基本异步路由
使用 async def 定义异步处理函数:
from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"message": "Hello World"}
异步操作示例
在路由中调用异步函数(如数据库查询):
async def fetch_data_from_db(): await asyncio.sleep(1) #模拟异步I/O操作 return "Data from DB" @app.get("/data") async def get_data(): data = await fetch_data_from_db() return {"data": data}
混合使用同步与异步代码
同步函数中的异步调用
错误方式:直接调用异步函数会抛出错误。
def sync_function(): data = fetch_data_from_db() #错误!返回协程对象而非实际数据 return data
正确方式:在同步函数中运行事件循环(不推荐,可能阻塞主线程)。
import asyncio def sync_function(): loop = asyncio.new_event_loop() data = loop.run_until_complete(fetch_data_from_db()) return data
异步函数中的同步调用
使用 asyncio.to_thread() 将同步代码放入线程池执行,避免阻塞事件循环:
import time import asyncio def blocking_io(): time.sleep(2) #同步阻塞操作 return "Result" @app.get("/blocking") async def non_blocking_endpoint(): result = await asyncio.to_thread(blocking_io) return {"result": result}
异步依赖注入
依赖函数可以是异步的:
async def get_async_db(): db = AsyncDBSession() try: yield db finally: await db.close() @app.get("/users/{user_id}") async def read_user(user_id: int, db: AsyncDBSession = Depends(get_async_db)): user = await db.get_user(user_id) return user
后台任务
在响应返回后执行异步后台任务(如发送邮件、数据处理)。
from fastapi import BackgroundTasks async def send_email(email: str): #模拟异步发送邮件 await email_service.send(email) @app.post("/register/") async def register_user(email: str, tasks: BackgroundTasks): tasks.add_task(send_email, email) return {"message": "Registration successful"}
调用异步库
与异步数据库驱动(如asyncpg、databases)或HTTP客户端(如httpx)配合使用。
import httpx @app.get("/external-api") async def call_external_api(): async with httpx.AsyncClient() as client: response = await client.get("https://api.example.com/data") return response.json()
并发执行异步任务
使用 asyncio.gather() 并行执行多个异步操作:
async def fetch_user(user_id: int): await asyncio.sleep(1) return f"User {user_id}" async def fetch_order(order_id: int): await asyncio.sleep(1) return f"Order {order_id}" @app.get("/concurrent") async def concurrent_requests(): user, order = await asyncio.gather( fetch_user(1), fetch_order(1) ) return {"user": user, "order": order}
异步流式响应
使用 StreamingResponse 处理大文件或实时数据流:
from fastapi.responses import StreamingResponse async def generate_large_data(): for i in range(10): await asyncio.sleep(0.1) yield f"Data chunk {i}\n" @app.get("/stream") async def stream_data(): return StreamingResponse(generate_large_data())
错误处理与中间件
异步异常处理
from fastapi import HTTPException @app.get("/error") async def raise_error(): try: await fetch_data_from_db() except Exception as e: raise HTTPException(status_code=500, detail=str(e))
异步中间件
通过 app.middleware(“http”) 或直接使用 Starlette 的中间件接口编写异步中间件。
@app.middleware("http") async def add_process_time_header(request: Request, call_next): start_time = time.time() response = await call_next(request) process_time = time.time() - start_time response.headers["X-Process-Time"] = str(process_time) return response
常见错误
忘记 await
@app.get("/error") async def error_example(): data = fetch_data_from_db() # 错误!未使用 await return {"data": data} # 实际返回协程对象,非数据
混合异步与同步代码
@app.get("/blocking-error") async def blocking_error(): time.sleep(5) # 同步阻塞,导致整个事件循环停滞 return {"message": "Done"}
性能对比
场景 | 同步代码 | 异步代码 |
100 个并发 I/O 操作 | 100 秒(串行) | 1 秒(并行) |
CPU 密集型任务 | 快(无切换开销) | 慢(事件循环开销) |
最佳实践
- I/O 密集型任务:优先使用异步(如数据库查询、HTTP 请求)。
- CPU 密集型任务:避免在异步函数中直接运行,改用线程池(to_thread())。
- 避免阻塞:不要在异步函数中调用同步阻塞代码(如 sleep())。
- 依赖管理:确保所有依赖(如数据库驱动)支持异步。
- 错误处理:使用 try/except 捕获异步操作中的异常。
参考链接: