!文章内容如有错误或排版问题,请提交反馈,非常感谢!
FastAPI中实现缓存可以提高应用的性能,尤其是在处理重复请求的时候,减少数据库的压力,加快响应速度。
缓存后端
根据应用需求选择合适的缓存后端:
- 内存缓存(InMemory):适用于单进程开发环境,简单快速,但不支持多实例。
- Redis:分布式缓存,适用于生产环境,支持多实例共享缓存。
- 其他后端:如Memcached、数据库等。
内存缓存
在FastAPI中实现内存缓存(InMemory 缓存)非常简单,无需依赖外部服务(如Redis),适合单进程开发环境或临时缓存需求。
内存缓存的特点:
- 单进程适用:仅在单个应用进程内有效,多进程或多实例部署时缓存不共享。
- 数据易失性:应用重启后缓存数据会丢失。
- 简单轻量:无需外部服务,适合开发环境或小型临时缓存需求。
内存缓存适用场景:
- 开发环境:快速验证业务逻辑,无需搭建Redis。
- 临时数据缓存:缓存高频但短时间有效的数据(如临时配置、计算结果)。
- 单实例部署:小型应用或无需分布式缓存的场景。
通过内存缓存,可以快速提升FastAPI应用的性能,但生产环境建议使用Redis等分布式缓存方案。
安装依赖库
使用fastapi-cache库简化内存缓存的实现:
pip install fastapi-cache
(无需安装Redis或其他额外依赖)
配置内存缓存
在FastAPI应用启动时初始化内存缓存:
from fastapi import FastAPI from fastapi_cache import FastAPICache from fastapi_cache.backends.inmemory import InMemoryBackend app = FastAPI() # 初始化内存缓存 @app.on_event("startup") async def startup(): FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache")
缓存路由响应
使用@cache装饰器缓存特定路由的响应,设置过期时间(单位:秒):
from fastapi_cache.decorator import cache @app.get("/users/{user_id}") @cache(expire=60*5) # 缓存5分钟 async def get_user(user_id: int): # 模拟数据库查询(例如从数据库或外部API获取数据) return {"user_id": user_id, "name": "Alice"}
手动管理缓存
在数据更新时清除相关缓存,保持数据一致性:
from fastapi_cache import FastAPICache from fastapi_cache.key_builder import default_key_builder @app.put("/users/{user_id}") async def update_user(user_id: int, name: str): # 1. 更新数据库逻辑(此处省略) # 2. 清除该用户的缓存 cache_key = default_key_builder( func=get_user, # 需要清除的缓存函数 user_id=user_id # 路由参数 ) await FastAPICache.get_backend().delete(cache_key) return {"status": "success", "user_id": user_id}
高级用法
自定义缓存键生成
避免不同路由的缓存键冲突,可以自定义键生成逻辑:
def custom_key_builder(func, **kwargs): return f"{func.__name__}:{kwargs.get('user_id')}" @app.get("/users/{user_id}") @cache(expire=300, key_builder=custom_key_builder) async def get_user(user_id: int): return {"user_id": user_id}
禁用缓存
在特定条件下跳过缓存(例如调试时):
@app.get("/users/{user_id}") @cache(expire=300, unless=lambda _: DEBUG_MODE) # DEBUG_MODE为True时禁用缓存 async def get_user(user_id: int): return {"user_id": user_id}
Redis缓存
以下是如何在FastAPI中集成Redis缓存的详细教程,涵盖配置、基本使用、高级功能及注意事项:
安装Python依赖
pip install fastapi uvicorn fastapi-cache redis
配置FastAPI+Redis
初始化Redis连接
from fastapi import FastAPI from fastapi_cache import FastAPICache from fastapi_cache.backends.redis import RedisBackend from redis import asyncio as aioredis # 异步Redis客户端 app = FastAPI() @app.on_event("startup") async def startup(): # 创建Redis连接(支持密码、SSL等) redis = aioredis.from_url( "redis://localhost:6379", encoding="utf-8", decode_responses=True ) FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
生产环境配置建议
redis = aioredis.from_url( "rediss://:password@redis-host:6379/0", # SSL+密码 ssl_cert_reqs="none", # 禁用证书验证(根据需求调整) socket_connect_timeout=5, # 连接超时 socket_keepalive=True, # 保持长连接 retry_on_timeout=True # 超时重试 )
基本使用:缓存路由响应
缓存GET请求结果
from fastapi_cache.decorator import cache @app.get("/items/{item_id}") @cache(expire=300) # 缓存5分钟 async def read_item(item_id: int): # 模拟数据库查询或耗时计算 return {"item_id": item_id, "data": "expensive_data"}
自定义缓存键(避免冲突)
from fastapi_cache.key_builder import default_key_builder def custom_key_builder(func, **kwargs): # 示例:将路由参数包含在键中 return f"{func.__module__}:{func.__name__}:{kwargs['item_id']}" @app.get("/items/{item_id}") @cache(expire=300, key_builder=custom_key_builder) async def read_item(item_id: int): return {"item_id": item_id}
手动管理缓存
删除缓存
from fastapi_cache import FastAPICache @app.put("/items/{item_id}") async def update_item(item_id: int, new_data: str): # 1. 更新数据库 # 2. 清除缓存 cache_key = default_key_builder( func=read_item, # 需要清除的缓存对应的函数 item_id=item_id ) await FastAPICache.get_backend().delete(cache_key) return {"status": "cache_cleared"}
批量删除缓存(通配符匹配)
async def clear_cache_pattern(pattern: str = "fastapi-cache:*"): redis = FastAPICache.get_backend().redis keys = await redis.keys(pattern) if keys: await redis.delete(*keys)
高级功能
缓存数据库查询结果
from fastapi_cache.decorator import cache from sqlalchemy.ext.asyncio import AsyncSession @cache(expire=3600) async def get_cached_user(user_id: int, session: AsyncSession): # 执行数据库查询 user = await session.get(User, user_id) return user.dict()
缓存依赖项
from fastapi import Depends async def get_expensive_data(): # 耗时操作 return {"data": "value"} @app.get("/data") @cache(expire=60) async def data_view(data: dict = Depends(get_expensive_data)): return data
监控与调试
检查Redis缓存内容
# 查看所有缓存键 redis-cli keys "fastapi-cache:*" # 获取某个键的值 redis-cli GET "fastapi-cache:your_cache_key"
集成日志
import logging redis_logger = logging.getLogger("redis") class RedisLogger: async def get(self, key: str): value = await self.backend.get(key) redis_logger.debug(f"Cache GET: {key} -> {value}") return value # 在初始化时注入自定义后端(需继承RedisBackend)
生产环境注意事项
连接池配置:
redis = aioredis.ConnectionPool.from_url( "redis://localhost:6379", max_connections=100, socket_timeout=5 )
缓存雪崩防护:为过期时间添加随机偏移(如 expire=300+random.randint(0,60))
缓存穿透处理:对空结果设置短时间缓存(如 expire=10)
序列化优化:
from fastapi_cache.serializers import JsonSerializer FastAPICache.init( RedisBackend(redis, serializer=JsonSerializer()), prefix="fastapi-cache" )
通过以上步骤,可以在FastAPI中高效集成Redis缓存,显著提升应用性能。建议在生产环境中启用Redis持久化(AOF/RDB)并配置哨兵或集群模式实现高可用。