bookworm-smart-assistant/skills/security-expert/references/auth-patterns.md

17 KiB
Raw Permalink Blame History

认证授权实现模式与安全头配置

本文档为安全专家技能的参考资料,涵盖主流认证授权模式的实现细节和安全头配置速查。 代码示例基于 FastAPI (Python) 和 Next.js (TypeScript) 技术栈。


JWT 实现模式

Access/Refresh Token 双令牌机制

核心思想Access Token 短期有效15-30 分钟Refresh Token 长期有效7-30 天。Access Token 用于接口认证Refresh Token 用于获取新的 Access Token。

FastAPI JWT 完整实现

from datetime import datetime, timedelta
from typing import Optional
import jwt
import os
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

# 配置
JWT_SECRET = os.getenv("JWT_SECRET")
JWT_REFRESH_SECRET = os.getenv("JWT_REFRESH_SECRET")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE = timedelta(minutes=30)
REFRESH_TOKEN_EXPIRE = timedelta(days=7)

security = HTTPBearer()

class TokenPair(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

def create_access_token(user_id: int, roles: list[str]) -> str:
    payload = {
        "sub": str(user_id),
        "roles": roles,
        "type": "access",
        "iat": datetime.utcnow(),
        "exp": datetime.utcnow() + ACCESS_TOKEN_EXPIRE,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=ALGORITHM)

def create_refresh_token(user_id: int) -> str:
    payload = {
        "sub": str(user_id),
        "type": "refresh",
        "iat": datetime.utcnow(),
        "exp": datetime.utcnow() + REFRESH_TOKEN_EXPIRE,
    }
    return jwt.encode(payload, JWT_REFRESH_SECRET, algorithm=ALGORITHM)

def create_token_pair(user_id: int, roles: list[str]) -> TokenPair:
    return TokenPair(
        access_token=create_access_token(user_id, roles),
        refresh_token=create_refresh_token(user_id),
    )

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    token = credentials.credentials
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[ALGORITHM])
        if payload.get("type") != "access":
            raise HTTPException(status_code=401, detail="无效的令牌类型")
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="令牌已过期")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="无效的令牌")

# Token 轮转 — 每次刷新时生成新的 Refresh Token
@app.post("/api/auth/refresh")
async def refresh_token(refresh_token: str):
    try:
        payload = jwt.decode(refresh_token, JWT_REFRESH_SECRET, algorithms=[ALGORITHM])
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=401, detail="无效的令牌类型")

        # 检查黑名单(已撤销的 Refresh Token
        jti = payload.get("jti")
        if jti and await redis.exists(f"revoked_token:{jti}"):
            raise HTTPException(status_code=401, detail="令牌已撤销")

        user_id = int(payload["sub"])
        user = await db.get_user(user_id)
        # 生成新的令牌对Token 轮转)
        return create_token_pair(user.id, user.roles)
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="无效的刷新令牌")

# Token 黑名单 — 使用 Redis 存储已撤销的令牌
async def revoke_token(token: str, token_type: str = "access"):
    secret = JWT_SECRET if token_type == "access" else JWT_REFRESH_SECRET
    payload = jwt.decode(token, secret, algorithms=[ALGORITHM])
    exp = datetime.fromtimestamp(payload["exp"])
    ttl = (exp - datetime.utcnow()).total_seconds()
    if ttl > 0:
        await redis.setex(f"revoked_token:{token}", int(ttl), "1")

Next.js Middleware JWT 验证

// middleware.ts
import { NextResponse } from "next/server";
import type { NextRequest } from "next/server";
import { jwtVerify } from "jose";

const JWT_SECRET = new TextEncoder().encode(process.env.JWT_SECRET!);

interface JWTPayload {
  sub: string;
  roles: string[];
  type: string;
  exp: number;
}

async function verifyAccessToken(token: string): Promise<JWTPayload | null> {
  try {
    const { payload } = await jwtVerify(token, JWT_SECRET);
    if (payload.type !== "access") return null;
    return payload as unknown as JWTPayload;
  } catch {
    return null;
  }
}

export async function middleware(request: NextRequest) {
  // 公开路由跳过验证
  const publicPaths = ["/login", "/register", "/api/auth/refresh"];
  if (publicPaths.some((p) => request.nextUrl.pathname.startsWith(p))) {
    return NextResponse.next();
  }

  // 从 Cookie 或 Authorization Header 获取令牌
  const token =
    request.cookies.get("access_token")?.value ||
    request.headers.get("Authorization")?.replace("Bearer ", "");

  if (!token) {
    return NextResponse.redirect(new URL("/login", request.url));
  }

  const payload = await verifyAccessToken(token);
  if (!payload) {
    // 令牌无效或过期,尝试刷新
    return NextResponse.redirect(new URL("/login", request.url));
  }

  // 将用户信息注入请求头传递给 API Route
  const response = NextResponse.next();
  response.headers.set("x-user-id", payload.sub);
  response.headers.set("x-user-roles", JSON.stringify(payload.roles));
  return response;
}

export const config = {
  matcher: ["/dashboard/:path*", "/api/protected/:path*", "/admin/:path*"],
};

OAuth 2.0 / OpenID Connect

授权码流程 (Authorization Code Flow)

适用于有后端的 Web 应用,是最安全的 OAuth 流程。

用户 → 应用 → 授权服务器(获取授权码)→ 应用后端(用授权码换令牌)→ 资源服务器

PKCE 扩展 (Proof Key for Code Exchange)

适用于 SPA 和移动应用,防止授权码拦截攻击。

// Next.js — OAuth PKCE 实现
import crypto from "crypto";

function generateCodeVerifier(): string {
  return crypto.randomBytes(32).toString("base64url");
}

function generateCodeChallenge(verifier: string): string {
  return crypto.createHash("sha256").update(verifier).digest("base64url");
}

// 发起授权请求
export async function GET(request: Request) {
  const codeVerifier = generateCodeVerifier();
  const codeChallenge = generateCodeChallenge(codeVerifier);

  // 存储 code_verifier 到 session后续换令牌时需要
  cookies().set("code_verifier", codeVerifier, {
    httpOnly: true,
    secure: true,
    sameSite: "lax",
    maxAge: 600, // 10 分钟
  });

  const authUrl = new URL("https://provider.com/oauth/authorize");
  authUrl.searchParams.set("client_id", process.env.OAUTH_CLIENT_ID!);
  authUrl.searchParams.set("response_type", "code");
  authUrl.searchParams.set("redirect_uri", process.env.OAUTH_REDIRECT_URI!);
  authUrl.searchParams.set("scope", "openid profile email");
  authUrl.searchParams.set("code_challenge", codeChallenge);
  authUrl.searchParams.set("code_challenge_method", "S256");
  authUrl.searchParams.set("state", crypto.randomBytes(16).toString("hex"));

  return Response.redirect(authUrl.toString());
}

常见 OAuth 提供商配置

# FastAPI — Google OAuth 配置示例
OAUTH_PROVIDERS = {
    "google": {
        "client_id": os.getenv("GOOGLE_CLIENT_ID"),
        "client_secret": os.getenv("GOOGLE_CLIENT_SECRET"),
        "authorize_url": "https://accounts.google.com/o/oauth2/v2/auth",
        "token_url": "https://oauth2.googleapis.com/token",
        "userinfo_url": "https://www.googleapis.com/oauth2/v3/userinfo",
        "scopes": ["openid", "profile", "email"],
    },
    "github": {
        "client_id": os.getenv("GITHUB_CLIENT_ID"),
        "client_secret": os.getenv("GITHUB_CLIENT_SECRET"),
        "authorize_url": "https://github.com/login/oauth/authorize",
        "token_url": "https://github.com/login/oauth/access_token",
        "userinfo_url": "https://api.github.com/user",
        "scopes": ["read:user", "user:email"],
    },
}

Session 认证

# FastAPI — 安全 Cookie 设置
from fastapi.responses import JSONResponse

def set_auth_cookie(response: JSONResponse, session_id: str):
    response.set_cookie(
        key="session_id",
        value=session_id,
        httponly=True,     # 禁止 JavaScript 访问(防 XSS 窃取)
        secure=True,       # 仅通过 HTTPS 发送
        samesite="lax",    # 防 CSRFlax 允许顶级导航携带)
        max_age=3600,      # 1 小时过期
        path="/",
        domain=".yourdomain.com",  # 限定域名
    )

Redis Session Store

import secrets
from datetime import timedelta

class RedisSessionStore:
    PREFIX = "session:"
    DEFAULT_TTL = timedelta(hours=1)

    def __init__(self, redis_client):
        self.redis = redis_client

    async def create(self, user_id: int, data: dict) -> str:
        session_id = secrets.token_urlsafe(32)
        session_data = {"user_id": user_id, **data}
        await self.redis.setex(
            f"{self.PREFIX}{session_id}",
            int(self.DEFAULT_TTL.total_seconds()),
            json.dumps(session_data),
        )
        return session_id

    async def get(self, session_id: str) -> dict | None:
        data = await self.redis.get(f"{self.PREFIX}{session_id}")
        if not data:
            return None
        # 滑动过期:每次访问续期
        await self.redis.expire(
            f"{self.PREFIX}{session_id}",
            int(self.DEFAULT_TTL.total_seconds()),
        )
        return json.loads(data)

    async def destroy(self, session_id: str):
        await self.redis.delete(f"{self.PREFIX}{session_id}")

    async def regenerate(self, old_session_id: str) -> str:
        """登录后重新生成 Session ID防 Session 固定攻击)"""
        data = await self.get(old_session_id)
        if not data:
            raise ValueError("Session 不存在")
        await self.destroy(old_session_id)
        return await self.create(data["user_id"], data)

RBAC 实现模式

角色-权限模型

# FastAPI — 完整 RBAC 实现
from enum import Enum
from typing import List
from fastapi import Depends, HTTPException

class Permission(str, Enum):
    USER_READ = "user:read"
    USER_WRITE = "user:write"
    USER_DELETE = "user:delete"
    ORDER_READ = "order:read"
    ORDER_WRITE = "order:write"
    ADMIN_PANEL = "admin:panel"

class Role(str, Enum):
    VIEWER = "viewer"
    EDITOR = "editor"
    ADMIN = "admin"

ROLE_PERMISSIONS: dict[Role, list[Permission]] = {
    Role.VIEWER: [Permission.USER_READ, Permission.ORDER_READ],
    Role.EDITOR: [Permission.USER_READ, Permission.USER_WRITE, Permission.ORDER_READ, Permission.ORDER_WRITE],
    Role.ADMIN: list(Permission),  # 管理员拥有所有权限
}

def require_permission(*permissions: Permission):
    """权限守卫依赖注入"""
    async def checker(current_user: dict = Depends(get_current_user)):
        user_roles = current_user.get("roles", [])
        user_permissions = set()
        for role in user_roles:
            user_permissions.update(ROLE_PERMISSIONS.get(Role(role), []))

        for perm in permissions:
            if perm not in user_permissions:
                raise HTTPException(status_code=403, detail=f"缺少权限: {perm.value}")
        return current_user
    return checker

# 使用示例
@app.get("/api/users")
async def list_users(user=Depends(require_permission(Permission.USER_READ))):
    return await db.get_all_users()

@app.delete("/api/users/{user_id}")
async def delete_user(
    user_id: int,
    user=Depends(require_permission(Permission.USER_DELETE)),
):
    await db.delete_user(user_id)
    return {"message": "用户已删除"}

Next.js 前端路由守卫

// hooks/useAuth.ts
"use client";
import { useEffect } from "react";
import { useRouter } from "next/navigation";

interface AuthOptions {
  requiredRoles?: string[];
  redirectTo?: string;
}

export function useAuth(options: AuthOptions = {}) {
  const { requiredRoles = [], redirectTo = "/login" } = options;
  const router = useRouter();

  useEffect(() => {
    const token = localStorage.getItem("access_token");
    if (!token) {
      router.replace(redirectTo);
      return;
    }

    // 解析 JWT payload仅用于前端路由守卫后端仍需完整验证
    try {
      const payload = JSON.parse(atob(token.split(".")[1]));
      const userRoles: string[] = payload.roles || [];

      if (requiredRoles.length > 0) {
        const hasRequired = requiredRoles.some((r) => userRoles.includes(r));
        if (!hasRequired) {
          router.replace("/unauthorized");
        }
      }
    } catch {
      router.replace(redirectTo);
    }
  }, [requiredRoles, redirectTo, router]);
}

// 页面使用示例
// app/admin/page.tsx
export default function AdminPage() {
  useAuth({ requiredRoles: ["admin"] });

  return <div>管理后台内容</div>;
}

安全头配置速查

Content-Security-Policy (CSP)

控制浏览器允许加载的资源来源,有效防止 XSS。

# FastAPI — CSP 配置
CSP_POLICY = "; ".join([
    "default-src 'self'",
    "script-src 'self' 'nonce-{nonce}'",    # 允许带 nonce 的内联脚本
    "style-src 'self' 'unsafe-inline'",      # 样式通常需要 inline
    "img-src 'self' data: https:",           # 允许 HTTPS 图片
    "font-src 'self'",
    "connect-src 'self' https://api.example.com",
    "frame-ancestors 'none'",                # 禁止被嵌入 iframe
    "base-uri 'self'",
    "form-action 'self'",
])

Strict-Transport-Security (HSTS)

强制浏览器使用 HTTPS 连接。

Strict-Transport-Security: max-age=31536000; includeSubDomains; preload
  • max-age=31536000: 一年内强制 HTTPS
  • includeSubDomains: 子域名也强制 HTTPS
  • preload: 申请加入浏览器 HSTS 预加载列表

X-Frame-Options

防止页面被嵌入 iframeClickjacking 防护)。

X-Frame-Options: DENY          # 完全禁止
X-Frame-Options: SAMEORIGIN    # 仅同源允许

X-Content-Type-Options

防止浏览器 MIME 类型嗅探。

X-Content-Type-Options: nosniff

Permissions-Policy

控制浏览器功能的使用权限。

Permissions-Policy: camera=(), microphone=(), geolocation=(self), payment=()

完整安全头中间件

# FastAPI — 完整安全头中间件
import secrets
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        nonce = secrets.token_urlsafe(16)
        request.state.csp_nonce = nonce

        response = await call_next(request)

        # CSP
        response.headers["Content-Security-Policy"] = (
            f"default-src 'self'; "
            f"script-src 'self' 'nonce-{nonce}'; "
            f"style-src 'self' 'unsafe-inline'; "
            f"img-src 'self' data: https:; "
            f"font-src 'self'; "
            f"frame-ancestors 'none'; "
            f"base-uri 'self'"
        )
        # HSTS
        response.headers["Strict-Transport-Security"] = (
            "max-age=31536000; includeSubDomains; preload"
        )
        # 其他安全头
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = (
            "camera=(), microphone=(), geolocation=(self), payment=()"
        )
        # 移除信息泄露头
        response.headers.pop("Server", None)
        response.headers.pop("X-Powered-By", None)

        return response

app.add_middleware(SecurityHeadersMiddleware)
// Next.js — next.config.js 安全头配置
const securityHeaders = [
  { key: "X-Content-Type-Options", value: "nosniff" },
  { key: "X-Frame-Options", value: "DENY" },
  { key: "Strict-Transport-Security", value: "max-age=31536000; includeSubDomains; preload" },
  { key: "Referrer-Policy", value: "strict-origin-when-cross-origin" },
  { key: "Permissions-Policy", value: "camera=(), microphone=(), geolocation=(self)" },
  {
    key: "Content-Security-Policy",
    value: "default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self'; frame-ancestors 'none'",
  },
];

module.exports = {
  async headers() {
    return [{ source: "/:path*", headers: securityHeaders }];
  },
};

安全头速查表

安全头 作用 推荐值
Content-Security-Policy 防 XSS控制资源加载 default-src 'self' + 按需放宽
Strict-Transport-Security 强制 HTTPS max-age=31536000; includeSubDomains
X-Frame-Options 防 Clickjacking DENY
X-Content-Type-Options 防 MIME 嗅探 nosniff
Referrer-Policy 控制 Referer 泄露 strict-origin-when-cross-origin
Permissions-Policy 限制浏览器 API 按需配置,默认关闭摄像头/麦克风