术→技巧, 研发

FastAPI学习之数据库连接

钱魏Way · · 85 次浏览

在 FastAPI 中,连接数据库通常使用 SQLAlchemy(适用于关系型数据库)或 Tortoise-ORM(异步 ORM)。以下是基于 SQLAlchemy 和 Tortoise-ORM 的常见方法:

目录

FastAPI 与 SQLAlchemy 的集成

FastAPI 与 SQLAlchemy 集成可以实现高效的数据库操作,支持同步和异步两种模式。FastAPI 与 SQLAlchemy 集成关键点:

  • 同步模式:适合简单应用,直接使用Session。
  • 异步模式:需使用AsyncSession 和异步驱动(如 asyncpg)。
  • 依赖注入:通过Depends 管理数据库会话生命周期。
  • 迁移工具:使用 Alembic 管理数据库版本。

安装依赖

# 同步模式
pip install sqlalchemy fastapi uvicorn

# 异步模式(需 Python 3.7+)
pip install sqlalchemy[asyncio] fastapi uvicorn asyncpg

同步模式集成

配置数据库连接

# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"

# 创建引擎
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_size=10,          # 连接池大小
    max_overflow=5,        # 最大溢出连接数
    pool_recycle=1800      # 连接回收时间(秒)
)

# 会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 声明基类
Base = declarative_base()

定义数据模型

# models.py
from sqlalchemy import Column, Integer, String
from database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50))
    email = Column(String(100), unique=True)

依赖注入管理会话

from fastapi import Depends
from sqlalchemy.orm import Session

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.post("/users/")
def create_user(name: str, email: str, db: Session = Depends(get_db)):
    user = User(name=name, email=email)
    db.add(user)
    db.commit()
    db.refresh(user)
    return user

异步模式集成

配置异步数据库连接

# async_database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

SQLALCHEMY_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

# 创建异步引擎
async_engine = create_async_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_size=10,
    max_overflow=5,
    pool_recycle=1800
)

# 异步会话工厂
AsyncSessionLocal = sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

Base = declarative_base()

定义异步依赖

async def get_async_db():
    async with AsyncSessionLocal() as db:
        yield db

@app.post("/async-users/")
async def create_async_user(
    name: str, 
    email: str, 
    db: AsyncSession = Depends(get_async_db)
):
    user = User(name=name, email=email)
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

数据库迁移(Alembic)

安装 Alembic

pip install alembic

初始化迁移环境

alembic init migrations

配置 alembic.ini

sqlalchemy.url = postgresql://user:password@localhost/dbname

修改 migrations/env.py

from models import Base  # 导入模型
target_metadata = Base.metadata

生成迁移脚本

alembic revision --autogenerate -m "Initial migration"
alembic upgrade head

最佳实践

会话生命周期管理:

  • 使用yield 或 async with 确保会话正确关闭。
  • 避免在全局范围长期持有会话。

事务管理:

async def update_user(user_id: int, new_name: str, db: AsyncSession):
    async with db.begin():
        user = await db.get(User, user_id)
        user.name = new_name
        await db.commit()

性能优化:

  • 批量操作代替逐条提交。
  • 使用selectinload 避免 N+1 查询问题:
from sqlalchemy.orm import selectinload

stmt = select(User).options(selectinload(User.addresses))
result = await db.execute(stmt)

FastAPI与 Tortoise-ORM的集成

在 FastAPI 中集成 Tortoise-ORM(专为异步设计的 Python ORM)可以实现高效的异步数据库操作。

FastAPI + Tortoise-ORM 集成优势:

  • 全异步支持:适合高并发 I/O 密集型应用
  • 简洁的模型定义:类似 Django ORM 的语法
  • 自动 OpenAPI 文档:通过 Pydantic 模型生成
  • 迁移工具集成:Aerich 提供数据库版本管理

安装依赖

pip install fastapi uvicorn tortoise-orm asyncpg  # PostgreSQL 示例
# 或 MySQL:pip install aiomysql

配置 Tortoise-ORM

在 FastAPI 中初始化 Tortoise

# main.py
from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise

app = FastAPI()

