术→技巧, 研发

FastAPI学习之WebSocket

钱魏Way · · 119 次浏览

FastAPI基于Starlette,而Starlette提供了WebSocket支持,所以FastAPI自然也能处理WebSocket连接。

WebSocket的适用场景

WebSocket 是一种支持全双工通信的网络协议,适用于需要实时交互服务器主动推送的场景。

实时通信应用

核心需求:客户端与服务端需要高频次、低延迟的双向数据交换。

示例:

  • 在线聊天系统:用户发送消息后,服务端立即推送给其他用户。
  • 多人在线游戏:实时同步玩家位置、状态和动作。
  • 视频会议:传输音视频流和共享白板操作。

优势:

  • 减少延迟:无需客户端轮询(如 HTTP 长轮询),服务端可直接推送数据。
  • 节省资源:长连接避免重复建立 HTTP 请求的开销。

实时数据监控与推送

核心需求:服务端需要主动向客户端推送动态数据。

示例:

  • 金融交易平台:实时股票价格、K线图更新。
  • IoT 设备监控:传感器数据(温度、湿度)实时展示。
  • 实时统计看板:在线用户数、订单成交量的动态刷新。

优势:

  • 即时性:数据变化后立即推送,无需等待客户端请求。
  • 高效性:适合高频数据更新(如每秒多次)。

协作工具与在线编辑

核心需求:多用户协同操作时,需实时同步状态。

示例:

  • 文档协作:多人同时编辑同一文档(如 Google Docs)。
  • 设计工具:实时同步设计稿的修改和评论。
  • 项目管理面板:任务状态拖拽更新的实时同步。

优势:

  • 双向同步:用户操作实时反馈给其他协作者。
  • 冲突解决:通过 WebSocket 消息协调操作顺序。

实时通知与提醒

核心需求:服务端触发事件后,客户端需立即感知。

示例:

  • 消息通知:社交媒体新消息、点赞提醒。
  • 订单状态更新:电商订单支付成功、物流变动通知。
  • 系统告警:服务器故障或安全事件的实时提醒。

优势:

  • 精准触达:避免轮询带来的延迟或遗漏。
  • 用户体验:即时反馈提升用户满意度。

在线游戏与交互体验

核心需求:需要快速同步玩家状态和游戏事件。

示例:

  • 多人在线游戏:玩家移动、攻击动作的实时同步。
  • 互动直播:弹幕、礼物打赏的实时显示。
  • 虚拟现实(VR):用户动作和视角的实时传输。

优势:

  • 低延迟:确保游戏操作的流畅性和公平性。
  • 高吞吐量:支持大量玩家同时在线交互。

不适用 WebSocket 的场景

场景特点:单向通信或低频次请求。

示例:

  • 静态资源加载:图片、CSS/JS 文件下载。
  • 表单提交:用户提交数据后仅需单次响应。
  • 简单 API 调用:如查询天气、获取用户信息。

替代方案:

  • HTTP REST API:简单、易实现。
  • Server-Sent Events (SSE):服务端单向推送(如新闻订阅)。

WebSocket 基础使用

WebSocket 的路由的差异

在 FastAPI 中,WebSocket 的路由与传统 HTTP 路由在设计和使用上有显著差异。以下是主要区别的详细解释:

协议与通信模式

  • HTTP 路由
    • 协议:基于HTTP/HTTPS,遵循请求-响应模型。
    • 通信方式:客户端发送请求,服务端返回响应后立即关闭连接。
    • 类比:像打电话,一问一答后挂断。
  • WebSocket 路由
    • 协议:基于 WebSocket(ws://或 wss://),提供全双工通信。
    • 通信方式:建立持久连接后,客户端和服务端可以主动互相发送消息。
    • 类比:像对讲机,连接后双方随时可对话。

路由定义方式

HTTP 路由

from fastapi import FastAPI

app = FastAPI()

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}

WebSocket 路由

from fastapi import WebSocket

@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    await websocket.accept()
    # 处理连接...

关键区别:使用 @app.websocket 装饰器,参数包含 WebSocket 对象。

连接生命周期

HTTP 路由

  • 生命周期:单次请求,响应后立即结束。
  • 状态管理:无状态(除非手动使用 Cookie 或 Token)。

WebSocket 路由

  • 生命周期:
    • 客户端发起握手请求。
    • 服务端显式接受连接(await websocket.accept())。
    • 通过循环持续接收/发送消息。
    • 连接关闭时处理清理逻辑(如close())。
  • 状态保持:连接期间可维护状态(如用户会话、实时数据)。

数据交互方式

HTTP 路由

  • 数据传递:通过路径参数、查询参数、请求体(JSON/表单)等。
  • 响应格式:通常返回 JSON、HTML 或文件。

WebSocket 路由

  • 数据传递:
    • 文本:await websocket.receive_text()
    • 二进制:await websocket.receive_bytes()
    • JSON:await websocket.receive_json()
  • 发送消息:
    • 文本:await websocket.send_text(“Hello”)
    • 二进制:await websocket.send_bytes(b”data”)
    • JSON:await websocket.send_json({“key”: “value”})

参数与依赖注入

HTTP 路由

  • 支持路径参数、查询参数、请求体、Header、Cookie 等。
  • 依赖注入(Depends)用于共享逻辑(如数据库会话)。

WebSocket 路由

  • 参数获取:
    • 路径参数:直接在路由函数中声明(如room_id: str)。
    • 查询参数:通过query_params 或依赖注入。
  • 依赖注入:与 HTTP 路由类似,但需手动处理连接接受后的逻辑。

异常处理

HTTP 路由:抛出 HTTPException 自动生成对应状态码的响应。

