为什么需要后台任务?
- 场景痛点:同步处理耗时操作(如发送邮件、数据分析)导致请求阻塞,用户体验差。
- 核心价值:后台任务实现异步非阻塞,提升吞吐量和响应速度。
- FastAPI 优势:原生支持异步、多种任务方案灵活选择。
后台任务的用途
- 异步执行耗时操作:避免阻塞主请求响应(如发送邮件、数据处理)。
- 解耦非关键逻辑:将非实时性任务(如日志记录、文件清理)移至后台。
- 提升用户体验:快速返回 HTTP 响应,后台异步处理复杂流程。
后台任务的适用场景
- 实时性要求低:日志记录、数据清洗。
- 资源密集型操作:图像处理、机器学习推理。
- 第三方服务调用:发送短信/邮件、支付回调。
- 定时任务:每日报表生成、缓存刷新。
后台任务的实现方式
使用 BackgroundTasks(轻量级任务)
适用场景:简单的、与请求生命周期绑定的任务(如发送通知)。
特点:
- 任务在请求响应后立即执行。
- 任务与请求共享同一事件循环(需避免 CPU 密集型操作)。
基础使用
from fastapi import BackgroundTasks, FastAPI app = FastAPI() def log_operation(message: str): with open("logs.txt", "a") as f: f.write(f"{message}\n") @app.post("/items/") async def create_item( name: str, background_tasks: BackgroundTasks ): background_tasks.add_task(log_operation, f"New item: {name}") return {"message": "Item created"}
异步任务支持
async def async_send_email(email: str, content: str): # 模拟异步发送邮件 await asyncio.sleep(1) print(f"Email to {email}: {content}") @app.post("/notify") async def send_notification( email: str, background_tasks: BackgroundTasks ): background_tasks.add_task(async_send_email, email, "Hello!") return {"status": "Notification queued"}
注意事项:
- 任务在请求结束后执行,与请求共享同一事件循环。
- 避免在此处执行 CPU 密集型任务(如复杂计算)。
使用 asyncio 异步任务
适用场景:需要完全异步控制的任务(如调用第三方 API)。
特点:
- 使用create_task() 启动独立协程。
- 任务生命周期可能长于请求。
代码示例:
import asyncio from fastapi import FastAPI app = FastAPI() async def send_notification(email: str): await asyncio.sleep(3) # 模拟耗时操作 print(f"Notification sent to {email}") @app.post("/subscribe/") async def subscribe(email: str): asyncio.create_task(send_notification(email)) return {"status": "Subscribed"}
注意事项:
- 需手动处理任务异常(如添加try-except)。
- 使用gather() 管理多个并发任务。
使用 Celery(分布式任务队列)
适用场景:高可靠性的复杂任务(如数据处理、定时任务)。
特点:
- 支持任务重试、结果存储、分布式执行。
- 依赖中间件(如 Redis/RabbitMQ)。
代码示例:
步骤 1:安装依赖
pip install celery redis
步骤 2:创建 Celery 实例 (celery_app.py)
from celery import Celery celery_app = Celery( "tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0" ) @celery_app.task def process_data(data: dict): # 模拟耗时数据处理 import time time.sleep(10) return {"result": "processed"}
步骤 3:FastAPI 集成
from fastapi import FastAPI from celery_app import celery_app app = FastAPI() @app.post("/process/") async def start_processing(data: dict): task = celery_app.send_task("process_data", args=[data]) return {"task_id": task.id} @app.get("/task/{task_id}") async def get_task_status(task_id: str): result = celery_app.AsyncResult(task_id) return { "status": result.status, "result": result.result }
生产建议:
- 使用Flower 监控 Celery 任务。
- 配置Celery Beat 实现定时任务。
使用 APScheduler(定时任务)
适用场景:周期性任务(如每日数据备份)。
代码示例:
from apscheduler.schedulers.background import BackgroundScheduler from fastapi import FastAPI app = FastAPI() scheduler = BackgroundScheduler() def daily_report(): print("Generating daily report...") @app.on_event("startup") def init_scheduler(): scheduler.add_job(daily_report, "cron", hour=0) # 每天 0 点执行 scheduler.start() @app.on_event("shutdown") def shutdown_scheduler(): scheduler.shutdown()
注意事项:
- 避免在调度器中执行长耗时任务(可能阻塞后续任务)。
- 使用AsyncIOScheduler 适配 FastAPI 异步环境。
不同实现方式的对比
方式 | 适用场景 | 优点 | 缺点 |
BackgroundTasks | 简单、短时任务 | 无需额外依赖,与请求生命周期绑定 | 不适合 CPU 密集型或长耗时任务 |
asyncio | 完全异步控制 | 灵活,适合异步 I/O 操作 | 需手动管理任务状态和错误 |
Celery | 复杂、分布式任务 | 支持重试、结果存储、水平扩展 | 需要中间件,配置复杂度高 |
APScheduler | 定时任务 | 简单实现周期性操作 | 不适合动态任务调度 |
根据任务需求选择方案:
- 简单需求:内置BackgroundTasks
- 复杂需求:Celery+ Redis/RabbitMQ
- 定时任务:APScheduler
- 高性能异步:create_task
任务管理最佳实践
任务生命周期管理
任务类型 | 生命周期 | 存储需求 |
BackgroundTasks | 请求生命周期内 | 内存 |
Celery | 持久化(重启后继续) | 数据库/Redis |
APScheduler | 应用运行期间 | 内存 |
关键注意事项
错误处理与重试
使用 Celery 的 retry 机制或 tenacity 库实现自动重试。
from tenacity import retry, stop_after_attempt @retry(stop=stop_after_attempt(3)) async def call_unstable_api(): response = await httpx.get("https://unstable-api.com") return response.json()
任务状态监控
记录任务 ID、状态、时间戳到数据库(如 PostgreSQL)。
from sqlalchemy import Column, String, DateTime from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class TaskStatus(Base): __tablename__ = "task_status" id = Column(String, primary_key=True) status = Column(String) # PENDING, SUCCESS, FAILURE created_at = Column(DateTime)
性能优化
使用 uvloop 加速 asyncio 事件循环。
对 CPU 密集型任务使用 ThreadPoolExecutor 或分离 Worker。
import concurrent.futures async def run_cpu_bound(): loop = asyncio.get_event_loop() with concurrent.futures.ThreadPoolExecutor() as pool: await loop.run_in_executor(pool, cpu_intensive_calculation)
安全性
- 限制后台任务的资源访问权限(如数据库连接池大小)。
- 对任务参数进行校验,避免注入攻击。
参考链接: