在 FastAPI 中,连接数据库通常使用 SQLAlchemy(适用于关系型数据库)或 Tortoise-ORM(异步 ORM)。以下是基于 SQLAlchemy 和 Tortoise-ORM 的常见方法:
FastAPI 与 SQLAlchemy 的集成
FastAPI 与 SQLAlchemy 集成可以实现高效的数据库操作,支持同步和异步两种模式。FastAPI 与 SQLAlchemy 集成关键点:
- 同步模式:适合简单应用,直接使用Session。
- 异步模式:需使用AsyncSession 和异步驱动(如 asyncpg)。
- 依赖注入:通过Depends 管理数据库会话生命周期。
- 迁移工具:使用 Alembic 管理数据库版本。
安装依赖
# 同步模式 pip install sqlalchemy fastapi uvicorn # 异步模式(需 Python 3.7+) pip install sqlalchemy[asyncio] fastapi uvicorn asyncpg
同步模式集成
配置数据库连接
# database.py from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname" # 创建引擎 engine = create_engine( SQLALCHEMY_DATABASE_URL, pool_size=10, # 连接池大小 max_overflow=5, # 最大溢出连接数 pool_recycle=1800 # 连接回收时间(秒) ) # 会话工厂 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # 声明基类 Base = declarative_base()
定义数据模型
# models.py from sqlalchemy import Column, Integer, String from database import Base class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) name = Column(String(50)) email = Column(String(100), unique=True)
依赖注入管理会话
from fastapi import Depends from sqlalchemy.orm import Session def get_db(): db = SessionLocal() try: yield db finally: db.close() @app.post("/users/") def create_user(name: str, email: str, db: Session = Depends(get_db)): user = User(name=name, email=email) db.add(user) db.commit() db.refresh(user) return user
异步模式集成
配置异步数据库连接
# async_database.py from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker, declarative_base SQLALCHEMY_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname" # 创建异步引擎 async_engine = create_async_engine( SQLALCHEMY_DATABASE_URL, pool_size=10, max_overflow=5, pool_recycle=1800 ) # 异步会话工厂 AsyncSessionLocal = sessionmaker( bind=async_engine, class_=AsyncSession, expire_on_commit=False ) Base = declarative_base()
定义异步依赖
async def get_async_db(): async with AsyncSessionLocal() as db: yield db @app.post("/async-users/") async def create_async_user( name: str, email: str, db: AsyncSession = Depends(get_async_db) ): user = User(name=name, email=email) db.add(user) await db.commit() await db.refresh(user) return user
数据库迁移(Alembic)
安装 Alembic
pip install alembic
初始化迁移环境
alembic init migrations
配置 alembic.ini
sqlalchemy.url = postgresql://user:password@localhost/dbname
修改 migrations/env.py
from models import Base # 导入模型 target_metadata = Base.metadata
生成迁移脚本
alembic revision --autogenerate -m "Initial migration" alembic upgrade head
最佳实践
会话生命周期管理:
- 使用yield 或 async with 确保会话正确关闭。
- 避免在全局范围长期持有会话。
事务管理:
async def update_user(user_id: int, new_name: str, db: AsyncSession): async with db.begin(): user = await db.get(User, user_id) user.name = new_name await db.commit()
性能优化:
- 批量操作代替逐条提交。
- 使用selectinload 避免 N+1 查询问题:
from sqlalchemy.orm import selectinload stmt = select(User).options(selectinload(User.addresses)) result = await db.execute(stmt)
FastAPI与 Tortoise-ORM的集成
在 FastAPI 中集成 Tortoise-ORM(专为异步设计的 Python ORM)可以实现高效的异步数据库操作。
FastAPI + Tortoise-ORM 集成优势:
- 全异步支持:适合高并发 I/O 密集型应用
- 简洁的模型定义:类似 Django ORM 的语法
- 自动 OpenAPI 文档:通过 Pydantic 模型生成
- 迁移工具集成:Aerich 提供数据库版本管理
安装依赖
pip install fastapi uvicorn tortoise-orm asyncpg # PostgreSQL 示例 # 或 MySQL:pip install aiomysql
配置 Tortoise-ORM
在 FastAPI 中初始化 Tortoise
# main.py from fastapi import FastAPI from tortoise.contrib.fastapi import register_tortoise app = FastAPI() # Tortoise 配置 TORTOISE_ORM = { "connections": { "default": "postgres://user:password@localhost/dbname" # PostgreSQL 示例 # MySQL 示例: "mysql://user:password@localhost/dbname" }, "apps": { "models": { "models": ["app.models", "aerich.models"], # 模型文件路径 + Aerich 迁移模型 "default_connection": "default", } }, "use_tz": True, # 启用时区支持 "timezone": "Asia/Shanghai", } # 注册 Tortoise 到 FastAPI register_tortoise( app, config=TORTOISE_ORM, generate_schemas=True, # 自动创建表(仅开发环境) add_exception_handlers=True, # 启用 ORM 异常处理 )
定义数据模型
# app/models.py from tortoise.models import Model from tortoise import fields class User(Model): id = fields.IntField(pk=True) username = fields.CharField(max_length=50, unique=True) email = fields.CharField(max_length=100) created_at = fields.DatetimeField(auto_now_add=True) class Meta: table = "users" # 自定义表名 class Post(Model): id = fields.IntField(pk=True) title = fields.CharField(max_length=200) content = fields.TextField() author: fields.ForeignKeyRelation[User] = fields.ForeignKeyField( "models.User", related_name="posts" ) class Meta: table = "posts"
创建 Pydantic 模型
# app/schemas.py from pydantic import BaseModel from tortoise.contrib.pydantic import pydantic_model_creator # 自动从 Tortoise 模型生成 Pydantic 模型 User_Pydantic = pydantic_model_creator(User, name="User") UserCreate_Pydantic = pydantic_model_creator(User, name="UserCreate", exclude_readonly=True) Post_Pydantic = pydantic_model_creator(Post, name="Post")
实现 CRUD 路由
创建用户
from fastapi import APIRouter, HTTPException from tortoise.exceptions import IntegrityError router = APIRouter() @router.post("/users/", response_model=User_Pydantic) async def create_user(user: UserCreate_Pydantic): try: user_obj = await User.create(**user.dict()) return await User_Pydantic.from_tortoise_orm(user_obj) except IntegrityError: raise HTTPException(status_code=400, detail="用户名已存在")
查询用户及其关联数据
@router.get("/users/{user_id}", response_model=User_Pydantic) async def read_user(user_id: int): user = await User.get_or_none(id=user_id).prefetch_related("posts") if not user: raise HTTPException(status_code=404, detail="用户不存在") return await User_Pydantic.from_tortoise_orm(user)
分页查询
@router.get("/users/", response_model=list[User_Pydantic]) async def list_users(skip: int = 0, limit: int = 100): return await User_Pydantic.from_queryset(User.all().offset(skip).limit(limit))
高级查询与过滤
条件过滤
from tortoise.expressions import Q @router.get("/search/", response_model=list[User_Pydantic]) async def search_users(query: str): return await User_Pydantic.from_queryset( User.filter(Q(username__icontains=query) | Q(email__icontains=query)) )
聚合查询
@router.get("/stats/") async def user_stats(): count = await User.all().count() return {"total_users": count}
数据库迁移(Aerich)
安装 Aerich
pip install aerich
初始化迁移配置
aerich init -t app.main.TORTOISE_ORM # 指定配置路径 aerich init-db # 创建初始迁移
生成迁移文件
修改模型后运行:
aerich migrate --name "add_user_table" aerich upgrade # 应用迁移
完整项目结构
project/ ├── app/ │ ├── __init__.py │ ├── main.py # FastAPI 入口 │ ├── models.py # Tortoise 模型 │ └── schemas.py # Pydantic 模型 ├── migrations/ # Aerich 迁移文件 ├── pyproject.toml └── aerich.toml # Aerich 配置文件
最佳实践
依赖注入管理事务:
from contextlib import asynccontextmanager from tortoise.transactions import in_transaction @asynccontextmanager async def get_transaction(): async with in_transaction() as tx: yield tx
性能优化:
- 使用.prefetch_related() 避免 N+1 查询问题
- 使用.only() 限制返回字段
User.all().prefetch_related("posts").only("username", "email")
错误处理:
from tortoise.exceptions import DoesNotExist @router.delete("/users/{user_id}") async def delete_user(user_id: int): try: user = await User.get(id=user_id) await user.delete() return {"message": "用户已删除"} except DoesNotExist: raise HTTPException(status_code=404, detail="用户不存在")
FastAPI与databases的集成
Python 的 databases 库是一个异步数据库访问库,旨在简化与关系型数据库的交互,同时支持异步编程模式(如 asyncio)。它的设计目标是提供简单、一致的 API,并与多种数据库后端兼容(如 PostgreSQL、MySQL、SQLite),同时基于 SQLAlchemy Core 的查询构建功能。
核心特性
- 异步支持:原生支持async/await 语法,适合异步框架(如 FastAPI、Starlette)。
- 多数据库兼容:通过不同驱动支持多种数据库:
- PostgreSQL:asyncpg 或 aiopg
- MySQL:aiomysql
- SQLite:aiosqlite
- 轻量级 ORM 替代:不强制使用 ORM,而是允许直接执行 SQL 或使用 SQLAlchemy Core 的声明式查询。
- 连接池管理:自动管理数据库连接池,优化高并发场景的性能。
- 类型安全:支持类型注解,与 Python 的现代类型系统兼容。
安装依赖
pip install fastapi uvicorn databases[postgresql] sqlalchemy
- databases[postgresql]:包含 PostgreSQL 异步驱动asyncpg。
- sqlalchemy:用于表结构定义(databases依赖 SQLAlchemy Core)。
项目结构
. ├── app │ ├── __init__.py │ ├── main.py # FastAPI 主入口 │ ├── database.py # 数据库配置 │ ├── models.py # 数据模型(SQLAlchemy Core) │ └── routers # API 路由 │ └── users.py
配置数据库连接
在 app/database.py 中:
from databases import Database DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb" database = Database(DATABASE_URL)
定义数据模型(SQLAlchemy Core)
在 app/models.py 中:
from sqlalchemy import Table, Column, Integer, String, MetaData metadata = MetaData() users = Table( "users", metadata, Column("id", Integer, primary_key=True), Column("name", String(50)), Column("email", String(100), unique=True), )
创建 FastAPI 应用并集成数据库
在 app/main.py 中:
from fastapi import FastAPI from app.database import database from app.models import metadata app = FastAPI() # 启动时连接数据库 @app.on_event("startup") async def startup(): await database.connect() # 自动创建表(仅示例,生产环境建议用 Alembic 迁移) # 注意:metadata.create_all 是同步方法,需在同步上下文中执行 # 此处仅为演示,实际项目应单独处理表创建 # 关闭时断开连接 @app.on_event("shutdown") async def shutdown(): await database.disconnect() # 引入路由 from app.routers import users app.include_router(users.router)
编写 API 路由
在 app/routers/users.py 中:
from fastapi import APIRouter, HTTPException from app.database import database from app.models import users from pydantic import BaseModel router = APIRouter(prefix="/users", tags=["users"]) # 请求/响应模型 class UserCreate(BaseModel): name: str email: str class UserResponse(UserCreate): id: int # 获取所有用户 @router.get("/", response_model=list[UserResponse]) async def get_all_users(): query = users.select() return await database.fetch_all(query) # 创建用户 @router.post("/", response_model=UserResponse, status_code=201) async def create_user(user: UserCreate): query = users.insert().values(**user.dict()) user_id = await database.execute(query) return {**user.dict(), "id": user_id} # 获取单个用户 @router.get("/{user_id}", response_model=UserResponse) async def get_user(user_id: int): query = users.select().where(users.c.id == user_id) user = await database.fetch_one(query) if not user: raise HTTPException(status_code=404, detail="User not found") return user
运行应用
uvicorn app.main:app --reload
访问 http://localhost:8000/docs 测试 API。
高级用法
事务处理
async def create_user_with_transaction(user: UserCreate): async with database.transaction(): # 执行多个操作(原子性) query = users.insert().values(**user.dict()) user_id = await database.execute(query) # 其他操作... return user_id
原始 SQL 查询
@router.get("/search/") async def search_users(name: str): query = "SELECT * FROM users WHERE name = :name" return await database.fetch_all(query=query, values={"name": name})
连接池配置
# 在 database.py 中调整连接池大小 database = Database(DATABASE_URL, min_size=5, max_size=20)
批量操作
query = users.insert() values = [{"name": "Alice"}, {"name": "Bob"}] await database.execute_many(query, values)
结果集处理:
- fetch_one():返回单条记录。
- fetch_all():返回所有记录。
- execute():执行写操作(INSERT/UPDATE/DELETE)并返回最后一行 ID(如适用)。
错误处理与中间件
在 app/main.py 中添加:
from fastapi.middleware.cors import CORSMiddleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # 全局异常处理(示例) @app.exception_handler(HTTPException) async def custom_http_exception_handler(request, exc): return JSONResponse( status_code=exc.status_code, content={"detail": exc.detail}, )
生产环境优化
- 使用环境变量:通过pydantic-settings 管理数据库 URL。
- 连接池监控:定期检查连接池状态(如connection())。
- 异步 Alembic:使用alembic + asyncpg 实现异步数据库迁移。
常见问题
- 驱动安装错误:确保安装了正确的异步驱动(如asyncpg)。
- 同步代码调用异步方法:避免在同步函数中直接调用await,需使用 run() 包装。
- 连接泄漏:确保每个请求结束后释放连接(FastAPI 自动处理)。
通过以上步骤,你可以快速将 FastAPI 与 databases 集成,构建高性能异步 API。根据项目需求选择是否引入 ORM(如 Tortoise-ORM)或保持轻量级 SQL 操作。
SQLAlchemy、Tortoise-ORM还是databases?
在 FastAPI 中选择 SQLAlchemy、Tortoise-ORM 还是 databases,取决于具体需求(如性能、开发体验、是否需要 ORM 功能等)。
SQLAlchemy(同步/异步混合)
- 定位:成熟的 Python ORM 和 SQL 工具包,支持同步和异步(需搭配 databases 库)。
- 特点:
- 功能全面:支持复杂查询、事务、关系模型、迁移(需 Alembic)。
- 同步为主:原生为同步设计,异步需通过databases + asyncpg/aiomysql 等驱动实现。
- 学习曲线陡峭:API 较复杂,适合熟悉 ORM 的开发者。
- 适用场景:
- 需要复杂 ORM 功能(如多表关联、混合属性)。
- 已有基于 SQLAlchemy 的代码,需迁移到 FastAPI。
- 同步和异步混合使用(如后台任务用同步,API 用异步)。
Tortoise-ORM(纯异步 ORM)
- 定位:专为异步设计的 ORM,语法类似 Django ORM。
- 特点:
- 异步原生:直接支持async/await。
- 轻量 ORM:提供模型定义、迁移工具(需aerich)、关系管理。
- FastAPI 集成友好:官方支持 FastAPI 插件(contrib.fastapi)。
- 适用场景:
- 需要完整的异步 ORM(如模型、外键、预加载查询)。
- 快速开发中小型项目,希望减少样板代码。
- 习惯 Django ORM 风格。
databases(纯异步 SQL 工具)
- 定位:轻量级异步数据库访问库,基于 SQLAlchemy Core。
- 特点:
- 无 ORM:直接执行 SQL 或使用 SQLAlchemy Core 查询构造器。
- 灵活性高:适合喜欢手写 SQL 或简单查询的场景。
- 依赖少:需配合其他库(如 Pydantic)实现数据验证。
- 适用场景:
- 不需要复杂 ORM 功能,只需执行原始 SQL 或简单查询。
- 已有 SQLAlchemy Core 查询逻辑,需异步化。
- 追求极致轻量,避免 ORM 开销。
对比总结
维度 | SQLAlchemy + databases | Tortoise-ORM | databases |
异步支持 | 是(需异步驱动) | 是(原生) | 是(原生) |
ORM 功能 | 全面(复杂查询、关系) | 完整(轻量 ORM) | 无(需手写 SQL 或 SQLAlchemy Core) |
学习曲线 | 高(需掌握 SQLAlchemy) | 中(类似 Django ORM) | 低(直接执行 SQL) |
性能 | 高(连接池优化) | 中(ORM 有开销) | 高(无 ORM 开销) |
适用项目规模 | 中大型(复杂业务逻辑) | 中小型(快速开发) | 小型或微服务(轻量查询) |
迁移工具 | 需 Alembic | 内置 Aerich | 无(需手动处理) |
选择建议
- 需要完整 ORM 且纯异步 → Tortoise-ORM
- 适合快速开发、模型关系复杂的项目,如用户管理系统、博客平台。
- 复杂查询 + 异步/同步混合 → SQLAlchemy + databases
- 适合企业级应用(如电商后台),需处理事务、复杂连接查询。
- 轻量级 + 直接控制 SQL → databases
- 适合微服务、API 网关等简单查询场景,如缓存层、数据分析接口。
最终推荐组合
- 大多数 FastAPI 项目:Tortoise-ORM(平衡开发效率和异步性能) + Pydantic(数据验证)。
- 高性能复杂场景:databases(异步 SQL) + SQLAlchemy Core(查询构造) + Pydantic(验证)。
- 遗留系统迁移:保持 SQLAlchemy(同步) + 逐步异步化部分接口。
FastAPI 数据库连接池管理
在 FastAPI 中管理数据库连接池是确保应用高性能和稳定性的关键步骤。FastAPI 中数据库连接池管理的核心步骤:
- 配置连接池参数:根据负载调整pool_size、max_overflow 等。
- 依赖注入管理会话:使用yield 或 async with 确保连接释放。
- 异步支持:选择适合的异步驱动(如asyncpg、aiomysql)。
- 监控与调优:通过日志和数据库监控工具优化连接池行为。
以下是详细的数据库连接池管理方法,以 SQLAlchemy(同步/异步)和 Tortoise-ORM(异步)为例:
连接池的核心概念
- 连接池的作用:复用数据库连接,避免频繁创建/关闭连接的开销。
- 关键参数:
- pool_size:连接池保持的最小连接数。
- max_overflow:允许超出pool_size 的最大临时连接数。
- pool_recycle:连接自动回收时间(秒),避免数据库主动断开空闲连接。
- pool_timeout:获取连接的超时时间(秒)。
使用 SQLAlchemy 管理同步连接池
配置连接池
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # 连接池配置 DATABASE_URL = "postgresql://user:password@localhost/dbname" engine = create_engine( DATABASE_URL, pool_size=10, # 最小连接数 max_overflow=5, # 最大临时连接数 pool_recycle=1800, # 30 分钟回收连接 pool_timeout=30, # 获取连接超时时间 ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
依赖注入管理会话
from fastapi import Depends from sqlalchemy.orm import Session def get_db(): db = SessionLocal() try: yield db # 使用生成器确保会话关闭 finally: db.close() @app.get("/items/{item_id}") def read_item(item_id: int, db: Session = Depends(get_db)): item = db.query(Item).filter(Item.id == item_id).first() return item
使用 SQLAlchemy 管理异步连接池
配置异步引擎
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname" async_engine = create_async_engine( DATABASE_URL, pool_size=10, max_overflow=5, pool_recycle=1800, pool_timeout=30, ) AsyncSessionLocal = sessionmaker( bind=async_engine, class_=AsyncSession, expire_on_commit=False )
异步依赖注入
async def get_async_db(): async with AsyncSessionLocal() as db: yield db @app.get("/async-items/{item_id}") async def read_async_item(item_id: int, db: AsyncSession = Depends(get_async_db)): result = await db.execute(select(Item).filter(Item.id == item_id)) item = result.scalars().first() return item
使用 Tortoise-ORM 管理异步连接池
配置 Tortoise-ORM
from fastapi import FastAPI from tortoise.contrib.fastapi import register_tortoise app = FastAPI() register_tortoise( app, db_url="postgres://user:password@localhost/dbname", modules={"models": ["app.models"]}, generate_schemas=True, # 自动生成表结构(仅开发环境) add_exception_handlers=True, config={ "connections": { "default": { "engine": "tortoise.backends.asyncpg", "credentials": { "host": "localhost", "port": "5432", "user": "user", "password": "password", "database": "dbname", }, "pool_size": 10, # 连接池大小 "max_overflow": 5, # 最大溢出连接数 "pool_recycle": 1800, # 连接回收时间 } } } )
使用依赖注入
from tortoise.transactions import in_transaction @app.post("/items/") async def create_item(item_data: dict): async with in_transaction() as db: item = await Item.create(**item_data, using_db=db) return item
连接池最佳实践
参数调优建议
参数 | 推荐值 | 说明 |
pool_size | 5-20 | 根据并发量调整,避免过大占用数据库资源。 |
max_overflow | pool_size * 0.5 | 突发流量时允许临时扩容。 |
pool_recycle | 1800(30分钟) | 防止数据库主动断开空闲连接(如 MySQL 默认 8 小时断开)。 |
pool_timeout | 30 | 避免长时间等待连接导致请求堆积。 |
监控与调试
监控连接数:
-- PostgreSQL SELECT * FROM pg_stat_activity; -- MySQL SHOW STATUS LIKE 'Threads_connected';
日志跟踪:
import logging logging.basicConfig() logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)
避免常见问题
- 连接泄漏:确保每次操作后关闭会话(使用yield 或 async with)。
- 长事务:尽量缩短事务持有时间,避免占用连接过久。
- 连接耗尽:合理设置pool_size 和 max_overflow,结合负载测试调整。
FastAPI CRUD操作封装
在 FastAPI 中,通过封装 CRUD(Create, Read, Update, Delete)操作可以提高代码复用性和可维护性。通过封装你可以获得以下优势:
- 代码复用:所有模型的 CRUD 操作继承基类逻辑
- 类型安全:利用 Python 类型提示和 Pydantic 模型
- 灵活扩展:每个模型可自定义额外方法(如get_by_email)
- 事务安全:通过上下文管理器确保原子性操作
- API 一致性:统一的分页、错误响应格式
以下是详细的 CRUD 封装方案,支持同步和异步模式,并兼容 Pydantic 模型与 SQLAlchemy ORM。
基础 CRUD 类封装
定义通用 CRUD 基类
# crud/base.py from typing import Any, Generic, TypeVar, Optional from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from sqlalchemy import update, delete ModelType = TypeVar("ModelType") # SQLAlchemy 模型类型 CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel) # 创建请求模型 UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel) # 更新请求模型 class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]): def __init__(self, model: ModelType): self.model = model async def get(self, db: AsyncSession, id: Any) -> Optional[ModelType]: """根据 ID 查询单条记录""" result = await db.execute(select(self.model).filter(self.model.id == id)) return result.scalars().first() async def get_multi( self, db: AsyncSession, skip: int = 0, limit: int = 100, **filters: Any ) -> list[ModelType]: """分页查询 + 过滤""" query = select(self.model) # 动态添加过滤条件 for field, value in filters.items(): if hasattr(self.model, field): query = query.filter(getattr(self.model, field) == value) result = await db.execute(query.offset(skip).limit(limit)) return result.scalars().all() async def create(self, db: AsyncSession, obj_in: CreateSchemaType) -> ModelType: """创建记录""" db_obj = self.model(**obj_in.dict()) # 假设 Pydantic 模型与 ORM 模型字段一致 db.add(db_obj) await db.commit() await db.refresh(db_obj) return db_obj async def update( self, db: AsyncSession, db_obj: ModelType, obj_in: UpdateSchemaType | dict[str, Any] ) -> ModelType: """更新记录""" if isinstance(obj_in, dict): update_data = obj_in else: update_data = obj_in.dict(exclude_unset=True) await db.execute( update(self.model) .where(self.model.id == db_obj.id) .values(**update_data) ) await db.commit() await db.refresh(db_obj) return db_obj async def delete(self, db: AsyncSession, id: Any) -> None: """删除记录""" await db.execute(delete(self.model).where(self.model.id == id)) await db.commit()
自定义模型 CRUD 类
# crud/user.py from app.models.user import User # SQLAlchemy 模型 from app.schemas.user import UserCreate, UserUpdate # Pydantic 模型 from .base import CRUDBase class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]): async def get_by_email(self, db: AsyncSession, email: str) -> Optional[User]: """根据邮箱查询用户(自定义方法)""" result = await db.execute(select(User).filter(User.email == email)) return result.scalars().first() user_crud = CRUDUser(User) # 实例化,供路由直接调用
Pydantic 模型与 ORM 模型映射
定义 Pydantic 模型
# schemas/user.py from pydantic import BaseModel, EmailStr class UserBase(BaseModel): email: EmailStr name: str | None = None class UserCreate(UserBase): password: str class UserUpdate(UserBase): password: str | None = None class UserInDB(UserBase): id: int is_active: bool class Config: orm_mode = True # 允许从 ORM 对象转换
SQLAlchemy 模型
# models/user.py from sqlalchemy import Column, Integer, String, Boolean from app.database import Base class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) email = Column(String(100), unique=True, index=True) name = Column(String(50)) password = Column(String(100)) is_active = Column(Boolean, default=True)
路由层调用封装
定义路由依赖
# api/deps.py from fastapi import Depends from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_async_db async def get_user_crud(db: AsyncSession = Depends(get_async_db)): from app.crud.user import user_crud return user_crud, db # 返回 CRUD 实例和数据库会话
路由实现
# api/routers/user.py from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from app.schemas.user import UserCreate, UserUpdate, UserInDB from app.crud.user import user_crud from .deps import get_user_crud router = APIRouter() @router.post("/", response_model=UserInDB) async def create_user( user_in: UserCreate, crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud) ): crud, db = crud_and_db existing_user = await crud.get_by_email(db, email=user_in.email) if existing_user: raise HTTPException(status_code=400, detail="Email already registered") return await crud.create(db, obj_in=user_in) @router.get("/{user_id}", response_model=UserInDB) async def read_user( user_id: int, crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud) ): crud, db = crud_and_db user = await crud.get(db, id=user_id) if not user: raise HTTPException(status_code=404, detail="User not found") return user @router.put("/{user_id}", response_model=UserInDB) async def update_user( user_id: int, user_in: UserUpdate, crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud) ): crud, db = crud_and_db user = await crud.get(db, id=user_id) if not user: raise HTTPException(status_code=404, detail="User not found") return await crud.update(db, db_obj=user, obj_in=user_in)
高级封装技巧
事务管理装饰器
from contextlib import asynccontextmanager from sqlalchemy.ext.asyncio import AsyncSession @asynccontextmanager async def transactional(db: AsyncSession): try: yield await db.commit() except Exception as e: await db.rollback() raise e # 使用示例 async def create_user_with_transaction(user_in: UserCreate, db: AsyncSession): async with transactional(db): user = User(**user_in.dict()) db.add(user) await db.flush() # 获取生成的 ID # 其他操作...
分页响应模型
from pydantic.generics import GenericModel from typing import Generic, TypeVar, Sequence T = TypeVar("T") class PaginatedResponse(GenericModel, Generic[T]): data: Sequence[T] total: int skip: int limit: int # 路由中使用 @router.get("/", response_model=PaginatedResponse[UserInDB]) async def read_users( skip: int = 0, limit: int = 100, crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud) ): crud, db = crud_and_db users = await crud.get_multi(db, skip=skip, limit=limit) total = await crud.count(db) return {"data": users, "total": total, "skip": skip, "limit": limit}
完整项目结构参考
project/ ├── app/ │ ├── crud/ │ │ ├── base.py # CRUD 基类 │ │ └── user.py # 用户 CRUD 实现 │ ├── models/ │ │ └── user.py # SQLAlchemy 模型 │ ├── schemas/ │ │ └── user.py # Pydantic 模型 │ ├── api/ │ │ ├── deps.py # 依赖注入 │ │ └── routers/ # 路由文件 │ └── database.py # 数据库配置 └── main.py # FastAPI 入口
性能优化建议
批量操作:实现 bulk_create 方法
async def bulk_create(self, db: AsyncSession, objs_in: list[CreateSchemaType]): db_objs = [self.model(**obj.dict()) for obj in objs_in] db.add_all(db_objs) await db.commit()
预加载关联数据:使用 selectinload
from sqlalchemy.orm import selectinload async def get_with_relations(self, db: AsyncSession, id: int): result = await db.execute( select(self.model) .options(selectinload(self.model.addresses)) .filter(self.model.id == id) ) return result.scalars().first()
缓存机制:集成 Redis 缓存高频查询结果