术→技巧, 研发

FastAPI学习之权限管理

钱魏Way · · 80 次浏览

常见权限管理模式

权限管理是系统安全的核心组件,不同场景需适配不同模式。以下是 7 种常见方案及其适用场景、实现示例和选型指南:

RBAC(基于角色的访问控制)

  • 原理:用户关联角色 → 角色关联权限
  • 层级结构:用户 → 角色 → 权限
  • 适用场景
    • 企业后台管理系统(如OA、CRM)
    • 权限层级固定的标准化系统

FastAPI 示例:

from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer

# 模拟权限数据
ROLES = {
    "admin": ["create", "read", "update", "delete"],
    "user": ["read"]
}

def check_permission(current_user: User, required_permission: str):
    user_role = current_user.role
    if required_permission not in ROLES.get(user_role, []):
        raise HTTPException(403, "权限不足")

@app.post("/articles/")
async def create_article(user: User = Depends(get_current_user)):
    check_permission(user, "create")
    return {"message": "创建成功"}

ABAC(基于属性的访问控制)

  • 原理:通过属性规则动态决策(用户属性、资源属性、环境等)
  • 决策公式:IF (条件) THEN 允许/拒绝
  • 适用场景
    • 动态权限需求(如文档协作编辑权限)
    • 跨多维度条件判断(时间、位置、设备)

实现示例:

class AccessPolicy:
    def can_view_document(self, user, document):
        # 规则1:文档所有者可查看
        # 规则2:团队文档且在上班时间允许访问
        return (user.id == document.owner_id) or \
               (document.team_id in user.teams and 9 <= datetime.now().hour < 18)

@app.get("/documents/{doc_id}")
async def get_document(doc_id: str, user: User = Depends(get_current_user)):
    doc = get_document_by_id(doc_id)
    if not AccessPolicy().can_view_document(user, doc):
        raise HTTPException(403)
    return doc

ACL(访问控制列表)

  • 原理:直接为资源绑定允许/拒绝的实体列表
  • 数据结构:资源: [(用户/角色, 操作)]
  • 适用场景
    • 文件系统权限管理
    • 社交平台内容可见性控制
  • 数据库设计
资源类型 资源ID 主体类型 主体ID 操作
file 123 user 456 read
post 789 group 101 comment

PBAC(基于策略的访问控制)

  • 原理:集中管理策略规则库,解耦业务代码
  • 组件
    • 策略管理界面
    • 策略决策点(PDP)
    • 策略执行点(PEP)
  • 适用场景
    • 多系统统一权限管理
    • 合规要求严格(如金融、医疗)
  • 工具推荐
    • Open Policy Agent (OPA)
    • AWS IAM Policies

MAC(强制访问控制)

  • 原理:系统强制分级安全标签(如密级:公开→秘密→机密)
  • 模型
    • Bell-LaPadula 模型(防止上读下写)
    • Biba 模型(防止下读上写)
  • 适用场景
    • 政府/军事信息系统
    • 数据敏感性分级明确场景

DAC(自主访问控制)

  • 原理:资源所有者自主授权
  • 特点
    • Unix文件权限模式(ugo+rwe)
    • 授权易扩散(A可分享给B,B可再分享)
  • 风险:易出现权限泛滥问题

常见权限模式对比

模式 优点 缺点 适用场景
RBAC 简单易管理 不够灵活 固定角色系统
ABAC 高度灵活 实现复杂 动态权限需求
ACL 精确控制 维护成本高 资源级权限控制
PBAC 结合策略和属性 需要策略引擎支持 企业级复杂系统

权限模式的选择

选型决策树

  • 是否需要动态条件判断?
    • 是 → ABAC 或 PBAC
    • 否 → RBAC
  • 是否需精细到单个资源控制?
    • 是 → ACL 或 ABAC
    • 否 → RBAC
  • 是否需要集中策略管理?
    • 是 → PBAC(如OPA)
    • 否 → 原生实现
  • 是否涉及多级安全标签?
    • 是 → MAC
    • 否 → 其他方案

常见组合

  • RBAC + ABAC:角色为基础,属性做细化
  • ACL + RBAC:角色分配基础权限,资源单独设置白名单

各语言生态工具

模式 Python方案 Java方案 Node.js方案
RBAC FastAPI Depends Spring Security CASL
ABAC PyABAC Apache Shiro AccessControl
PBAC OPA Python SDK Keycloak OPA Node Adapter

总结

  • 中小系统:RBAC + 资源级ACL
  • 复杂企业级:ABAC + PBAC
  • 高安全需求:MAC + RBAC混合
  • 快速实现:直接使用框架内置方案(如FastAPI角色依赖)

根据业务扩展性需求选择,避免过早优化,保留策略升级路径。

权限实现最佳实践

  • 最小权限原则:默认拒绝,按需授予
  • 权限继承:实现角色/组的层级权限传递
  • 审计追踪:记录权限变更和访问日志
  • 定期清理:回收僵尸账户和过期权限
  • 防御性编程:即使前端隐藏按钮,后端仍需验证

FastAPI的权限管理

FastAPI本身没有内置的权限系统,但可以通过依赖注入系统来实现。常见的做法是使用OAuth2和JWT进行认证,然后在依赖项中检查权限。

核心实现步骤

定义权限模型

from enum import Enum

class Permission(str, Enum):
    USER_READ = "user:read"
    USER_WRITE = "user:write"
    ADMIN_ACCESS = "admin:all"

用户模型集成权限

from pydantic import BaseModel
from typing import List

class User(BaseModel):
    username: str
    permissions: List[Permission] = []

创建权限依赖项

from fastapi import Depends, HTTPException, status

def require_permission(required_perm: Permission):
    async def permission_checker(
        current_user: User = Depends(get_current_user)
    ):
        if required_perm not in current_user.permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="权限不足"
            )
        return current_user
    return Depends(permission_checker)

权限验证实现

路由级权限控制

@app.get("/admin/dashboard", dependencies=[require_permission(Permission.ADMIN_ACCESS)])
async def admin_dashboard():
    return {"message": "管理员面板"}

方法级权限控制

@app.put("/users/{user_id}")
async def update_user(
    user_id: str,
    user_data: UserUpdate,
    _: User = Depends(require_permission(Permission.USER_WRITE))
):
    # 更新用户逻辑
    return update_user_in_db(user_id, user_data)

高级权限管理方案

基于角色的访问控制(RBAC

from enum import Enum

class Role(str, Enum):
    GUEST = "guest"
    USER = "user"
    ADMIN = "admin"

ROLE_PERMISSIONS = {
    Role.GUEST: [Permission.USER_READ],
    Role.USER: [Permission.USER_READ, Permission.USER_WRITE],
    Role.ADMIN: list(Permission)
}

def require_role(required_role: Role):
    async def role_checker(
        current_user: User = Depends(get_current_user)
    ):
        if required_role not in current_user.roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="角色权限不足"
            )
        return current_user
    return Depends(role_checker)

基于属性的访问控制(ABAC

def check_resource_ownership(
    resource_id: str,
    current_user: User = Depends(get_current_user)
):
    resource = get_resource(resource_id)
    if resource.owner != current_user.username:
        raise HTTPException(status_code=403, detail="无权操作该资源")
    return resource

数据库集成示例(SQLAlchemy

用户模型定义

from sqlalchemy import Column, String, ARRAY, Enum as SQLEnum

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True)
    permissions = Column(ARRAY(SQLEnum(Permission)))

权限查询

async def get_user_permissions(username: str) -> List[Permission]:
    user = await db.execute(
        select(User).where(User.username == username)
    )
    return user.scalar().permissions

安全增强措施

权限缓存优化

from fastapi_cache.decorator import cache

@cache(expire=300)
async def get_cached_permissions(user_id: str):
    return await get_user_permissions(user_id)

权限变更实时生效

# 当权限变更时发送事件
@app.post("/update-permissions")
async def update_permissions(
    user_id: str,
    new_perms: List[Permission],
    _: User = Depends(require_permission(Permission.ADMIN_ACCESS))
):
    update_db_permissions(user_id, new_perms)
    await redis.publish(f"perm_update:{user_id}", json.dumps(new_perms))

测试策略

单元测试示例

from fastapi.testclient import TestClient

def test_admin_access():
    client = TestClient(app)
    # 使用管理员令牌测试
    response = client.get(
        "/admin/dashboard",
        headers={"Authorization": "Bearer admin_token"}
    )
    assert response.status_code == 200

def test_unauthorized_access():
    client = TestClient(app)
    response = client.get("/admin/dashboard")
    assert response.status_code == 401

最佳实践

  • 最小权限原则:只授予必要权限
  • 定期审计:检查权限分配合理性
  • 日志记录:记录所有权限变更和访问尝试
  • 多因素认证:敏感操作增加二次验证
  • 权限分层:
class PermissionTier(IntEnum):
    TIER1 = 1  # 基础权限
    TIER2 = 2  # 敏感操作
    TIER3 = 3  # 管理权限

通过以上方案,可以在 FastAPI 中实现从简单到复杂的权限管理系统。建议根据业务规模选择合适模式,小型项目使用 RBAC,中大型项目推荐 ABAC 或 PBAC。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注