22 KiB
22 KiB
OWASP Top 10 (2021) 防护指南
本文档为安全专家技能的参考资料,涵盖 OWASP Top 10 每一项的风险描述、攻击示例、防护代码和检查清单。 代码示例基于 FastAPI (Python) 和 Next.js (TypeScript) 技术栈。
A01: 访问控制失效 (Broken Access Control)
风险描述
访问控制确保用户只能在其权限范围内操作。失效的访问控制允许攻击者越权访问、修改或删除数据。常见问题包括:IDOR(不安全的直接对象引用)、越权访问、路径遍历、CORS 配置错误。
攻击示例
# IDOR 攻击:修改 URL 中的 ID 访问他人数据
GET /api/users/1001/profile → GET /api/users/1002/profile
# 越权操作:普通用户访问管理接口
POST /api/admin/delete-user
FastAPI 防护代码
from fastapi import Depends, HTTPException, status
# 资源级别权限校验 — 防止 IDOR
async def get_order(order_id: int, current_user: User = Depends(get_current_user)):
order = await db.get(Order, order_id)
if not order:
raise HTTPException(status_code=404, detail="订单不存在")
# 关键:校验资源归属
if order.user_id != current_user.id and not current_user.has_role("admin"):
raise HTTPException(status_code=403, detail="无权访问此资源")
return order
# 基于装饰器的权限守卫
from functools import wraps
def require_roles(*roles: str):
def decorator(func):
@wraps(func)
async def wrapper(*args, current_user: User = Depends(get_current_user), **kwargs):
if not any(role in current_user.roles for role in roles):
raise HTTPException(status_code=403, detail="权限不足")
return await func(*args, current_user=current_user, **kwargs)
return wrapper
return decorator
@app.delete("/api/admin/users/{user_id}")
@require_roles("admin", "super_admin")
async def delete_user(user_id: int, current_user: User = Depends(get_current_user)):
await db.delete(User, user_id)
return {"message": "用户已删除"}
Next.js 防护代码
// middleware.ts — 路由级别访问控制
import { NextResponse } from "next/server";
import type { NextRequest } from "next/server";
import { verifyToken } from "@/lib/auth";
const PROTECTED_ROUTES: Record<string, string[]> = {
"/admin": ["admin", "super_admin"],
"/dashboard": ["admin", "user"],
};
export async function middleware(request: NextRequest) {
const token = request.cookies.get("access_token")?.value;
if (!token) {
return NextResponse.redirect(new URL("/login", request.url));
}
const payload = await verifyToken(token);
if (!payload) {
return NextResponse.redirect(new URL("/login", request.url));
}
// 检查路由权限
for (const [path, roles] of Object.entries(PROTECTED_ROUTES)) {
if (request.nextUrl.pathname.startsWith(path)) {
if (!roles.includes(payload.role)) {
return NextResponse.redirect(new URL("/unauthorized", request.url));
}
}
}
return NextResponse.next();
}
检查清单
- 每个 API 端点都有认证和授权检查
- 资源访问时校验所有者身份(防止 IDOR)
- 默认拒绝策略:未明确授权的请求一律拒绝
- CORS 仅允许受信任的来源
- 目录列表已禁用,
.git/.env等文件不可访问
A02: 加密失效 (Cryptographic Failures)
风险描述
敏感数据(密码、信用卡号、个人信息)未加密或使用弱加密算法。包括:明文传输、弱哈希算法(MD5/SHA1)、密钥硬编码、缺少 TLS。
攻击示例
# 错误:使用 MD5 存储密码
import hashlib
password_hash = hashlib.md5(password.encode()).hexdigest() # 极易被彩虹表破解
# 错误:密钥硬编码
SECRET_KEY = "my-secret-key-123" # 泄露到版本控制
FastAPI 防护代码
# 密码哈希 — 推荐 Argon2
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
ph = PasswordHasher(
time_cost=3, # 迭代次数
memory_cost=65536, # 内存消耗 64MB
parallelism=4, # 并行线程
hash_len=32, # 哈希长度
salt_len=16, # 盐长度
)
def hash_password(password: str) -> str:
return ph.hash(password)
def verify_password(password: str, hashed: str) -> bool:
try:
return ph.verify(hashed, password)
except VerifyMismatchError:
return False
# 数据加密 — AES-256-GCM
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
def encrypt_sensitive_data(plaintext: bytes, key: bytes) -> bytes:
"""使用 AES-256-GCM 加密敏感数据"""
nonce = os.urandom(12) # 96-bit nonce
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext, None)
return nonce + ciphertext # nonce 拼接密文
def decrypt_sensitive_data(data: bytes, key: bytes) -> bytes:
nonce = data[:12]
ciphertext = data[12:]
aesgcm = AESGCM(key)
return aesgcm.decrypt(nonce, ciphertext, None)
# TLS 配置 — 生产环境使用 TLS 1.3
# uvicorn main:app --ssl-keyfile=key.pem --ssl-certfile=cert.pem --ssl-version=TLSv1.3
检查清单
- 密码使用 Argon2 或 bcrypt 哈希存储
- 敏感数据使用 AES-256-GCM 加密
- 传输层强制 TLS 1.2+,推荐 TLS 1.3
- 密钥通过环境变量或密钥管理服务注入,不硬编码
- 禁用 MD5、SHA1 等弱哈希算法
A03: 注入 (Injection)
风险描述
攻击者将恶意数据作为命令或查询的一部分发送。常见类型包括 SQL 注入、NoSQL 注入、OS Command 注入、LDAP 注入。
攻击示例
# SQL 注入
query = f"SELECT * FROM users WHERE name = '{user_input}'"
# 输入: ' OR '1'='1' -- → 返回所有用户
# OS Command 注入
os.system(f"ping {user_input}")
# 输入: 127.0.0.1; rm -rf / → 执行恶意命令
FastAPI 防护代码
# SQL 注入防护 — 使用 ORM(SQLAlchemy)
from sqlalchemy.orm import Session
from sqlalchemy import text
# 正确:使用 ORM 查询
def get_user_by_name(db: Session, name: str):
return db.query(User).filter(User.name == name).first()
# 正确:使用参数化查询
def search_users(db: Session, keyword: str):
stmt = text("SELECT * FROM users WHERE name LIKE :keyword")
return db.execute(stmt, {"keyword": f"%{keyword}%"}).fetchall()
# 错误示范(绝不使用):
# db.execute(f"SELECT * FROM users WHERE name = '{name}'")
# OS Command 注入防护 — 使用 subprocess 参数列表
import subprocess
import shlex
def safe_ping(host: str):
# 输入验证
import re
if not re.match(r'^[a-zA-Z0-9.\-]+$', host):
raise ValueError("无效的主机名")
# 使用参数列表而非字符串拼接
result = subprocess.run(
["ping", "-c", "4", host],
capture_output=True, text=True, timeout=10
)
return result.stdout
# NoSQL 注入防护 — MongoDB
from bson import ObjectId
async def get_user(user_id: str):
# 验证 ObjectId 格式
if not ObjectId.is_valid(user_id):
raise HTTPException(status_code=400, detail="无效的用户 ID")
return await collection.find_one({"_id": ObjectId(user_id)})
# 输入验证 — Pydantic 模型
from pydantic import BaseModel, validator
import re
class UserSearch(BaseModel):
keyword: str
@validator("keyword")
def sanitize_keyword(cls, v):
if not re.match(r'^[\w\s\u4e00-\u9fff]+$', v):
raise ValueError("搜索关键词包含非法字符")
if len(v) > 100:
raise ValueError("搜索关键词过长")
return v.strip()
Next.js 防护代码
// API Route 输入验证 — 使用 zod
import { z } from "zod";
const searchSchema = z.object({
keyword: z.string().min(1).max(100).regex(/^[\w\s\u4e00-\u9fff]+$/),
page: z.number().int().positive().default(1),
});
export async function GET(request: Request) {
const { searchParams } = new URL(request.url);
const result = searchSchema.safeParse({
keyword: searchParams.get("keyword"),
page: Number(searchParams.get("page")),
});
if (!result.success) {
return Response.json({ error: "参数验证失败" }, { status: 400 });
}
// 使用 Prisma ORM 查询(自动防 SQL 注入)
const users = await prisma.user.findMany({
where: { name: { contains: result.data.keyword } },
skip: (result.data.page - 1) * 20,
take: 20,
});
return Response.json(users);
}
检查清单
- 所有 SQL 使用 ORM 或参数化查询
- 系统命令使用 subprocess 参数列表,禁止字符串拼接
- 所有用户输入使用 Pydantic/zod 校验
- NoSQL 查询验证数据类型和格式
- HTML 输出自动转义(React/Jinja2 默认行为)
A04: 不安全设计 (Insecure Design)
风险描述
设计层面的安全缺陷,无法通过代码修补解决。包括缺少威胁建模、业务逻辑漏洞、缺乏速率限制。
防护策略
# 速率限制 — 使用 slowapi
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/auth/login")
@limiter.limit("5/minute") # 每分钟最多 5 次登录尝试
async def login(request: Request, credentials: LoginForm):
# 登录逻辑
pass
# 业务逻辑保护 — 幂等性检查
from fastapi import Header
@app.post("/api/orders")
async def create_order(
order: OrderCreate,
idempotency_key: str = Header(...),
current_user: User = Depends(get_current_user),
):
# 幂等性检查:防止重复提交
existing = await db.get_order_by_idempotency_key(idempotency_key)
if existing:
return existing
return await db.create_order(order, current_user.id, idempotency_key)
检查清单
- 关键业务流程已进行威胁建模(STRIDE)
- 敏感操作有速率限制
- 支付/转账等操作有幂等性保护
- 业务规则在服务端验证,不依赖前端
A05: 安全配置错误 (Security Misconfiguration)
风险描述
默认配置、不完整配置、开放的云存储、不必要的 HTTP 头、详细的错误信息泄露敏感信息。
FastAPI 防护代码
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
docs_url=None if IS_PRODUCTION else "/docs", # 生产环境禁用文档
redoc_url=None if IS_PRODUCTION else "/redoc",
openapi_url=None if IS_PRODUCTION else "/openapi.json",
)
# CORS 严格配置
app.add_middleware(
CORSMiddleware,
allow_origins=["https://your-domain.com"], # 不使用 ["*"]
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
# 全局异常处理 — 隐藏内部错误细节
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
logger.error(f"未处理异常: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "服务器内部错误"}, # 不暴露堆栈信息
)
加固清单
- 生产环境禁用调试模式和 API 文档
- CORS 配置白名单模式,不使用通配符
- 异常处理器隐藏内部错误详情
- 移除 Server/X-Powered-By 等信息泄露头
- 文件上传限制大小和类型
- 禁用不必要的 HTTP 方法(TRACE、OPTIONS)
A06: 易受攻击和过时组件 (Vulnerable and Outdated Components)
风险描述
使用存在已知漏洞的库、框架或组件。依赖链中的任何环节都可能引入风险。
依赖扫描工具
# Python 依赖扫描
pip install pip-audit
pip-audit # 扫描已安装的包
pip-audit -r requirements.txt # 扫描依赖文件
# 使用 safety 扫描
pip install safety
safety check
# Node.js 依赖扫描
npm audit # 内置审计
npm audit fix # 自动修复
npx audit-ci --moderate # CI 集成,中等以上阻断
# pnpm 依赖扫描
pnpm audit
pnpm audit --fix
# 通用工具 — Trivy
trivy fs . # 扫描项目目录
trivy image myapp:latest # 扫描 Docker 镜像
CI/CD 集成
# GitHub Actions — 依赖扫描
- name: Python 依赖审计
run: |
pip install pip-audit
pip-audit -r requirements.txt --strict
- name: Node.js 依赖审计
run: pnpm audit --audit-level=moderate
检查清单
- CI/CD 流水线集成依赖扫描
- 定期更新依赖版本(至少每月一次)
- 使用 Dependabot / Renovate 自动化依赖更新
- 监控 CVE 公告(NVD、GitHub Advisory)
A07: 身份识别和认证失败 (Identification and Authentication Failures)
风险描述
弱密码策略、缺乏暴力破解保护、未实现 MFA、Session 固定攻击。
FastAPI 防护代码
# 密码复杂度验证
import re
from pydantic import BaseModel, validator
class PasswordPolicy(BaseModel):
password: str
@validator("password")
def validate_strength(cls, v):
if len(v) < 12:
raise ValueError("密码长度至少 12 位")
if not re.search(r'[A-Z]', v):
raise ValueError("需要至少一个大写字母")
if not re.search(r'[a-z]', v):
raise ValueError("需要至少一个小写字母")
if not re.search(r'\d', v):
raise ValueError("需要至少一个数字")
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError("需要至少一个特殊字符")
return v
# 暴力破解防护 — 账户锁定
from datetime import datetime, timedelta
class LoginAttemptTracker:
MAX_ATTEMPTS = 5
LOCKOUT_DURATION = timedelta(minutes=15)
async def check_and_record(self, username: str, success: bool):
key = f"login_attempts:{username}"
attempts = await redis.get(key)
if attempts and int(attempts) >= self.MAX_ATTEMPTS:
ttl = await redis.ttl(key)
raise HTTPException(
status_code=429,
detail=f"账户已锁定,请在 {ttl} 秒后重试"
)
if not success:
await redis.incr(key)
await redis.expire(key, int(self.LOCKOUT_DURATION.total_seconds()))
else:
await redis.delete(key)
# TOTP 双因素认证
import pyotp
def generate_totp_secret() -> str:
return pyotp.random_base32()
def verify_totp(secret: str, code: str) -> bool:
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=1) # 允许前后 30 秒偏移
检查清单
- 密码策略:最少 12 位,含大小写、数字、特殊字符
- 暴力破解防护:5 次失败后锁定 15 分钟
- 敏感操作启用 MFA(TOTP/WebAuthn)
- Session ID 在登录后重新生成(防 Session 固定)
- 使用安全的密码重置流程(带过期的一次性令牌)
A08: 软件和数据完整性失败 (Software and Data Integrity Failures)
风险描述
软件更新、CI/CD 管道和反序列化缺乏完整性验证。供应链攻击正在成为主要威胁。
防护策略
# npm 锁文件完整性 — 使用 --frozen-lockfile
pnpm install --frozen-lockfile # CI 中确保使用锁文件
# Python 依赖哈希校验
pip install --require-hashes -r requirements.txt
# requirements.txt 带哈希
# flask==3.0.0 --hash=sha256:xxxx
# 反序列化安全 — 禁止 pickle 反序列化不可信数据
import json
# 正确:使用 JSON
data = json.loads(user_input)
# 错误:不要对不可信数据使用 pickle
# import pickle
# data = pickle.loads(user_input) # 远程代码执行风险!
# Webhook 签名验证
import hmac
import hashlib
def verify_webhook_signature(payload: bytes, signature: str, secret: str) -> bool:
expected = hmac.new(
secret.encode(), payload, hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
检查清单
- CI/CD 使用锁文件和哈希校验
- 禁止反序列化不可信数据(pickle/yaml.load)
- Webhook 回调验证签名
- Docker 镜像使用固定摘要而非 latest 标签
- 代码签名和发布流程有完整性校验
A09: 安全日志和监控失败 (Security Logging and Monitoring Failures)
风险描述
缺乏充分的日志记录和监控使攻击难以被检测。平均攻击检测时间超过 200 天。
FastAPI 安全日志
import logging
import structlog
from datetime import datetime
# 结构化安全日志
security_logger = structlog.get_logger("security")
# 记录认证事件
async def log_auth_event(event_type: str, username: str, ip: str, success: bool, detail: str = ""):
security_logger.info(
"auth_event",
event_type=event_type,
username=username,
ip_address=ip,
success=success,
detail=detail,
timestamp=datetime.utcnow().isoformat(),
)
# 记录访问控制事件
async def log_access_event(user_id: int, resource: str, action: str, allowed: bool):
security_logger.info(
"access_event",
user_id=user_id,
resource=resource,
action=action,
allowed=allowed,
timestamp=datetime.utcnow().isoformat(),
)
# 关键:不要在日志中记录敏感数据
# 错误:logger.info(f"用户登录: password={password}")
# 正确:logger.info(f"用户登录: username={username}")
告警规则示例
# Prometheus 告警规则
groups:
- name: security_alerts
rules:
- alert: BruteForceDetected
expr: rate(auth_failures_total[5m]) > 10
for: 1m
annotations:
summary: "检测到暴力破解攻击"
- alert: UnauthorizedAccessSpike
expr: rate(http_responses_total{status="403"}[5m]) > 20
for: 2m
annotations:
summary: "403 响应异常增多"
检查清单
- 记录所有认证事件(登录、登出、失败)
- 记录访问控制失败事件
- 日志中不包含密码、令牌等敏感信息
- 日志集中存储且防篡改
- 配置告警规则:暴力破解、异常访问模式
A10: 服务器端请求伪造 (Server-Side Request Forgery, SSRF)
风险描述
攻击者让服务器发起非预期的请求,访问内部服务、云元数据端点或内网资源。
攻击示例
# 访问云实例元数据
POST /api/fetch-url
{"url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/"}
# 访问内网服务
{"url": "http://192.168.1.100:6379/"} # 直接访问内网 Redis
FastAPI 防护代码
import ipaddress
from urllib.parse import urlparse
import socket
ALLOWED_SCHEMES = {"http", "https"}
BLOCKED_NETWORKS = [
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("127.0.0.0/8"),
ipaddress.ip_network("169.254.0.0/16"), # 云元数据
ipaddress.ip_network("0.0.0.0/8"),
]
def validate_url(url: str) -> str:
"""验证 URL 安全性,防止 SSRF"""
parsed = urlparse(url)
# 协议白名单
if parsed.scheme not in ALLOWED_SCHEMES:
raise ValueError(f"不允许的协议: {parsed.scheme}")
# 域名白名单(推荐)
ALLOWED_DOMAINS = ["api.example.com", "cdn.example.com"]
if parsed.hostname not in ALLOWED_DOMAINS:
raise ValueError(f"不允许的域名: {parsed.hostname}")
# 解析 IP 并检查是否为内网地址
try:
ip = ipaddress.ip_address(socket.gethostbyname(parsed.hostname))
for network in BLOCKED_NETWORKS:
if ip in network:
raise ValueError(f"不允许访问内网地址: {ip}")
except socket.gaierror:
raise ValueError(f"无法解析域名: {parsed.hostname}")
return url
@app.post("/api/fetch-url")
async def fetch_external_url(url: str):
safe_url = validate_url(url)
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(safe_url, follow_redirects=False)
return {"status": response.status_code, "body": response.text[:1000]}
检查清单
- 用户提供的 URL 进行协议白名单检查
- DNS 解析结果检查,阻止内网 IP
- 使用域名白名单而非黑名单
- 禁止 HTTP 重定向跟踪(或重定向后再次验证)
- 云环境中使用 IMDSv2 或禁用实例元数据
快速参考表
| 编号 | 名称 | 核心防护 |
|---|---|---|
| A01 | 访问控制失效 | RBAC + 资源归属校验 + 默认拒绝 |
| A02 | 加密失效 | Argon2 + AES-256-GCM + TLS 1.3 |
| A03 | 注入 | ORM + 参数化查询 + 输入验证 |
| A04 | 不安全设计 | 威胁建模 + 速率限制 + 幂等性 |
| A05 | 安全配置错误 | 生产加固 + CORS 白名单 + 错误隐藏 |
| A06 | 易受攻击组件 | pip-audit + npm audit + CI 集成 |
| A07 | 认证失败 | 强密码策略 + 账户锁定 + MFA |
| A08 | 数据完整性失败 | 锁文件 + 签名验证 + 禁止 pickle |
| A09 | 日志监控不足 | 结构化日志 + 告警规则 + 集中存储 |
| A10 | SSRF | URL 白名单 + IP 检查 + 禁止重定向 |