在 FastAPI 中,连接数据库通常使用SQLAlchemy(适用于关系型数据库)或Tortoise-ORM(异步 ORM)。以下是基于 SQLAlchemy 和 Tortoise-ORM 的常见方法:
FastAPI 与 SQLAlchemy 的集成

FastAPI 与 SQLAlchemy 集成可以实现高效的数据库操作,支持同步和异步两种模式。FastAPI 与 SQLAlchemy 集成关键点:
- 同步模式:适合简单应用,直接使用 Session。
- 异步模式:需使用 AsyncSession 和异步驱动(如 asyncpg)。
- 依赖注入:通过 Depends 管理数据库会话生命周期。
- 迁移工具:使用 Alembic 管理数据库版本。
安装依赖
# 同步模式 pip install sqlalchemy fastapi uvicorn # 异步模式(需 Python 3.7+) pip install sqlalchemy[asyncio] fastapi uvicorn asyncpg
同步模式集成
配置数据库连接
# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"
# 创建引擎
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
pool_size=10, # 连接池大小
max_overflow=5, # 最大溢出连接数
pool_recycle=1800 # 连接回收时间(秒)
)
# 会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 声明基类
Base = declarative_base()
定义数据模型
# models.py
from sqlalchemy import Column, Integer, String
from database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50))
email = Column(String(100), unique=True)
依赖注入管理会话
from fastapi import Depends
from sqlalchemy.orm import Session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/")
def create_user(name: str, email: str, db: Session = Depends(get_db)):
user = User(name=name, email=email)
db.add(user)
db.commit()
db.refresh(user)
return user
异步模式集成
配置异步数据库连接
# async_database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
SQLALCHEMY_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
# 创建异步引擎
async_engine = create_async_engine(
SQLALCHEMY_DATABASE_URL,
pool_size=10,
max_overflow=5,
pool_recycle=1800
)
# 异步会话工厂
AsyncSessionLocal = sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False
)
Base = declarative_base()
定义异步依赖
async def get_async_db():
async with AsyncSessionLocal() as db:
yield db
@app.post("/async-users/")
async def create_async_user(
name: str,
email: str,
db: AsyncSession = Depends(get_async_db)
):
user = User(name=name, email=email)
db.add(user)
await db.commit()
await db.refresh(user)
return user
数据库迁移(Alembic)
安装 Alembic
pip install alembic
初始化迁移环境
alembic init migrations
配置 alembic.ini
sqlalchemy.url = postgresql://user:password@localhost/dbname
修改 migrations/env.py
from models import Base # 导入模型 target_metadata = Base.metadata
生成迁移脚本
alembic revision --autogenerate -m "Initial migration" alembic upgrade head
最佳实践
会话生命周期管理:
- 使用 yield 或 async with 确保会话正确关闭。
- 避免在全局范围长期持有会话。
事务管理:
async def update_user(user_id: int, new_name: str, db: AsyncSession):
async with db.begin():
user = await db.get(User, user_id)
user.name = new_name
await db.commit()
性能优化:
- 批量操作代替逐条提交。
- 使用 selectinload 避免 N+1 查询问题:
from sqlalchemy.orm import selectinload stmt = select(User).options(selectinload(User.addresses)) result = await db.execute(stmt)
FastAPI 与 Tortoise-ORM 的集成
在 FastAPI 中集成 Tortoise-ORM(专为异步设计的 Python ORM)可以实现高效的异步数据库操作。
FastAPI + Tortoise-ORM 集成优势:
- 全异步支持:适合高并发 I/O 密集型应用
- 简洁的模型定义:类似 Django ORM 的语法
- 自动 OpenAPI 文档:通过 Pydantic 模型生成
- 迁移工具集成:Aerich 提供数据库版本管理

安装依赖
pip install fastapi uvicorn tortoise-orm asyncpg # PostgreSQL 示例 # 或 MySQL:pip install aiomysql
配置 Tortoise-ORM
在 FastAPI 中初始化 Tortoise
# main.py
from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()
# Tortoise 配置
TORTOISE_ORM = {
"connections": {
"default": "postgres://user:password@localhost/dbname" # PostgreSQL 示例
# MySQL 示例: "mysql://user:password@localhost/dbname"
},
"apps": {
"models": {
"models": ["app.models", "aerich.models"], # 模型文件路径 + Aerich 迁移模型
"default_connection": "default",
}
},
"use_tz": True, # 启用时区支持
"timezone": "Asia/Shanghai",
}
# 注册 Tortoise 到 FastAPI
register_tortoise(
app,
config=TORTOISE_ORM,
generate_schemas=True, # 自动创建表(仅开发环境)
add_exception_handlers=True, # 启用 ORM 异常处理
)
定义数据模型
# app/models.py
from tortoise.models import Model
from tortoise import fields
class User(Model):
id = fields.IntField(pk=True)
username = fields.CharField(max_length=50, unique=True)
email = fields.CharField(max_length=100)
created_at = fields.DatetimeField(auto_now_add=True)
class Meta:
table = "users" # 自定义表名
class Post(Model):
id = fields.IntField(pk=True)
title = fields.CharField(max_length=200)
content = fields.TextField()
author: fields.ForeignKeyRelation[User] = fields.ForeignKeyField(
"models.User", related_name="posts"
)
class Meta:
table = "posts"
创建Pydantic模型
# app/schemas.py from pydantic import BaseModel from tortoise.contrib.pydantic import pydantic_model_creator # 自动从 Tortoise 模型生成 Pydantic 模型 User_Pydantic = pydantic_model_creator(User, name="User") UserCreate_Pydantic = pydantic_model_creator(User, name="UserCreate", exclude_readonly=True) Post_Pydantic = pydantic_model_creator(Post, name="Post")
实现CRUD路由
创建用户
from fastapi import APIRouter, HTTPException
from tortoise.exceptions import IntegrityError
router = APIRouter()
@router.post("/users/", response_model=User_Pydantic)
async def create_user(user: UserCreate_Pydantic):
try:
user_obj = await User.create(**user.dict())
return await User_Pydantic.from_tortoise_orm(user_obj)
except IntegrityError:
raise HTTPException(status_code=400, detail="用户名已存在")
查询用户及其关联数据
@router.get("/users/{user_id}", response_model=User_Pydantic)
async def read_user(user_id: int):
user = await User.get_or_none(id=user_id).prefetch_related("posts")
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return await User_Pydantic.from_tortoise_orm(user)
分页查询
@router.get("/users/", response_model=list[User_Pydantic])
async def list_users(skip: int = 0, limit: int = 100):
return await User_Pydantic.from_queryset(User.all().offset(skip).limit(limit))
高级查询与过滤
条件过滤
from tortoise.expressions import Q
@router.get("/search/", response_model=list[User_Pydantic])
async def search_users(query: str):
return await User_Pydantic.from_queryset(
User.filter(Q(username__icontains=query) | Q(email__icontains=query))
聚合查询
@router.get("/stats/")
async def user_stats():
count = await User.all().count()
return {"total_users": count}
数据库迁移(Aerich)
安装Aerich
pip install aerich
初始化迁移配置
aerich init -t app.main.TORTOISE_ORM # 指定配置路径 aerich init-db # 创建初始迁移
生成迁移文件
修改模型后运行:
aerich migrate --name "add_user_table" aerich upgrade # 应用迁移
完整项目结构
project/ ├── app/ │ ├── __init__.py │ ├── main.py # FastAPI入口 │ ├── models.py # Tortoise模型 │ └── schemas.py # Pydantic模型 ├── migrations/ # Aerich迁移文件 ├── pyproject.toml └── aerich.toml # Aerich配置文件
最佳实践
依赖注入管理事务:
from contextlib import asynccontextmanager
from tortoise.transactions import in_transaction
@asynccontextmanager
async def get_transaction():
async with in_transaction() as tx:
yield tx
性能优化:
- 使用 .prefetch_related() 避免N+1查询问题
- 使用 .only() 限制返回字段
User.all().prefetch_related("posts").only("username", "email")
错误处理:
from tortoise.exceptions import DoesNotExist
@router.delete("/users/{user_id}")
async def delete_user(user_id: int):
try:
user = await User.get(id=user_id)
await user.delete()
return {"message": "用户已删除"}
except DoesNotExist:
raise HTTPException(status_code=404, detail="用户不存在")
FastAPI与databases的集成
Python的databases库是一个异步数据库访问库,旨在简化与关系型数据库的交互,同时支持异步编程模式(如asyncio)。它的设计目标是提供简单、一致的API,并与多种数据库后端兼容(如PostgreSQL、MySQL、SQLite),同时基于SQLAlchemy Core的查询构建功能。
核心特性
- 异步支持:原生支持async/await 语法,适合异步框架(如FastAPI、Starlette)。
- 多数据库兼容:通过不同驱动支持多种数据库:
- PostgreSQL: asyncpg 或 aiopg
- MySQL: aiomysql
- SQLite: aiosqlite
- 轻量级ORM替代:不强制使用ORM,而是允许直接执行SQL或使用SQLAlchemy Core的声明式查询。
- 连接池管理:自动管理数据库连接池,优化高并发场景的性能。
- 类型安全:支持类型注解,与Python的现代类型系统兼容。
安装依赖
pip install fastapi uvicorn databases[postgresql] sqlalchemy
- databases[postgresql]:包含PostgreSQL异步驱动asyncpg。
- sqlalchemy:用于表结构定义(databases依赖SQLAlchemy Core)。
项目结构
. ├── app │ ├── __init__.py │ ├── main.py # FastAPI主入口 │ ├── database.py # 数据库配置 │ ├── models.py # 数据模型(SQLAlchemy Core) │ └── routers # API路由 │ └── users.py
配置数据库连接
在app/database.py中:
from databases import Database DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb" database = Database(DATABASE_URL)
定义数据模型(SQLAlchemy Core)
在app/models.py中:
from sqlalchemy import Table, Column, Integer, String, MetaData
metadata = MetaData()
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(50)),
Column("email", String(100), unique=True),
)
创建FastAPI应用并集成数据库
在app/main.py中:
from fastapi import FastAPI
from app.database import database
from app.models import metadata
app = FastAPI()
# 启动时连接数据库
@app.on_event("startup")
async def startup():
await database.connect()
# 自动创建表(仅示例,生产环境建议用Alembic迁移)
# 注意:metadata.create_all是同步方法,需在同步上下文中执行
# 此处仅为演示,实际项目应单独处理表创建
# 关闭时断开连接
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
# 引入路由
from app.routers import users
app.include_router(users.router)
编写API路由
在app/routers/users.py中:
from fastapi import APIRouter, HTTPException
from app.database import database
from app.models import users
from pydantic import BaseModel
router = APIRouter(prefix="/users", tags=["users"])
# 请求/响应模型
class UserCreate(BaseModel):
name: str
email: str
class UserResponse(UserCreate):
id: int
# 获取所有用户
@router.get("/", response_model=list[UserResponse])
async def get_all_users():
query = users.select()
return await database.fetch_all(query)
# 创建用户
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
query = users.insert().values(**user.dict())
user_id = await database.execute(query)
return {**user.dict(), "id": user_id}
# 获取单个用户
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
query = users.select().where(users.c.id == user_id)
user = await database.fetch_one(query)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
运行应用
uvicorn app.main:app --reload
访问http://localhost:8000/docs测试API。
高级用法
事务处理
async def create_user_with_transaction(user: UserCreate):
async with database.transaction():
# 执行多个操作(原子性)
query = users.insert().values(**user.dict())
user_id = await database.execute(query)
# 其他操作...
return user_id
原始SQL查询
@router.get("/search/")
async def search_users(name: str):
query = "SELECT * FROM users WHERE name = :name"
return await database.fetch_all(query=query, values={"name": name})
连接池配置
# 在database.py中调整连接池大小 database = Database(DATABASE_URL, min_size=5, max_size=20)
批量操作
query = users.insert()
values = [{"name": "Alice"}, {"name": "Bob"}]
await database.execute_many(query, values)
结果集处理:
- fetch_one():返回单条记录。
- fetch_all():返回所有记录。
- execute():执行写操作(INSERT/UPDATE/DELETE)并返回最后一行ID(如适用)。
错误处理与中间件
在app/main.py中添加:
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# 全局异常处理(示例)
@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request, exc):
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail},
)
生产环境优化
- 使用环境变量:通过pydantic-settings 管理数据库URL。
- 连接池监控:定期检查连接池状态(如connection())。
- 异步Alembic:使用alembic + asyncpg 实现异步数据库迁移。
常见问题
- 驱动安装错误:确保安装了正确的异步驱动(如asyncpg)。
- 同步代码调用异步方法:避免在同步函数中直接调用await,需使用 run() 包装。
- 连接泄漏:确保每个请求结束后释放连接(FastAPI自动处理)。
通过以上步骤,你可以快速将 FastAPI 与 databases 集成,构建高性能异步 API。根据项目需求选择是否引入 ORM(如 Tortoise-ORM)或保持轻量级 SQL 操作。
SQLAlchemy、Tortoise-ORM 还是 databases?
在 FastAPI 中选择SQLAlchemy、Tortoise-ORM 还是databases,取决于具体需求(如性能、开发体验、是否需要 ORM 功能等)。
SQLAlchemy(同步/异步混合)
- 定位:成熟的 Python ORM 和 SQL 工具包,支持同步和异步(需搭配 databases 库)。
- 特点:
- 功能全面:支持复杂查询、事务、关系模型、迁移(需 Alembic)。
- 同步为主:原生为同步设计,异步需通过 databases + asyncpg/aiomysql 等驱动实现。
- 学习曲线陡峭:API 较复杂,适合熟悉 ORM 的开发者。
- 适用场景:
- 需要复杂 ORM 功能(如多表关联、混合属性)。
- 已有基于 SQLAlchemy 的代码,需迁移到 FastAPI。
- 同步和异步混合使用(如后台任务用同步,API 用异步)。
Tortoise-ORM(纯异步 ORM)
- 定位:专为异步设计的 ORM,语法类似 Django ORM。
- 特点:
- 异步原生:直接支持 async/await。
- 轻量 ORM:提供模型定义、迁移工具(需 aerich)、关系管理。
- FastAPI 集成友好:官方支持 FastAPI 插件(contrib.fastapi)。
- 适用场景:
- 需要完整的异步 ORM(如模型、外键、预加载查询)。
- 快速开发中小型项目,希望减少样板代码。
- 习惯 Django ORM 风格。
databases(纯异步 SQL 工具)
- 定位:轻量级异步数据库访问库,基于 SQLAlchemy Core。
- 特点:
- 无 ORM:直接执行 SQL 或使用 SQLAlchemy Core 查询构造器。
- 灵活性高:适合喜欢手写 SQL 或简单查询的场景。
- 依赖少:需配合其他库(如 Pydantic)实现数据验证。
- 适用场景:
- 不需要复杂 ORM 功能,只需执行原始 SQL 或简单查询。
- 已有 SQLAlchemy Core 查询逻辑,需异步化。
- 追求极致轻量,避免 ORM 开销。
对比总结
| 维度 | SQLAlchemy+databases | Tortoise-ORM | databases |
| 异步支持 | 是(需异步驱动) | 是(原生) | 是(原生) |
| ORM 功能 | 全面(复杂查询、关系) | 完整(轻量 ORM) | 无(需手写 SQL 或 SQLAlchemy Core) |
| 学习曲线 | 高(需掌握 SQLAlchemy) | 中(类似 Django ORM) | 低(直接执行 SQL) |
| 性能 | 高(连接池优化) | 中(ORM 有开销) | 高(无 ORM 开销) |
| 适用项目规模 | 中大型(复杂业务逻辑) | 中小型(快速开发) | 小型或微服务(轻量查询) |
| 迁移工具 | 需 Alembic | 内置 Aerich | 无(需手动处理) |
选择建议
- 需要完整 ORM 且纯异步 → Tortoise-ORM
- 适合快速开发、模型关系复杂的项目,如用户管理系统、博客平台。
- 复杂查询+异步/同步混合 → SQLAlchemy+databases
- 适合企业级应用(如电商后台),需处理事务、复杂连接查询。
- 轻量级+直接控制 SQL → databases
- 适合微服务、API 网关等简单查询场景,如缓存层、数据分析接口。
最终推荐组合
- 大多数 FastAPI 项目:Tortoise-ORM(平衡开发效率和异步性能)+ Pydantic(数据验证)。
- 高性能复杂场景:databases(异步 SQL)+ SQLAlchemy Core(查询构造)+ Pydantic(验证)。
- 遗留系统迁移:保持SQLAlchemy(同步)+逐步异步化部分接口。
FastAPI 数据库连接池管理
在 FastAPI 中管理数据库连接池是确保应用高性能和稳定性的关键步骤。FastAPI 中数据库连接池管理的核心步骤:
- 配置连接池参数:根据负载调整 pool_size、max_overflow 等。
- 依赖注入管理会话:使用 yield 或 async with 确保连接释放。
- 异步支持:选择适合的异步驱动(如 asyncpg、aiomysql)。
- 监控与调优:通过日志和数据库监控工具优化连接池行为。
以下是详细的数据库连接池管理方法,以 SQLAlchemy(同步/异步)和 Tortoise-ORM(异步)为例:
连接池的核心概念
- 连接池的作用:复用数据库连接,避免频繁创建/关闭连接的开销。
- 关键参数:
- pool_size:连接池保持的最小连接数。
- max_overflow:允许超出 pool_size 的最大临时连接数。
- pool_recycle:连接自动回收时间(秒),避免数据库主动断开空闲连接。
- pool_timeout:获取连接的超时时间(秒)。
使用 SQLAlchemy 管理同步连接池
配置连接池
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 连接池配置
DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(
DATABASE_URL,
pool_size=10, # 最小连接数
max_overflow=5, # 最大临时连接数
pool_recycle=1800, # 30分钟回收连接
pool_timeout=30, # 获取连接超时时间
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
依赖注入管理会话
from fastapi import Depends
from sqlalchemy.orm import Session
def get_db():
db = SessionLocal()
try:
yield db # 使用生成器确保会话关闭
finally:
db.close()
@app.get("/items/{item_id}")
def read_item(item_id: int, db: Session = Depends(get_db)):
item = db.query(Item).filter(Item.id == item_id).first()
return item
使用SQLAlchemy 管理异步连接池
配置异步引擎
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
async_engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=5,
pool_recycle=1800,
pool_timeout=30,
)
AsyncSessionLocal = sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False
)
异步依赖注入
async def get_async_db():
async with AsyncSessionLocal() as db:
yield db
@app.get("/async-items/{item_id}")
async def read_async_item(item_id: int, db: AsyncSession = Depends(get_async_db)):
result = await db.execute(select(Item).filter(Item.id == item_id))
item = result.scalars().first()
return item
使用Tortoise-ORM 管理异步连接池
配置Tortoise-ORM
from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()
register_tortoise(
app,
db_url="postgres://user:password@localhost/dbname",
modules={"models": ["app.models"]},
generate_schemas=True, # 自动生成表结构(仅开发环境)
add_exception_handlers=True,
config={
"connections": {
"default": {
"engine": "tortoise.backends.asyncpg",
"credentials": {
"host": "localhost",
"port": "5432",
"user": "user",
"password": "password",
"database": "dbname",
},
"pool_size": 10, # 连接池大小
"max_overflow": 5, # 最大溢出连接数
"pool_recycle": 1800, # 连接回收时间
}
}
}
)
使用依赖注入
from tortoise.transactions import in_transaction
@app.post("/items/")
async def create_item(item_data: dict):
async with in_transaction() as db:
item = await Item.create(**item_data, using_db=db)
return item
连接池最佳实践
参数调优建议
| 参数 | 推荐值 | 说明 |
| pool_size | 5-20 | 根据并发量调整,避免过大占用数据库资源。 |
| max_overflow | pool_size*0.5 | 突发流量时允许临时扩容。 |
| pool_recycle | 1800(30分钟) | 防止数据库主动断开空闲连接(如MySQL默认8小时断开)。 |
| pool_timeout | 30 | 避免长时间等待连接导致请求堆积。 |
监控与调试
监控连接数:
-- PostgreSQL SELECT * FROM pg_stat_activity; -- MySQL SHOW STATUS LIKE 'Threads_connected';
日志跟踪:
import logging
logging.basicConfig()
logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)
避免常见问题
- 连接泄漏:确保每次操作后关闭会话(使用 yield 或 async with)。
- 长事务:尽量缩短事务持有时间,避免占用连接过久。
- 连接耗尽:合理设置 pool_size 和 max_overflow,结合负载测试调整。
FastAPI CRUD 操作封装
在 FastAPI 中,通过封装 CRUD(Create, Read, Update, Delete)操作可以提高代码复用性和可维护性。通过封装你可以获得以下优势:
- 代码复用:所有模型的 CRUD 操作继承基类逻辑
- 类型安全:利用Python类型提示和Pydantic模型
- 灵活扩展:每个模型可自定义额外方法(如get_by_email)
- 事务安全:通过上下文管理器确保原子性操作
- API一致性:统一的分页、错误响应格式
以下是详细的CRUD封装方案,支持同步和异步模式,并兼容Pydantic模型与SQLAlchemy ORM。
基础CRUD类封装
定义通用CRUD基类
# crud/base.py
from typing import Any, Generic, TypeVar, Optional
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update, delete
ModelType = TypeVar("ModelType") # SQLAlchemy模型类型
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel) # 创建请求模型
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel) # 更新请求模型
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: ModelType):
self.model = model
async def get(self, db: AsyncSession, id: Any) -> Optional[ModelType]:
"""根据ID查询单条记录"""
result = await db.execute(select(self.model).filter(self.model.id == id))
return result.scalars().first()
async def get_multi(
self,
db: AsyncSession,
skip: int = 0,
limit: int = 100,
**filters: Any
) -> list[ModelType]:
"""分页查询+过滤"""
query = select(self.model)
# 动态添加过滤条件
for field, value in filters.items():
if hasattr(self.model, field):
query = query.filter(getattr(self.model, field) == value)
result = await db.execute(query.offset(skip).limit(limit))
return result.scalars().all()
async def create(self, db: AsyncSession, obj_in: CreateSchemaType) -> ModelType:
"""创建记录"""
db_obj = self.model(**obj_in.dict()) # 假设Pydantic模型与ORM模型字段一致
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def update(
self,
db: AsyncSession,
db_obj: ModelType,
obj_in: UpdateSchemaType | dict[str, Any]
) -> ModelType:
"""更新记录"""
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
await db.execute(
update(self.model)
.where(self.model.id == db_obj.id)
.values(**update_data)
)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def delete(self, db: AsyncSession, id: Any) -> None:
"""删除记录"""
await db.execute(delete(self.model).where(self.model.id == id))
await db.commit()
自定义模型CRUD类
# crud/user.py
from app.models.user import User # SQLAlchemy模型
from app.schemas.user import UserCreate, UserUpdate # Pydantic模型
from .base import CRUDBase
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
async def get_by_email(self, db: AsyncSession, email: str) -> Optional[User]:
"""根据邮箱查询用户(自定义方法)"""
result = await db.execute(select(User).filter(User.email == email))
return result.scalars().first()
user_crud = CRUDUser(User) # 实例化,供路由直接调用
Pydantic模型与ORM模型映射
定义Pydantic模型
# schemas/user.py
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
email: EmailStr
name: str | None = None
class UserCreate(UserBase):
password: str
class UserUpdate(UserBase):
password: str | None = None
class UserInDB(UserBase):
id: int
is_active: bool
class Config:
orm_mode = True # 允许从ORM对象转换
SQLAlchemy模型
# models/user.py
from sqlalchemy import Column, Integer, String, Boolean
from app.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(100), unique=True, index=True)
name = Column(String(50))
password = Column(String(100))
is_active = Column(Boolean, default=True)
路由层调用封装
定义路由依赖
#api/deps.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_async_db
async def get_user_crud(db: AsyncSession = Depends(get_async_db)):
from app.crud.user import user_crud
return user_crud, db # 返回CRUD实例和数据库会话
路由实现
#api/routers/user.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.user import UserCreate, UserUpdate, UserInDB
from app.crud.user import user_crud
from .deps import get_user_crud
router = APIRouter()
@router.post("/", response_model=UserInDB)
async def create_user(
user_in: UserCreate,
crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud)
):
crud, db = crud_and_db
existing_user = await crud.get_by_email(db, email=user_in.email)
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
return await crud.create(db, obj_in=user_in)
@router.get("/{user_id}", response_model=UserInDB)
async def read_user(
user_id: int,
crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud)
):
crud, db = crud_and_db
user = await crud.get(db, id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.put("/{user_id}", response_model=UserInDB)
async def update_user(
user_id: int,
user_in: UserUpdate,
crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud)
):
crud, db = crud_and_db
user = await crud.get(db, id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return await crud.update(db, db_obj=user, obj_in=user_in)
高级封装技巧
事务管理装饰器
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession
@asynccontextmanager
async def transactional(db: AsyncSession):
try:
yield
await db.commit()
except Exception as e:
await db.rollback()
raise e
# 使用示例
async def create_user_with_transaction(user_in: UserCreate, db: AsyncSession):
async with transactional(db):
user = User(**user_in.dict())
db.add(user)
await db.flush() # 获取生成的ID
# 其他操作...
分页响应模型
from pydantic.generics import GenericModel
from typing import Generic, TypeVar, Sequence
T = TypeVar("T")
class PaginatedResponse(GenericModel, Generic[T]):
data: Sequence[T]
total: int
skip: int
limit: int
# 路由中使用
@router.get("/", response_model=PaginatedResponse[UserInDB])
async def read_users(
skip: int = 0,
limit: int = 100,
crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud)
):
crud, db = crud_and_db
users = await crud.get_multi(db, skip=skip, limit=limit)
total = await crud.count(db)
return {"data": users, "total": total, "skip": skip, "limit": limit}
完整项目结构参考
project/ ├── app/ │ ├── crud/ │ │ ├── base.py # CRUD基类 │ │ └── user.py # 用户CRUD实现 │ ├── models/ │ │ └── user.py # SQLAlchemy模型 │ ├── schemas/ │ │ └── user.py # Pydantic模型 │ ├── api/ │ │ ├── deps.py # 依赖注入 │ │ └── routers/ # 路由文件 │ └── database.py # 数据库配置 └── main.py # FastAPI入口
性能优化建议
批量操作:实现 bulk_create 方法
async def bulk_create(self, db: AsyncSession, objs_in: list[CreateSchemaType]):
db_objs = [self.model(**obj.dict()) for obj in objs_in]
db.add_all(db_objs)
await db.commit()
预加载关联数据:使用 selectinload
from sqlalchemy.orm import selectinload
async def get_with_relations(self, db: AsyncSession, id: int):
result = await db.execute(
select(self.model)
.options(selectinload(self.model.addresses))
.filter(self.model.id == id)
)
return result.scalars().first()
缓存机制:集成Redis缓存高频查询结果



