bookworm-boot/patches/proxy-v2.js

350 lines
10 KiB
JavaScript
Raw Normal View History

'use strict';
/**
* BYOK 代理 v2 稳定性增强版
*
* 改进点:
* 1. keepAlive 连接池 (复用 TCP 连接, 减少 TLS 握手)
* 2. 可重试的瞬态错误 (ECONNRESET, ETIMEDOUT, 429, 502, 503)
* 3. SSE 心跳 ( 15s 发送 :ping, 防止 Nginx/CDN 超时断连)
* 4. backpressure 处理 (res.write 返回 false 时暂停上游)
* 5. 分离连接超时(15s)和读取超时(180s)
* 6. 详细错误分类与日志
*
* @module src/proxy
*/
const https = require('https');
const http = require('http');
const { URL } = require('url');
// ─── ❶ SSRF 防护base_url 白名单 ───
const ALLOWED_API_HOSTS = new Set([
'api.anthropic.com',
]);
if (process.env.ALLOWED_API_HOSTS) {
for (const h of process.env.ALLOWED_API_HOSTS.split(',')) {
if (h.trim()) ALLOWED_API_HOSTS.add(h.trim());
}
}
function isPrivateHost(hostname) {
// IPv6 mapped IPv4
if (hostname.startsWith('[::ffff:')) {
return isPrivateHost(hostname.slice(8, -1));
}
// IPv6 私有地址段: ULA (fc00::/7), Link-local (fe80::/10), loopback (::1), unspecified (::)
const lower = hostname.replace(/^\[|\]$/g, '').toLowerCase();
if (/^f[cd][0-9a-f]{2}:/.test(lower)) return true; // fc00::/7 (ULA)
if (/^fe[89ab][0-9a-f]:/.test(lower)) return true; // fe80::/10 (link-local)
if (lower === '::1' || lower === '::') return true;
// IPv4 私有地址
const parts = hostname.split('.');
if (parts.length === 4 && parts.every(p => /^\d+$/.test(p))) {
const [a, b] = parts.map(Number);
if (a === 10) return true;
if (a === 172 && b >= 16 && b <= 31) return true;
if (a === 192 && b === 168) return true;
if (a === 127) return true;
if (a === 169 && b === 254) return true;
if (a === 0) return true;
}
return hostname === 'localhost' || hostname === '[::1]';
}
function validateBaseUrl(baseUrl) {
if (!baseUrl) return;
let url;
try {
url = new URL(baseUrl);
} catch {
throw { status: 400, message: 'base_url 格式无效' };
}
if (ALLOWED_API_HOSTS.has(url.hostname)) return;
if (isPrivateHost(url.hostname)) {
throw { status: 403, message: '不允许访问内网地址' };
}
// W10 修复: 非白名单公网地址也拒绝 (防止变成开放代理)
throw { status: 403, message: '不允许的 API 地址,请联系管理员将域名加入白名单' };
}
// ─── ❷ keepAlive 连接池 ───
const httpsAgent = new https.Agent({
keepAlive: true,
maxSockets: 10,
maxFreeSockets: 5,
keepAliveMsecs: 30000,
timeout: 15000,
});
const httpAgent = new http.Agent({
keepAlive: true,
maxSockets: 10,
maxFreeSockets: 5,
keepAliveMsecs: 30000,
timeout: 15000,
});
// ─── ❸ 重试配置 ───
const RETRYABLE_CODES = new Set([429, 502, 503, 504]);
const RETRYABLE_ERRORS = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EPIPE', 'EAI_AGAIN', 'UND_ERR_SOCKET']);
const MAX_RETRIES = 2;
const RETRY_BASE_DELAY = 1000; // 1s 指数退避
function isRetryable(err, statusCode) {
if (statusCode && RETRYABLE_CODES.has(statusCode)) return true;
if (err && err.code && RETRYABLE_ERRORS.has(err.code)) return true;
if (err && err.message && /socket hang up|ECONNRESET|ETIMEDOUT/i.test(err.message)) return true;
return false;
}
function retryDelay(attempt) {
// 指数退避 + 抖动: 1s, 2s + random(0-500ms)
return RETRY_BASE_DELAY * Math.pow(2, attempt) + Math.random() * 500;
}
function sleep(ms) {
return new Promise(r => setTimeout(r, ms));
}
// ─── ❹ SSE 心跳 ───
const SSE_HEARTBEAT_INTERVAL = 15000; // 每 15 秒
function startSSEHeartbeat(res) {
const timer = setInterval(() => {
if (res.writableEnded || res.destroyed) {
clearInterval(timer);
return;
}
try {
res.write(':ping\n\n');
} catch {
clearInterval(timer);
}
}, SSE_HEARTBEAT_INTERVAL);
// 不阻止进程退出
if (timer.unref) timer.unref();
return timer;
}
// ─── BYOK 代理核心 ───
async function proxyChat(opts, res) {
const {
apiKey,
model = 'claude-sonnet-4-5-20250514',
messages,
maxTokens = 8192,
stream = false,
baseUrl,
systemPrompt,
} = opts;
validateBaseUrl(baseUrl);
const base = baseUrl || process.env.ANTHROPIC_BASE_URL || 'https://api.anthropic.com';
const url = new URL('/v1/messages', base);
const isHttps = url.protocol === 'https:';
const body = {
model,
messages,
max_tokens: maxTokens,
stream,
};
if (systemPrompt) body.system = systemPrompt;
const payload = JSON.stringify(body);
const requestOpts = {
hostname: url.hostname,
port: url.port || (isHttps ? 443 : 80),
path: url.pathname,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': apiKey,
'anthropic-version': process.env.ANTHROPIC_API_VERSION || '2023-06-01',
'Content-Length': Buffer.byteLength(payload),
},
agent: isHttps ? httpsAgent : httpAgent,
timeout: 15000, // 连接超时 15s
};
// ─── 流式请求 (不重试, 因为 headers 一旦发送不可回退) ───
if (stream) {
return _proxyChatStream(requestOpts, payload, res, isHttps);
}
// ─── 非流式请求 (支持重试) ───
let lastError = null;
for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
try {
const result = await _proxyChatOnce(requestOpts, payload, isHttps);
// 上游返回可重试状态码
if (result.status && RETRYABLE_CODES.has(result.status) && attempt < MAX_RETRIES) {
const delay = result.status === 429
? _parseRetryAfter(result.headers) || retryDelay(attempt)
: retryDelay(attempt);
await sleep(delay);
continue;
}
return result;
} catch (err) {
lastError = err;
if (isRetryable(err) && attempt < MAX_RETRIES) {
await sleep(retryDelay(attempt));
continue;
}
throw err;
}
}
throw lastError;
}
// 解析 Retry-After 头 (秒或日期)
function _parseRetryAfter(headers) {
const val = headers && headers['retry-after'];
if (!val) return null;
const secs = parseInt(val, 10);
if (!isNaN(secs) && secs > 0 && secs < 120) return secs * 1000;
return null;
}
// 单次非流式请求
function _proxyChatOnce(requestOpts, payload, isHttps) {
return new Promise((resolve, reject) => {
const transport = isHttps ? https : http;
const proxyReq = transport.request(requestOpts, (proxyRes) => {
// 连接成功 → 切换为读取超时 180s
proxyReq.setTimeout(180_000);
const chunks = [];
proxyRes.on('data', (chunk) => chunks.push(chunk));
proxyRes.on('end', () => {
const raw = Buffer.concat(chunks).toString('utf8');
try {
resolve({ status: proxyRes.statusCode, data: JSON.parse(raw), headers: proxyRes.headers });
} catch {
resolve({ status: proxyRes.statusCode, data: raw, headers: proxyRes.headers });
}
});
proxyRes.on('error', reject);
});
proxyReq.on('error', reject);
proxyReq.on('timeout', () => {
proxyReq.destroy(new Error('Claude API 连接超时 (15s)'));
});
proxyReq.write(payload);
proxyReq.end();
});
}
// 流式请求 (不重试)
function _proxyChatStream(requestOpts, payload, res, isHttps) {
return new Promise((resolve, reject) => {
const transport = isHttps ? https : http;
const proxyReq = transport.request(requestOpts, (proxyRes) => {
// 连接成功 → 切换为读取超时 300s (流式更长)
proxyReq.setTimeout(300_000);
// 上游返回错误: 不走 SSE, 收集后 JSON 返回
if (proxyRes.statusCode !== 200) {
const chunks = [];
proxyRes.on('data', (chunk) => chunks.push(chunk));
proxyRes.on('end', () => {
const raw = Buffer.concat(chunks).toString('utf8');
try {
resolve({ status: proxyRes.statusCode, data: JSON.parse(raw), streamed: false });
} catch {
resolve({ status: proxyRes.statusCode, data: { error: raw }, streamed: false });
}
});
proxyRes.on('error', reject);
return;
}
// 正常 SSE 透传
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
});
// 启动心跳
const heartbeatTimer = startSSEHeartbeat(res);
let tokensIn = 0, tokensOut = 0;
let fullText = '';
proxyRes.on('data', (chunk) => {
// backpressure: 如果客户端消费慢, 暂停上游
const canWrite = res.write(chunk);
if (!canWrite) {
proxyRes.pause();
res.once('drain', () => proxyRes.resume());
}
// 提取用量 + 全文
try {
const text = chunk.toString();
for (const line of text.split('\n')) {
if (!line.startsWith('data: ') || line === 'data: [DONE]') continue;
const obj = JSON.parse(line.slice(6));
if (obj.type === 'message_start' && obj.message?.usage) {
tokensIn = obj.message.usage.input_tokens || 0;
} else if (obj.type === 'content_block_delta' && obj.delta?.text) {
fullText += obj.delta.text;
} else if (obj.type === 'message_delta' && obj.usage) {
tokensOut = obj.usage.output_tokens || 0;
}
}
} catch { /* 解析失败不影响透传 */ }
});
proxyRes.on('end', () => {
clearInterval(heartbeatTimer);
res.end();
resolve({ streamed: true, status: 200, usage: { tokensIn, tokensOut }, fullText: fullText || undefined });
});
proxyRes.on('error', (err) => {
clearInterval(heartbeatTimer);
// 尝试发送 SSE 错误事件后关闭
try {
res.write(`data: ${JSON.stringify({ type: 'error', error: { type: 'stream_error', message: err.message } })}\n\n`);
} catch { /* ignore */ }
res.end();
reject(err);
});
// 客户端断开时清理
res.on('close', () => {
clearInterval(heartbeatTimer);
proxyReq.destroy();
});
});
proxyReq.on('error', (err) => {
reject(err);
});
proxyReq.on('timeout', () => {
proxyReq.destroy(new Error('Claude API 连接超时 (15s)'));
});
proxyReq.write(payload);
proxyReq.end();
});
}
module.exports = { proxyChat, validateBaseUrl };