FastAPI基于Starlette,而Starlette提供了WebSocket支持,所以FastAPI自然也能处理WebSocket连接。
WebSocket的适用场景
WebSocket 是一种支持全双工通信的网络协议,适用于需要实时交互或服务器主动推送的场景。
实时通信应用
核心需求:客户端与服务端需要高频次、低延迟的双向数据交换。
示例:
- 在线聊天系统:用户发送消息后,服务端立即推送给其他用户。
- 多人在线游戏:实时同步玩家位置、状态和动作。
- 视频会议:传输音视频流和共享白板操作。
优势:
- 减少延迟:无需客户端轮询(如 HTTP 长轮询),服务端可直接推送数据。
- 节省资源:长连接避免重复建立 HTTP 请求的开销。
实时数据监控与推送
核心需求:服务端需要主动向客户端推送动态数据。
示例:
- 金融交易平台:实时股票价格、K线图更新。
- IoT 设备监控:传感器数据(温度、湿度)实时展示。
- 实时统计看板:在线用户数、订单成交量的动态刷新。
优势:
- 即时性:数据变化后立即推送,无需等待客户端请求。
- 高效性:适合高频数据更新(如每秒多次)。
协作工具与在线编辑
核心需求:多用户协同操作时,需实时同步状态。
示例:
- 文档协作:多人同时编辑同一文档(如 Google Docs)。
- 设计工具:实时同步设计稿的修改和评论。
- 项目管理面板:任务状态拖拽更新的实时同步。
优势:
- 双向同步:用户操作实时反馈给其他协作者。
- 冲突解决:通过 WebSocket 消息协调操作顺序。
实时通知与提醒
核心需求:服务端触发事件后,客户端需立即感知。
示例:
- 消息通知:社交媒体新消息、点赞提醒。
- 订单状态更新:电商订单支付成功、物流变动通知。
- 系统告警:服务器故障或安全事件的实时提醒。
优势:
- 精准触达:避免轮询带来的延迟或遗漏。
- 用户体验:即时反馈提升用户满意度。
在线游戏与交互体验
核心需求:需要快速同步玩家状态和游戏事件。
示例:
- 多人在线游戏:玩家移动、攻击动作的实时同步。
- 互动直播:弹幕、礼物打赏的实时显示。
- 虚拟现实(VR):用户动作和视角的实时传输。
优势:
- 低延迟:确保游戏操作的流畅性和公平性。
- 高吞吐量:支持大量玩家同时在线交互。
不适用 WebSocket 的场景
场景特点:单向通信或低频次请求。
示例:
- 静态资源加载:图片、CSS/JS 文件下载。
- 表单提交:用户提交数据后仅需单次响应。
- 简单 API 调用:如查询天气、获取用户信息。
替代方案:
- HTTP REST API:简单、易实现。
- Server-Sent Events (SSE):服务端单向推送(如新闻订阅)。
WebSocket 基础使用
WebSocket 的路由的差异
在 FastAPI 中,WebSocket 的路由与传统 HTTP 路由在设计和使用上有显著差异。以下是主要区别的详细解释:
协议与通信模式
- HTTP 路由:
- 协议:基于HTTP/HTTPS,遵循请求-响应模型。
- 通信方式:客户端发送请求,服务端返回响应后立即关闭连接。
- 类比:像打电话,一问一答后挂断。
- WebSocket 路由:
- 协议:基于 WebSocket(ws://或 wss://),提供全双工通信。
- 通信方式:建立持久连接后,客户端和服务端可以主动互相发送消息。
- 类比:像对讲机,连接后双方随时可对话。
路由定义方式
HTTP 路由:
from fastapi import FastAPI app = FastAPI() @app.get("/items/{item_id}") async def read_item(item_id: int): return {"item_id": item_id}
WebSocket 路由:
from fastapi import WebSocket @app.websocket("/ws/{room_id}") async def websocket_endpoint(websocket: WebSocket, room_id: str): await websocket.accept() # 处理连接...
关键区别:使用 @app.websocket 装饰器,参数包含 WebSocket 对象。
连接生命周期
HTTP 路由:
- 生命周期:单次请求,响应后立即结束。
- 状态管理:无状态(除非手动使用 Cookie 或 Token)。
WebSocket 路由:
- 生命周期:
- 客户端发起握手请求。
- 服务端显式接受连接(await websocket.accept())。
- 通过循环持续接收/发送消息。
- 连接关闭时处理清理逻辑(如close())。
- 状态保持:连接期间可维护状态(如用户会话、实时数据)。
数据交互方式
HTTP 路由:
- 数据传递:通过路径参数、查询参数、请求体(JSON/表单)等。
- 响应格式:通常返回 JSON、HTML 或文件。
WebSocket 路由:
- 数据传递:
- 文本:await websocket.receive_text()
- 二进制:await websocket.receive_bytes()
- JSON:await websocket.receive_json()
- 发送消息:
- 文本:await websocket.send_text(“Hello”)
- 二进制:await websocket.send_bytes(b”data”)
- JSON:await websocket.send_json({“key”: “value”})
参数与依赖注入
HTTP 路由:
- 支持路径参数、查询参数、请求体、Header、Cookie 等。
- 依赖注入(Depends)用于共享逻辑(如数据库会话)。
WebSocket 路由:
- 参数获取:
- 路径参数:直接在路由函数中声明(如room_id: str)。
- 查询参数:通过query_params 或依赖注入。
- 依赖注入:与 HTTP 路由类似,但需手动处理连接接受后的逻辑。
异常处理
HTTP 路由:抛出 HTTPException 自动生成对应状态码的响应。
示例:
from fastapi import HTTPException @app.get("/items/{item_id}") async def read_item(item_id: int): if item_id < 0: raise HTTPException(status_code=400, detail="Invalid ID") return {"item_id": item_id}
WebSocket 路由:需要手动捕获异常(如 WebSocketDisconnect)并关闭连接。
示例:
from fastapi import WebSocket, WebSocketDisconnect @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() try: while True: data = await websocket.receive_text() # 处理消息 except WebSocketDisconnect: print("客户端断开连接") finally: await websocket.close()
性能与并发
- HTTP 路由:
- 短连接,适合低频请求。
- 高并发时依赖异步处理(如async def)和服务器配置(如 Uvicorn 工作进程)。
- WebSocket 路由:
- 长连接,需高效管理资源。
- 关键优化点:
- 使用异步操作(如await)避免阻塞。
- 合理管理连接池(如广播消息时遍历所有活跃连接)。
创建 WebSocket 路由
from fastapi import FastAPI, WebSocket app = FastAPI() @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): # 接受客户端连接 await websocket.accept() try: while True: # 接收客户端消息(文本) data = await websocket.receive_text() # 处理消息并返回响应 await websocket.send_text(f"Server: {data}") except Exception as e: print(f"Error: {e}") finally: await websocket.close()
客户端连接示例(JavaScript)
const socket = new WebSocket("ws://localhost:8000/ws"); socket.onopen = () => { socket.send("Hello FastAPI!"); }; socket.onmessage = (event) => { console.log("Server says:", event.data); };
WebSocket核心功能实现
处理不同消息类型
@app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() try: while True: # 接收不同类型消息 message = await websocket.receive() if message["type"] == "websocket.receive": if "text" in message: await websocket.send_text(f"Text: {message['text']}") elif "bytes" in message: await websocket.send_bytes(message["bytes"]) except WebSocketDisconnect: pass
广播消息(多客户端管理)
from fastapi import WebSocketDisconnect class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.websocket("/chat") async def chat_websocket(websocket: WebSocket): await manager.connect(websocket) try: while True: data = await websocket.receive_text() await manager.broadcast(f"User: {data}") except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast("A user left the chat")
身份验证
from fastapi.security import OAuth2PasswordBearer oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") @app.websocket("/auth-ws") async def auth_websocket( websocket: WebSocket, token: str = Query(...) # 从查询参数获取 token ): # 验证 token user = await authenticate_token(token) if not user: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return await websocket.accept() # ...处理消息...
心跳检测
import asyncio @app.websocket("/ws-with-ping") async def websocket_with_ping(websocket: WebSocket): await websocket.accept() try: while True: # 设置心跳间隔 await asyncio.wait_for( websocket.receive_text(), timeout=30 # 30秒无消息则断开 ) except asyncio.TimeoutError: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) except WebSocketDisconnect: pass
性能优化技巧
使用二进制协议
@app.websocket("/binary") async def binary_websocket(websocket: WebSocket): await websocket.accept() try: while True: data = await websocket.receive_bytes() processed = process_binary(data) await websocket.send_bytes(processed) except WebSocketDisconnect: pass
消息压缩
from fastapi.middleware.gzip import GZipMiddleware app.add_middleware(GZipMiddleware) # 对文本消息自动压缩
测试方法
使用 Python 测试客户端
from fastapi.testclient import TestClient def test_websocket(): client = TestClient(app) with client.websocket_connect("/ws") as websocket: websocket.send_text("Hello") data = websocket.receive_text() assert data == "Echo: Hello"
浏览器端测试
<script> const ws = new WebSocket("ws://localhost:8000/ws"); ws.onmessage = (event) => console.log(event.data); ws.onopen = () => ws.send("Hello FastAPI"); </script>
部署注意事项
- 服务器选择:使用支持 WebSocket 的 ASGI 服务器(推荐 Uvicorn 或 Daphne)
uvicorn main:app --ws websockets
- 负载均衡:配置 Nginx 支持 WebSocket
proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade";
- 连接限制:监控连接数防止资源耗尽
完整示例:实时聊天室
服务端代码
from fastapi import FastAPI, WebSocket from typing import List app = FastAPI() class ChatManager: def __init__(self): self.connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.connections.append(websocket) await self.broadcast("New user joined!") async def broadcast(self, message: str): for conn in self.connections: await conn.send_text(message) async def disconnect(self, websocket: WebSocket): self.connections.remove(websocket) await websocket.close() chat_manager = ChatManager() @app.websocket("/chat") async def chat(websocket: WebSocket): await chat_manager.connect(websocket) try: while True: message = await websocket.receive_text() await chat_manager.broadcast(f"User: {message}") except: await chat_manager.disconnect(websocket)
客户端代码(HTML + JavaScript)
<!DOCTYPE html> <html> <body> <input id="message" placeholder="Type message"> <button onclick="sendMessage()">Send</button> <div id="chat"></div> <script> const socket = new WebSocket("ws://localhost:8000/chat"); const chatDiv = document.getElementById("chat"); socket.onmessage = (event) => { const p = document.createElement("p"); p.textContent = event.data; chatDiv.appendChild(p); }; function sendMessage() { const input = document.getElementById("message"); socket.send(input.value); input.value = ""; } </script> </body> </html>
测试 WebSocket 端点
使用 WebSocketTestClient 进行单元测试:
from fastapi.testclient import TestClient from fastapi.websockets import WebSocketDisconnect def test_websocket(): client = TestClient(app) try: # 连接并发送消息 with client.websocket_connect("/chat") as websocket: websocket.send_text("Hello") data = websocket.receive_text() assert data == "User: Hello" except WebSocketDisconnect: pass
通过以上实现,FastAPI 可以轻松构建高性能的实时 WebSocket 应用。建议根据业务需求选择合适的消息协议(如 JSON、Protobuf)和连接管理策略。
参考链接: