文章内容如有错误或排版问题,请提交反馈,非常感谢!
为什么需要后台任务?
- 场景痛点:同步处理耗时操作(如发送邮件、数据分析)导致请求阻塞,用户体验差。
- 核心价值:后台任务实现异步非阻塞,提升吞吐量和响应速度。
- FastAPI 优势:原生支持异步、多种任务方案灵活选择。

后台任务的用途
- 异步执行耗时操作:避免阻塞主请求响应(如发送邮件、数据处理)。
- 解耦非关键逻辑:将非实时性任务(如日志记录、文件清理)移至后台。
- 提升用户体验:快速返回 HTTP 响应,后台异步处理复杂流程。
后台任务的适用场景
- 实时性要求低:日志记录、数据清洗。
- 资源密集型操作:图像处理、机器学习推理。
- 第三方服务调用:发送短信/邮件、支付回调。
- 定时任务:每日报表生成、缓存刷新。
后台任务的实现方式
使用 BackgroundTasks(轻量级任务)
适用场景:简单的、与请求生命周期绑定的任务(如发送通知)。
特点:
- 任务在请求响应后立即执行。
- 任务与请求共享同一事件循环(需避免 CPU 密集型操作)。
基础使用
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def log_operation(message: str):
with open("logs.txt", "a") as f:
f.write(f"{message}\n")
@app.post("/items/")
async def create_item(
name: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(log_operation, f"New item: {name}")
return {"message": "Item created"}
异步任务支持
async def async_send_email(email: str, content: str):
# 模拟异步发送邮件
await asyncio.sleep(1)
print(f"Email to {email}: {content}")
@app.post("/notify")
async def send_notification(
email: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(async_send_email, email, "Hello!")
return {"status": "Notification queued"}
注意事项:
- 任务在请求结束后执行,与请求共享同一事件循环。
- 避免在此处执行 CPU 密集型任务(如复杂计算)。
使用 asyncio 异步任务
适用场景:需要完全异步控制的任务(如调用第三方 API)。
特点:
- 使用 create_task() 启动独立协程。
- 任务生命周期可能长于请求。
代码示例:
import asyncio
from fastapi import FastAPI
app = FastAPI()
async def send_notification(email: str):
await asyncio.sleep(3) # 模拟耗时操作
print(f"Notification sent to {email}")
@app.post("/subscribe/")
async def subscribe(email: str):
asyncio.create_task(send_notification(email))
return {"status": "Subscribed"}
注意事项:
- 需手动处理任务异常(如添加 try-except)。
- 使用 gather() 管理多个并发任务。
使用 Celery(分布式任务队列)
适用场景:高可靠性的复杂任务(如数据处理、定时任务)。
特点:
- 支持任务重试、结果存储、分布式执行。
- 依赖中间件(如 Redis/RabbitMQ)。
代码示例:
步骤 1:安装依赖
pip install celery redis
步骤 2:创建 Celery 实例 (celery_app.py)
from celery import Celery
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@celery_app.task
def process_data(data: dict):
# 模拟耗时数据处理
import time
time.sleep(10)
return {"result": "processed"}
步骤 3:FastAPI 集成
from fastapi import FastAPI
from celery_app import celery_app
app = FastAPI()
@app.post("/process/")
async def start_processing(data: dict):
task = celery_app.send_task("process_data", args=[data])
return {"task_id": task.id}
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
result = celery_app.AsyncResult(task_id)
return {
"status": result.status,
"result": result.result
}
生产建议:
- 使用 Flower 监控 Celery 任务。
- 配置 CeleryBeat 实现定时任务。
使用 APScheduler(定时任务)
适用场景:周期性任务(如每日数据备份)。
代码示例:
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI
app = FastAPI()
scheduler = BackgroundScheduler()
def daily_report():
print("Generating daily report...")
@app.on_event("startup")
def init_scheduler():
scheduler.add_job(daily_report, "cron", hour=0) # 每天0点执行
scheduler.start()
@app.on_event("shutdown")
def shutdown_scheduler():
scheduler.shutdown()
注意事项:
- 避免在调度器中执行长耗时任务(可能阻塞后续任务)。
- 使用 AsyncIOScheduler 适配 FastAPI 异步环境。
不同实现方式的对比
| 方式 | 适用场景 | 优点 | 缺点 |
| BackgroundTasks | 简单、短时任务 | 无需额外依赖,与请求生命周期绑定 | 不适合 CPU 密集型或长耗时任务 |
| asyncio | 完全异步控制 | 灵活,适合异步 I/O 操作 | 需手动管理任务状态和错误 |
| Celery | 复杂、分布式任务 | 支持重试、结果存储、水平扩展 | 需要中间件,配置复杂度高 |
| APScheduler | 定时任务 | 简单实现周期性操作 | 不适合动态任务调度 |
根据任务需求选择方案:
- 简单需求:内置 BackgroundTasks
- 复杂需求:Celery + Redis/RabbitMQ
- 定时任务:APScheduler
- 高性能异步:create_task
任务管理最佳实践
任务生命周期管理
| 任务类型 | 生命周期 | 存储需求 |
| BackgroundTasks | 请求生命周期内 | 内存 |
| Celery | 持久化(重启后继续) | 数据库/Redis |
| APScheduler | 应用运行期间 | 内存 |
关键注意事项
错误处理与重试
使用 Celery 的 retry 机制或 tenacity 库实现自动重试。
from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(3))
async def call_unstable_api():
response = await httpx.get("https://unstable-api.com")
return response.json()
任务状态监控
记录任务 ID、状态、时间戳到数据库(如 PostgreSQL)。
from sqlalchemy import Column, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TaskStatus(Base):
__tablename__ = "task_status"
id = Column(String, primary_key=True)
status = Column(String) # PENDING, SUCCESS, FAILURE
created_at = Column(DateTime)
性能优化
使用 uvloop 加速 asyncio 事件循环。
对 CPU 密集型任务使用 ThreadPoolExecutor 或分离 Worker。
import concurrent.futures
async def run_cpu_bound():
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
await loop.run_in_executor(pool, cpu_intensive_calculation)
安全性
- 限制后台任务的资源访问权限(如数据库连接池大小)。
- 对任务参数进行校验,避免注入攻击。
参考链接:



