数据, 术→技巧

FastAPI学习之后台任务

钱魏Way · · 92 次浏览

为什么需要后台任务?

  • 场景痛点:同步处理耗时操作(如发送邮件、数据分析)导致请求阻塞,用户体验差。
  • 核心价值:后台任务实现异步非阻塞,提升吞吐量和响应速度。
  • FastAPI 优势:原生支持异步、多种任务方案灵活选择。

后台任务的用途

  • 异步执行耗时操作:避免阻塞主请求响应(如发送邮件、数据处理)。
  • 解耦非关键逻辑:将非实时性任务(如日志记录、文件清理)移至后台。
  • 提升用户体验:快速返回 HTTP 响应,后台异步处理复杂流程。

后台任务的适用场景

  • 实时性要求低:日志记录、数据清洗。
  • 资源密集型操作:图像处理、机器学习推理。
  • 第三方服务调用:发送短信/邮件、支付回调。
  • 定时任务:每日报表生成、缓存刷新。

后台任务的实现方式

使用 BackgroundTasks(轻量级任务)

适用场景:简单的、与请求生命周期绑定的任务(如发送通知)。

特点

  • 任务在请求响应后立即执行。
  • 任务与请求共享同一事件循环(需避免 CPU 密集型操作)。

基础使用

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def log_operation(message: str):
    with open("logs.txt", "a") as f:
        f.write(f"{message}\n")

@app.post("/items/")
async def create_item(
    name: str, 
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(log_operation, f"New item: {name}")
    return {"message": "Item created"}

异步任务支持

async def async_send_email(email: str, content: str):
    # 模拟异步发送邮件
    await asyncio.sleep(1)
    print(f"Email to {email}: {content}")

@app.post("/notify")
async def send_notification(
    email: str, 
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(async_send_email, email, "Hello!")
    return {"status": "Notification queued"}

注意事项

  • 任务在请求结束后执行,与请求共享同一事件循环。
  • 避免在此处执行 CPU 密集型任务(如复杂计算)。

使用 asyncio 异步任务

适用场景:需要完全异步控制的任务(如调用第三方 API)。

特点:

  • 使用create_task() 启动独立协程。
  • 任务生命周期可能长于请求。

代码示例:

import asyncio
from fastapi import FastAPI

app = FastAPI()

async def send_notification(email: str):
    await asyncio.sleep(3)  # 模拟耗时操作
    print(f"Notification sent to {email}")

@app.post("/subscribe/")
async def subscribe(email: str):
    asyncio.create_task(send_notification(email))
    return {"status": "Subscribed"}

注意事项

  • 需手动处理任务异常(如添加try-except)。
  • 使用gather() 管理多个并发任务。

使用 Celery(分布式任务队列)

适用场景:高可靠性的复杂任务(如数据处理、定时任务)。

特点

  • 支持任务重试、结果存储、分布式执行。
  • 依赖中间件(如 Redis/RabbitMQ)。

代码示例:

步骤 1:安装依赖

pip install celery redis

步骤 2:创建 Celery 实例 (celery_app.py)

from celery import Celery

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def process_data(data: dict):
    # 模拟耗时数据处理
    import time
    time.sleep(10)
    return {"result": "processed"}

步骤 3:FastAPI 集成

from fastapi import FastAPI
from celery_app import celery_app

app = FastAPI()

@app.post("/process/")
async def start_processing(data: dict):
    task = celery_app.send_task("process_data", args=[data])
    return {"task_id": task.id}

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    return {
        "status": result.status,
        "result": result.result
    }

生产建议

  • 使用Flower 监控 Celery 任务。
  • 配置Celery Beat 实现定时任务。

使用 APScheduler(定时任务)

适用场景:周期性任务(如每日数据备份)。

代码示例:

from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI

app = FastAPI()
scheduler = BackgroundScheduler()

def daily_report():
    print("Generating daily report...")

@app.on_event("startup")
def init_scheduler():
    scheduler.add_job(daily_report, "cron", hour=0)  # 每天 0 点执行
    scheduler.start()

@app.on_event("shutdown")
def shutdown_scheduler():
    scheduler.shutdown()

注意事项

  • 避免在调度器中执行长耗时任务(可能阻塞后续任务)。
  • 使用AsyncIOScheduler 适配 FastAPI 异步环境。

不同实现方式的对比

方式 适用场景 优点 缺点
BackgroundTasks 简单、短时任务 无需额外依赖,与请求生命周期绑定 不适合 CPU 密集型或长耗时任务
asyncio 完全异步控制 灵活,适合异步 I/O 操作 需手动管理任务状态和错误
Celery 复杂、分布式任务 支持重试、结果存储、水平扩展 需要中间件,配置复杂度高
APScheduler 定时任务 简单实现周期性操作 不适合动态任务调度

根据任务需求选择方案:

  • 简单需求:内置BackgroundTasks
  • 复杂需求:Celery+ Redis/RabbitMQ
  • 定时任务:APScheduler
  • 高性能异步:create_task

任务管理最佳实践

任务生命周期管理

任务类型 生命周期 存储需求
BackgroundTasks 请求生命周期内 内存
Celery 持久化(重启后继续) 数据库/Redis
APScheduler 应用运行期间 内存

关键注意事项

错误处理与重试

使用 Celery 的 retry 机制或 tenacity 库实现自动重试。

from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
async def call_unstable_api():
    response = await httpx.get("https://unstable-api.com")
    return response.json()

任务状态监控

记录任务 ID、状态、时间戳到数据库(如 PostgreSQL)。

from sqlalchemy import Column, String, DateTime
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class TaskStatus(Base):
    __tablename__ = "task_status"
    id = Column(String, primary_key=True)
    status = Column(String)  # PENDING, SUCCESS, FAILURE
    created_at = Column(DateTime)

性能优化

使用 uvloop 加速 asyncio 事件循环。

对 CPU 密集型任务使用 ThreadPoolExecutor 或分离 Worker。

import concurrent.futures

async def run_cpu_bound():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        await loop.run_in_executor(pool, cpu_intensive_calculation)

安全性

  • 限制后台任务的资源访问权限(如数据库连接池大小)。
  • 对任务参数进行校验,避免注入攻击。

参考链接:

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注