示例:

from fastapi import HTTPException

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    if item_id < 0:
        raise HTTPException(status_code=400, detail="Invalid ID")
    return {"item_id": item_id}

WebSocket 路由:需要手动捕获异常(如 WebSocketDisconnect)并关闭连接。

示例:

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            # 处理消息
    except WebSocketDisconnect:
        print("客户端断开连接")
    finally:
        await websocket.close()

性能与并发

  • HTTP 路由
    • 短连接,适合低频请求。
    • 高并发时依赖异步处理(如async def)和服务器配置(如 Uvicorn 工作进程)。
  • WebSocket 路由
    • 长连接,需高效管理资源。
    • 关键优化点
      • 使用异步操作(如await)避免阻塞。
      • 合理管理连接池(如广播消息时遍历所有活跃连接)。

创建 WebSocket 路由

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # 接受客户端连接
    await websocket.accept()
    
    try:
        while True:
            # 接收客户端消息(文本)
            data = await websocket.receive_text()
            # 处理消息并返回响应
            await websocket.send_text(f"Server: {data}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        await websocket.close()

客户端连接示例(JavaScript

const socket = new WebSocket("ws://localhost:8000/ws");

socket.onopen = () => {
    socket.send("Hello FastAPI!");
};

socket.onmessage = (event) => {
    console.log("Server says:", event.data);
};

WebSocket核心功能实现

处理不同消息类型

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # 接收不同类型消息
            message = await websocket.receive()
            if message["type"] == "websocket.receive":
                if "text" in message:
                    await websocket.send_text(f"Text: {message['text']}")
                elif "bytes" in message:
                    await websocket.send_bytes(message["bytes"])
    except WebSocketDisconnect:
        pass

广播消息(多客户端管理)

from fastapi import WebSocketDisconnect

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/chat")
async def chat_websocket(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"User: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast("A user left the chat")

身份验证

from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.websocket("/auth-ws")
async def auth_websocket(
    websocket: WebSocket,
    token: str = Query(...)  # 从查询参数获取 token
):
    # 验证 token
    user = await authenticate_token(token)
    if not user:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    
    await websocket.accept()
    # ...处理消息...

心跳检测

import asyncio

@app.websocket("/ws-with-ping")
async def websocket_with_ping(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # 设置心跳间隔
            await asyncio.wait_for(
                websocket.receive_text(),
                timeout=30  # 30秒无消息则断开
            )
    except asyncio.TimeoutError:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
    except WebSocketDisconnect:
        pass

性能优化技巧

使用二进制协议

@app.websocket("/binary")
async def binary_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_bytes()
            processed = process_binary(data)
            await websocket.send_bytes(processed)
    except WebSocketDisconnect:
        pass

消息压缩

from fastapi.middleware.gzip import GZipMiddleware

app.add_middleware(GZipMiddleware)  # 对文本消息自动压缩

测试方法

使用 Python 测试客户端

from fastapi.testclient import TestClient

def test_websocket():
    client = TestClient(app)
    with client.websocket_connect("/ws") as websocket:
        websocket.send_text("Hello")
        data = websocket.receive_text()
        assert data == "Echo: Hello"

浏览器端测试

<script>
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (event) => console.log(event.data);
ws.onopen = () => ws.send("Hello FastAPI");
</script>

部署注意事项

  • 服务器选择:使用支持 WebSocket 的 ASGI 服务器(推荐 Uvicorn 或 Daphne)
uvicorn main:app --ws websockets
  • 负载均衡:配置 Nginx 支持 WebSocket
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
  • 连接限制:监控连接数防止资源耗尽

完整示例:实时聊天室

服务端代码

from fastapi import FastAPI, WebSocket
from typing import List

app = FastAPI()

class ChatManager:
    def __init__(self):
        self.connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.connections.append(websocket)
        await self.broadcast("New user joined!")
    
    async def broadcast(self, message: str):
        for conn in self.connections:
            await conn.send_text(message)
    
    async def disconnect(self, websocket: WebSocket):
        self.connections.remove(websocket)
        await websocket.close()

chat_manager = ChatManager()

@app.websocket("/chat")
async def chat(websocket: WebSocket):
    await chat_manager.connect(websocket)
    try:
        while True:
            message = await websocket.receive_text()
            await chat_manager.broadcast(f"User: {message}")
    except:
        await chat_manager.disconnect(websocket)

客户端代码(HTML + JavaScript)

<!DOCTYPE html>
<html>
<body>
    <input id="message" placeholder="Type message">
    <button onclick="sendMessage()">Send</button>
    <div id="chat"></div>

    <script>
        const socket = new WebSocket("ws://localhost:8000/chat");
        const chatDiv = document.getElementById("chat");

        socket.onmessage = (event) => {
            const p = document.createElement("p");
            p.textContent = event.data;
            chatDiv.appendChild(p);
        };

        function sendMessage() {
            const input = document.getElementById("message");
            socket.send(input.value);
            input.value = "";
        }
    </script>
</body>
</html>

测试 WebSocket 端点

使用 WebSocketTestClient 进行单元测试:

from fastapi.testclient import TestClient
from fastapi.websockets import WebSocketDisconnect

def test_websocket():
    client = TestClient(app)
    try:
        # 连接并发送消息
        with client.websocket_connect("/chat") as websocket:
            websocket.send_text("Hello")
            data = websocket.receive_text()
            assert data == "User: Hello"
    except WebSocketDisconnect:
        pass

通过以上实现,FastAPI 可以轻松构建高性能的实时 WebSocket 应用。建议根据业务需求选择合适的消息协议(如 JSON、Protobuf)和连接管理策略。

参考链接:

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注