# Tortoise 配置
TORTOISE_ORM = {
    "connections": {
        "default": "postgres://user:password@localhost/dbname"  # PostgreSQL 示例
        # MySQL 示例: "mysql://user:password@localhost/dbname"
    },
    "apps": {
        "models": {
            "models": ["app.models", "aerich.models"],  # 模型文件路径 + Aerich 迁移模型
            "default_connection": "default",
        }
    },
    "use_tz": True,  # 启用时区支持
    "timezone": "Asia/Shanghai",
}

# 注册 Tortoise 到 FastAPI
register_tortoise(
    app,
    config=TORTOISE_ORM,
    generate_schemas=True,  # 自动创建表(仅开发环境)
    add_exception_handlers=True,  # 启用 ORM 异常处理
)

定义数据模型

# app/models.py
from tortoise.models import Model
from tortoise import fields

class User(Model):
    id = fields.IntField(pk=True)
    username = fields.CharField(max_length=50, unique=True)
    email = fields.CharField(max_length=100)
    created_at = fields.DatetimeField(auto_now_add=True)

    class Meta:
        table = "users"  # 自定义表名

class Post(Model):
    id = fields.IntField(pk=True)
    title = fields.CharField(max_length=200)
    content = fields.TextField()
    author: fields.ForeignKeyRelation[User] = fields.ForeignKeyField(
        "models.User", related_name="posts"
    )

    class Meta:
        table = "posts"

创建 Pydantic 模型

# app/schemas.py
from pydantic import BaseModel
from tortoise.contrib.pydantic import pydantic_model_creator

# 自动从 Tortoise 模型生成 Pydantic 模型
User_Pydantic = pydantic_model_creator(User, name="User")
UserCreate_Pydantic = pydantic_model_creator(User, name="UserCreate", exclude_readonly=True)
Post_Pydantic = pydantic_model_creator(Post, name="Post")

实现 CRUD 路由

创建用户

from fastapi import APIRouter, HTTPException
from tortoise.exceptions import IntegrityError

router = APIRouter()

@router.post("/users/", response_model=User_Pydantic)
async def create_user(user: UserCreate_Pydantic):
    try:
        user_obj = await User.create(**user.dict())
        return await User_Pydantic.from_tortoise_orm(user_obj)
    except IntegrityError:
        raise HTTPException(status_code=400, detail="用户名已存在")

查询用户及其关联数据

@router.get("/users/{user_id}", response_model=User_Pydantic)
async def read_user(user_id: int):
    user = await User.get_or_none(id=user_id).prefetch_related("posts")
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return await User_Pydantic.from_tortoise_orm(user)

分页查询

@router.get("/users/", response_model=list[User_Pydantic])
async def list_users(skip: int = 0, limit: int = 100):
    return await User_Pydantic.from_queryset(User.all().offset(skip).limit(limit))

高级查询与过滤

条件过滤

from tortoise.expressions import Q

@router.get("/search/", response_model=list[User_Pydantic])
async def search_users(query: str):
    return await User_Pydantic.from_queryset(
        User.filter(Q(username__icontains=query) | Q(email__icontains=query))
    )

聚合查询

@router.get("/stats/")
async def user_stats():
    count = await User.all().count()
    return {"total_users": count}

数据库迁移(Aerich

安装 Aerich

pip install aerich

初始化迁移配置

aerich init -t app.main.TORTOISE_ORM  # 指定配置路径
aerich init-db  # 创建初始迁移

生成迁移文件

修改模型后运行:

aerich migrate --name "add_user_table"
aerich upgrade  # 应用迁移

完整项目结构

project/
├── app/
│   ├── __init__.py
│   ├── main.py         # FastAPI 入口
│   ├── models.py       # Tortoise 模型
│   └── schemas.py      # Pydantic 模型
├── migrations/         # Aerich 迁移文件
├── pyproject.toml
└── aerich.toml         # Aerich 配置文件

最佳实践

依赖注入管理事务:

from contextlib import asynccontextmanager
from tortoise.transactions import in_transaction

@asynccontextmanager
async def get_transaction():
    async with in_transaction() as tx:
        yield tx

性能优化:

  • 使用.prefetch_related() 避免 N+1 查询问题
  • 使用.only() 限制返回字段
User.all().prefetch_related("posts").only("username", "email")

错误处理:

from tortoise.exceptions import DoesNotExist

@router.delete("/users/{user_id}")
async def delete_user(user_id: int):
    try:
        user = await User.get(id=user_id)
        await user.delete()
        return {"message": "用户已删除"}
    except DoesNotExist:
        raise HTTPException(status_code=404, detail="用户不存在")

FastAPI与databases的集成

Python 的 databases 库是一个异步数据库访问库,旨在简化与关系型数据库的交互,同时支持异步编程模式(如 asyncio)。它的设计目标是提供简单、一致的 API,并与多种数据库后端兼容(如 PostgreSQL、MySQL、SQLite),同时基于 SQLAlchemy Core 的查询构建功能。

核心特性

  • 异步支持:原生支持async/await 语法,适合异步框架(如 FastAPI、Starlette)。
  • 多数据库兼容:通过不同驱动支持多种数据库:
    • PostgreSQL:asyncpg 或 aiopg
    • MySQL:aiomysql
    • SQLite:aiosqlite
  • 轻量级 ORM 替代:不强制使用 ORM,而是允许直接执行 SQL 或使用 SQLAlchemy Core 的声明式查询。
  • 连接池管理:自动管理数据库连接池,优化高并发场景的性能。
  • 类型安全:支持类型注解,与 Python 的现代类型系统兼容。

安装依赖

pip install fastapi uvicorn databases[postgresql] sqlalchemy
  • databases[postgresql]:包含 PostgreSQL 异步驱动asyncpg。
  • sqlalchemy:用于表结构定义(databases依赖 SQLAlchemy Core)。

项目结构

.
├── app
│   ├── __init__.py
│   ├── main.py          # FastAPI 主入口
│   ├── database.py      # 数据库配置
│   ├── models.py        # 数据模型(SQLAlchemy Core)
│   └── routers          # API 路由
│       └── users.py

配置数据库连接

在 app/database.py 中:

from databases import Database

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
database = Database(DATABASE_URL)

定义数据模型(SQLAlchemy Core)

在 app/models.py 中:

from sqlalchemy import Table, Column, Integer, String, MetaData

metadata = MetaData()

users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("email", String(100), unique=True),
)

创建 FastAPI 应用并集成数据库

在 app/main.py 中:

from fastapi import FastAPI
from app.database import database
from app.models import metadata

app = FastAPI()

# 启动时连接数据库
@app.on_event("startup")
async def startup():
    await database.connect()
    # 自动创建表(仅示例,生产环境建议用 Alembic 迁移)
    # 注意:metadata.create_all 是同步方法,需在同步上下文中执行
    # 此处仅为演示,实际项目应单独处理表创建

# 关闭时断开连接
@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

# 引入路由
from app.routers import users
app.include_router(users.router)

编写 API 路由

在 app/routers/users.py 中:

from fastapi import APIRouter, HTTPException
from app.database import database
from app.models import users
from pydantic import BaseModel

router = APIRouter(prefix="/users", tags=["users"])

# 请求/响应模型
class UserCreate(BaseModel):
    name: str
    email: str

class UserResponse(UserCreate):
    id: int

# 获取所有用户
@router.get("/", response_model=list[UserResponse])
async def get_all_users():
    query = users.select()
    return await database.fetch_all(query)

# 创建用户
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    query = users.insert().values(**user.dict())
    user_id = await database.execute(query)
    return {**user.dict(), "id": user_id}

# 获取单个用户
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    query = users.select().where(users.c.id == user_id)
    user = await database.fetch_one(query)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

运行应用

uvicorn app.main:app --reload

访问 http://localhost:8000/docs 测试 API。

高级用法

事务处理

async def create_user_with_transaction(user: UserCreate):
    async with database.transaction():
        # 执行多个操作(原子性)
        query = users.insert().values(**user.dict())
        user_id = await database.execute(query)
        # 其他操作...
        return user_id

原始 SQL 查询

@router.get("/search/")
async def search_users(name: str):
    query = "SELECT * FROM users WHERE name = :name"
    return await database.fetch_all(query=query, values={"name": name})

连接池配置

# 在 database.py 中调整连接池大小
database = Database(DATABASE_URL, min_size=5, max_size=20)

批量操作

query = users.insert()
values = [{"name": "Alice"}, {"name": "Bob"}]
await database.execute_many(query, values)

结果集处理:

  • fetch_one():返回单条记录。
  • fetch_all():返回所有记录。
  • execute():执行写操作(INSERT/UPDATE/DELETE)并返回最后一行 ID(如适用)。

错误处理与中间件

在 app/main.py 中添加:

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# 全局异常处理(示例)
@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request, exc):
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
    )

生产环境优化

  • 使用环境变量:通过pydantic-settings 管理数据库 URL。
  • 连接池监控:定期检查连接池状态(如connection())。
  • 异步 Alembic:使用alembic + asyncpg 实现异步数据库迁移。

常见问题

  • 驱动安装错误:确保安装了正确的异步驱动(如asyncpg)。
  • 同步代码调用异步方法:避免在同步函数中直接调用await,需使用 run() 包装。
  • 连接泄漏:确保每个请求结束后释放连接(FastAPI 自动处理)。

通过以上步骤,你可以快速将 FastAPI 与 databases 集成,构建高性能异步 API。根据项目需求选择是否引入 ORM(如 Tortoise-ORM)或保持轻量级 SQL 操作。

SQLAlchemy、Tortoise-ORM还是databases?

在 FastAPI 中选择 SQLAlchemyTortoise-ORM 还是 databases,取决于具体需求(如性能、开发体验、是否需要 ORM 功能等)。

SQLAlchemy(同步/异步混合)

  • 定位:成熟的 Python ORM 和 SQL 工具包,支持同步和异步(需搭配 databases 库)。
  • 特点
    • 功能全面:支持复杂查询、事务、关系模型、迁移(需 Alembic)。
    • 同步为主:原生为同步设计,异步需通过databases + asyncpg/aiomysql 等驱动实现。
    • 学习曲线陡峭:API 较复杂,适合熟悉 ORM 的开发者。
  • 适用场景
    • 需要复杂 ORM 功能(如多表关联、混合属性)。
    • 已有基于 SQLAlchemy 的代码,需迁移到 FastAPI。
    • 同步和异步混合使用(如后台任务用同步,API 用异步)。

Tortoise-ORM(纯异步 ORM)

  • 定位:专为异步设计的 ORM,语法类似 Django ORM。
  • 特点
    • 异步原生:直接支持async/await。
    • 轻量 ORM:提供模型定义、迁移工具(需aerich)、关系管理。
    • FastAPI 集成友好:官方支持 FastAPI 插件(contrib.fastapi)。
  • 适用场景
    • 需要完整的异步 ORM(如模型、外键、预加载查询)。
    • 快速开发中小型项目,希望减少样板代码。
    • 习惯 Django ORM 风格。

databases(纯异步 SQL 工具)

  • 定位:轻量级异步数据库访问库,基于 SQLAlchemy Core。
  • 特点
    • 无 ORM:直接执行 SQL 或使用 SQLAlchemy Core 查询构造器。
    • 灵活性高:适合喜欢手写 SQL 或简单查询的场景。
    • 依赖少:需配合其他库(如 Pydantic)实现数据验证。
  • 适用场景
    • 不需要复杂 ORM 功能,只需执行原始 SQL 或简单查询。
    • 已有 SQLAlchemy Core 查询逻辑,需异步化。
    • 追求极致轻量,避免 ORM 开销。

对比总结

维度 SQLAlchemy + databases Tortoise-ORM databases
异步支持 是(需异步驱动) 是(原生) 是(原生)
ORM 功能 全面(复杂查询、关系) 完整(轻量 ORM) 无(需手写 SQL 或 SQLAlchemy Core)
学习曲线 高(需掌握 SQLAlchemy) 中(类似 Django ORM) 低(直接执行 SQL)
性能 高(连接池优化) 中(ORM 有开销) 高(无 ORM 开销)
适用项目规模 中大型(复杂业务逻辑) 中小型(快速开发) 小型或微服务(轻量查询)
迁移工具 需 Alembic 内置 Aerich 无(需手动处理)

选择建议

  • 需要完整 ORM 且纯异步Tortoise-ORM
    • 适合快速开发、模型关系复杂的项目,如用户管理系统、博客平台。
  • 复杂查询 + 异步/同步混合SQLAlchemy + databases
    • 适合企业级应用(如电商后台),需处理事务、复杂连接查询。
  • 轻量级 + 直接控制 SQLdatabases
    • 适合微服务、API 网关等简单查询场景,如缓存层、数据分析接口。

最终推荐组合

  • 大多数 FastAPI 项目Tortoise-ORM(平衡开发效率和异步性能) + Pydantic(数据验证)。
  • 高性能复杂场景databases(异步 SQL) + SQLAlchemy Core(查询构造) + Pydantic(验证)。
  • 遗留系统迁移:保持 SQLAlchemy(同步) + 逐步异步化部分接口。

FastAPI 数据库连接池管理

在 FastAPI 中管理数据库连接池是确保应用高性能和稳定性的关键步骤。FastAPI 中数据库连接池管理的核心步骤:

  • 配置连接池参数:根据负载调整pool_size、max_overflow 等。
  • 依赖注入管理会话:使用yield 或 async with 确保连接释放。
  • 异步支持:选择适合的异步驱动(如asyncpg、aiomysql)。
  • 监控与调优:通过日志和数据库监控工具优化连接池行为。

以下是详细的数据库连接池管理方法,以 SQLAlchemy(同步/异步)和 Tortoise-ORM(异步)为例:

连接池的核心概念

  • 连接池的作用:复用数据库连接,避免频繁创建/关闭连接的开销。
  • 关键参数:
    • pool_size:连接池保持的最小连接数。
    • max_overflow:允许超出pool_size 的最大临时连接数。
    • pool_recycle:连接自动回收时间(秒),避免数据库主动断开空闲连接。
    • pool_timeout:获取连接的超时时间(秒)。

使用 SQLAlchemy 管理同步连接池

配置连接池

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 连接池配置
DATABASE_URL = "postgresql://user:password@localhost/dbname"

engine = create_engine(
    DATABASE_URL,
    pool_size=10,          # 最小连接数
    max_overflow=5,        # 最大临时连接数
    pool_recycle=1800,     # 30 分钟回收连接
    pool_timeout=30,       # 获取连接超时时间
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

依赖注入管理会话

from fastapi import Depends
from sqlalchemy.orm import Session

def get_db():
    db = SessionLocal()
    try:
        yield db  # 使用生成器确保会话关闭
    finally:
        db.close()

@app.get("/items/{item_id}")
def read_item(item_id: int, db: Session = Depends(get_db)):
    item = db.query(Item).filter(Item.id == item_id).first()
    return item

使用 SQLAlchemy 管理异步连接池

配置异步引擎

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

async_engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=5,
    pool_recycle=1800,
    pool_timeout=30,
)

AsyncSessionLocal = sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

异步依赖注入

async def get_async_db():
    async with AsyncSessionLocal() as db:
        yield db

@app.get("/async-items/{item_id}")
async def read_async_item(item_id: int, db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(Item).filter(Item.id == item_id))
    item = result.scalars().first()
    return item

使用 Tortoise-ORM 管理异步连接池

配置 Tortoise-ORM

from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise

app = FastAPI()

register_tortoise(
    app,
    db_url="postgres://user:password@localhost/dbname",
    modules={"models": ["app.models"]},
    generate_schemas=True,  # 自动生成表结构(仅开发环境)
    add_exception_handlers=True,
    config={
        "connections": {
            "default": {
                "engine": "tortoise.backends.asyncpg",
                "credentials": {
                    "host": "localhost",
                    "port": "5432",
                    "user": "user",
                    "password": "password",
                    "database": "dbname",
                },
                "pool_size": 10,      # 连接池大小
                "max_overflow": 5,    # 最大溢出连接数
                "pool_recycle": 1800, # 连接回收时间
            }
        }
    }
)

使用依赖注入

from tortoise.transactions import in_transaction

@app.post("/items/")
async def create_item(item_data: dict):
    async with in_transaction() as db:
        item = await Item.create(**item_data, using_db=db)
        return item

连接池最佳实践

参数调优建议

参数 推荐值 说明
pool_size 5-20 根据并发量调整,避免过大占用数据库资源。
max_overflow pool_size * 0.5 突发流量时允许临时扩容。
pool_recycle 1800(30分钟) 防止数据库主动断开空闲连接(如 MySQL 默认 8 小时断开)。
pool_timeout 30 避免长时间等待连接导致请求堆积。

监控与调试

监控连接数:

-- PostgreSQL
SELECT * FROM pg_stat_activity;

-- MySQL
SHOW STATUS LIKE 'Threads_connected';

日志跟踪:

import logging

logging.basicConfig()
logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)

避免常见问题

  • 连接泄漏:确保每次操作后关闭会话(使用yield 或 async with)。
  • 长事务:尽量缩短事务持有时间,避免占用连接过久。
  • 连接耗尽:合理设置pool_size 和 max_overflow,结合负载测试调整。

FastAPI CRUD操作封装

在 FastAPI 中,通过封装 CRUD(Create, Read, Update, Delete)操作可以提高代码复用性和可维护性。通过封装你可以获得以下优势:

  • 代码复用:所有模型的 CRUD 操作继承基类逻辑
  • 类型安全:利用 Python 类型提示和 Pydantic 模型
  • 灵活扩展:每个模型可自定义额外方法(如get_by_email)
  • 事务安全:通过上下文管理器确保原子性操作
  • API 一致性:统一的分页、错误响应格式

以下是详细的 CRUD 封装方案,支持同步和异步模式,并兼容 Pydantic 模型与 SQLAlchemy ORM。

基础 CRUD 类封装

定义通用 CRUD 基类

# crud/base.py
from typing import Any, Generic, TypeVar, Optional
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update, delete

ModelType = TypeVar("ModelType")          # SQLAlchemy 模型类型
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)  # 创建请求模型
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)  # 更新请求模型

class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
    def __init__(self, model: ModelType):
        self.model = model

    async def get(self, db: AsyncSession, id: Any) -> Optional[ModelType]:
        """根据 ID 查询单条记录"""
        result = await db.execute(select(self.model).filter(self.model.id == id))
        return result.scalars().first()

    async def get_multi(
        self, 
        db: AsyncSession, 
        skip: int = 0, 
        limit: int = 100,
        **filters: Any
    ) -> list[ModelType]:
        """分页查询 + 过滤"""
        query = select(self.model)
        # 动态添加过滤条件
        for field, value in filters.items():
            if hasattr(self.model, field):
                query = query.filter(getattr(self.model, field) == value)
        result = await db.execute(query.offset(skip).limit(limit))
        return result.scalars().all()

    async def create(self, db: AsyncSession, obj_in: CreateSchemaType) -> ModelType:
        """创建记录"""
        db_obj = self.model(**obj_in.dict())  # 假设 Pydantic 模型与 ORM 模型字段一致
        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj

    async def update(
        self, 
        db: AsyncSession, 
        db_obj: ModelType,
        obj_in: UpdateSchemaType | dict[str, Any]
    ) -> ModelType:
        """更新记录"""
        if isinstance(obj_in, dict):
            update_data = obj_in
        else:
            update_data = obj_in.dict(exclude_unset=True)
        
        await db.execute(
            update(self.model)
            .where(self.model.id == db_obj.id)
            .values(**update_data)
        )
        await db.commit()
        await db.refresh(db_obj)
        return db_obj

    async def delete(self, db: AsyncSession, id: Any) -> None:
        """删除记录"""
        await db.execute(delete(self.model).where(self.model.id == id))
        await db.commit()

自定义模型 CRUD

# crud/user.py
from app.models.user import User  # SQLAlchemy 模型
from app.schemas.user import UserCreate, UserUpdate  # Pydantic 模型
from .base import CRUDBase

class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
    async def get_by_email(self, db: AsyncSession, email: str) -> Optional[User]:
        """根据邮箱查询用户(自定义方法)"""
        result = await db.execute(select(User).filter(User.email == email))
        return result.scalars().first()

user_crud = CRUDUser(User)  # 实例化,供路由直接调用

Pydantic 模型与 ORM 模型映射

定义 Pydantic 模型

# schemas/user.py
from pydantic import BaseModel, EmailStr

class UserBase(BaseModel):
    email: EmailStr
    name: str | None = None

class UserCreate(UserBase):
    password: str

class UserUpdate(UserBase):
    password: str | None = None

class UserInDB(UserBase):
    id: int
    is_active: bool

    class Config:
        orm_mode = True  # 允许从 ORM 对象转换

SQLAlchemy 模型

# models/user.py
from sqlalchemy import Column, Integer, String, Boolean
from app.database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(100), unique=True, index=True)
    name = Column(String(50))
    password = Column(String(100))
    is_active = Column(Boolean, default=True)

路由层调用封装

定义路由依赖

# api/deps.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_async_db

async def get_user_crud(db: AsyncSession = Depends(get_async_db)):
    from app.crud.user import user_crud
    return user_crud, db  # 返回 CRUD 实例和数据库会话

路由实现

# api/routers/user.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.user import UserCreate, UserUpdate, UserInDB
from app.crud.user import user_crud
from .deps import get_user_crud

router = APIRouter()

@router.post("/", response_model=UserInDB)
async def create_user(
    user_in: UserCreate,
    crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud)
):
    crud, db = crud_and_db
    existing_user = await crud.get_by_email(db, email=user_in.email)
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return await crud.create(db, obj_in=user_in)

@router.get("/{user_id}", response_model=UserInDB)
async def read_user(
    user_id: int,
    crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud)
):
    crud, db = crud_and_db
    user = await crud.get(db, id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@router.put("/{user_id}", response_model=UserInDB)
async def update_user(
    user_id: int,
    user_in: UserUpdate,
    crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud)
):
    crud, db = crud_and_db
    user = await crud.get(db, id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return await crud.update(db, db_obj=user, obj_in=user_in)

高级封装技巧

事务管理装饰器

from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession

@asynccontextmanager
async def transactional(db: AsyncSession):
    try:
        yield
        await db.commit()
    except Exception as e:
        await db.rollback()
        raise e

# 使用示例
async def create_user_with_transaction(user_in: UserCreate, db: AsyncSession):
    async with transactional(db):
        user = User(**user_in.dict())
        db.add(user)
        await db.flush()  # 获取生成的 ID
        # 其他操作...

分页响应模型

from pydantic.generics import GenericModel
from typing import Generic, TypeVar, Sequence

T = TypeVar("T")

class PaginatedResponse(GenericModel, Generic[T]):
    data: Sequence[T]
    total: int
    skip: int
    limit: int

# 路由中使用
@router.get("/", response_model=PaginatedResponse[UserInDB])
async def read_users(
    skip: int = 0,
    limit: int = 100,
    crud_and_db: tuple[CRUDUser, AsyncSession] = Depends(get_user_crud)
):
    crud, db = crud_and_db
    users = await crud.get_multi(db, skip=skip, limit=limit)
    total = await crud.count(db)
    return {"data": users, "total": total, "skip": skip, "limit": limit}

完整项目结构参考

project/
├── app/
│   ├── crud/
│   │   ├── base.py       # CRUD 基类
│   │   └── user.py       # 用户 CRUD 实现
│   ├── models/
│   │   └── user.py       # SQLAlchemy 模型
│   ├── schemas/
│   │   └── user.py       # Pydantic 模型
│   ├── api/
│   │   ├── deps.py       # 依赖注入
│   │   └── routers/      # 路由文件
│   └── database.py       # 数据库配置
└── main.py               # FastAPI 入口

性能优化建议

批量操作:实现 bulk_create 方法

async def bulk_create(self, db: AsyncSession, objs_in: list[CreateSchemaType]):
    db_objs = [self.model(**obj.dict()) for obj in objs_in]
    db.add_all(db_objs)
    await db.commit()

预加载关联数据:使用 selectinload

from sqlalchemy.orm import selectinload

async def get_with_relations(self, db: AsyncSession, id: int):
    result = await db.execute(
        select(self.model)
        .options(selectinload(self.model.addresses))
        .filter(self.model.id == id)
    )
    return result.scalars().first()

缓存机制:集成 Redis 缓存高频查询结果

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注