'use strict'; /** * BYOK 代理 v2 — 稳定性增强版 * * 改进点: * 1. keepAlive 连接池 (复用 TCP 连接, 减少 TLS 握手) * 2. 可重试的瞬态错误 (ECONNRESET, ETIMEDOUT, 429, 502, 503) * 3. SSE 心跳 (每 15s 发送 :ping, 防止 Nginx/CDN 超时断连) * 4. backpressure 处理 (res.write 返回 false 时暂停上游) * 5. 分离连接超时(15s)和读取超时(180s) * 6. 详细错误分类与日志 * * @module src/proxy */ const https = require('https'); const http = require('http'); const { URL } = require('url'); // ─── ❶ SSRF 防护:base_url 白名单 ─── const ALLOWED_API_HOSTS = new Set([ 'api.anthropic.com', ]); if (process.env.ALLOWED_API_HOSTS) { for (const h of process.env.ALLOWED_API_HOSTS.split(',')) { if (h.trim()) ALLOWED_API_HOSTS.add(h.trim()); } } function isPrivateHost(hostname) { // IPv6 mapped IPv4 if (hostname.startsWith('[::ffff:')) { return isPrivateHost(hostname.slice(8, -1)); } // IPv6 私有地址段: ULA (fc00::/7), Link-local (fe80::/10), loopback (::1), unspecified (::) const lower = hostname.replace(/^\[|\]$/g, '').toLowerCase(); if (/^f[cd][0-9a-f]{2}:/.test(lower)) return true; // fc00::/7 (ULA) if (/^fe[89ab][0-9a-f]:/.test(lower)) return true; // fe80::/10 (link-local) if (lower === '::1' || lower === '::') return true; // IPv4 私有地址 const parts = hostname.split('.'); if (parts.length === 4 && parts.every(p => /^\d+$/.test(p))) { const [a, b] = parts.map(Number); if (a === 10) return true; if (a === 172 && b >= 16 && b <= 31) return true; if (a === 192 && b === 168) return true; if (a === 127) return true; if (a === 169 && b === 254) return true; if (a === 0) return true; } return hostname === 'localhost' || hostname === '[::1]'; } function validateBaseUrl(baseUrl) { if (!baseUrl) return; let url; try { url = new URL(baseUrl); } catch { throw { status: 400, message: 'base_url 格式无效' }; } if (ALLOWED_API_HOSTS.has(url.hostname)) return; if (isPrivateHost(url.hostname)) { throw { status: 403, message: '不允许访问内网地址' }; } // W10 修复: 非白名单公网地址也拒绝 (防止变成开放代理) throw { status: 403, message: '不允许的 API 地址,请联系管理员将域名加入白名单' }; } // ─── ❷ keepAlive 连接池 ─── const httpsAgent = new https.Agent({ keepAlive: true, maxSockets: 10, maxFreeSockets: 5, keepAliveMsecs: 30000, timeout: 15000, }); const httpAgent = new http.Agent({ keepAlive: true, maxSockets: 10, maxFreeSockets: 5, keepAliveMsecs: 30000, timeout: 15000, }); // ─── ❸ 重试配置 ─── const RETRYABLE_CODES = new Set([429, 502, 503, 504]); const RETRYABLE_ERRORS = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EPIPE', 'EAI_AGAIN', 'UND_ERR_SOCKET']); const MAX_RETRIES = 2; const RETRY_BASE_DELAY = 1000; // 1s 指数退避 function isRetryable(err, statusCode) { if (statusCode && RETRYABLE_CODES.has(statusCode)) return true; if (err && err.code && RETRYABLE_ERRORS.has(err.code)) return true; if (err && err.message && /socket hang up|ECONNRESET|ETIMEDOUT/i.test(err.message)) return true; return false; } function retryDelay(attempt) { // 指数退避 + 抖动: 1s, 2s + random(0-500ms) return RETRY_BASE_DELAY * Math.pow(2, attempt) + Math.random() * 500; } function sleep(ms) { return new Promise(r => setTimeout(r, ms)); } // ─── ❹ SSE 心跳 ─── const SSE_HEARTBEAT_INTERVAL = 15000; // 每 15 秒 function startSSEHeartbeat(res) { const timer = setInterval(() => { if (res.writableEnded || res.destroyed) { clearInterval(timer); return; } try { res.write(':ping\n\n'); } catch { clearInterval(timer); } }, SSE_HEARTBEAT_INTERVAL); // 不阻止进程退出 if (timer.unref) timer.unref(); return timer; } // ─── BYOK 代理核心 ─── async function proxyChat(opts, res) { const { apiKey, model = 'claude-opus-4-7', messages, maxTokens = 8192, stream = false, baseUrl, systemPrompt, } = opts; validateBaseUrl(baseUrl); const base = baseUrl || process.env.ANTHROPIC_BASE_URL || 'https://api.anthropic.com'; const url = new URL('/v1/messages', base); const isHttps = url.protocol === 'https:'; const body = { model, messages, max_tokens: maxTokens, stream, }; if (systemPrompt) body.system = systemPrompt; const payload = JSON.stringify(body); const requestOpts = { hostname: url.hostname, port: url.port || (isHttps ? 443 : 80), path: url.pathname, method: 'POST', headers: { 'Content-Type': 'application/json', 'x-api-key': apiKey, 'anthropic-version': process.env.ANTHROPIC_API_VERSION || '2023-06-01', 'Content-Length': Buffer.byteLength(payload), }, agent: isHttps ? httpsAgent : httpAgent, timeout: 15000, // 连接超时 15s }; // ─── 流式请求 (不重试, 因为 headers 一旦发送不可回退) ─── if (stream) { return _proxyChatStream(requestOpts, payload, res, isHttps); } // ─── 非流式请求 (支持重试) ─── let lastError = null; for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { try { const result = await _proxyChatOnce(requestOpts, payload, isHttps); // 上游返回可重试状态码 if (result.status && RETRYABLE_CODES.has(result.status) && attempt < MAX_RETRIES) { const delay = result.status === 429 ? _parseRetryAfter(result.headers) || retryDelay(attempt) : retryDelay(attempt); await sleep(delay); continue; } return result; } catch (err) { lastError = err; if (isRetryable(err) && attempt < MAX_RETRIES) { await sleep(retryDelay(attempt)); continue; } throw err; } } throw lastError; } // 解析 Retry-After 头 (秒或日期) function _parseRetryAfter(headers) { const val = headers && headers['retry-after']; if (!val) return null; const secs = parseInt(val, 10); if (!isNaN(secs) && secs > 0 && secs < 120) return secs * 1000; return null; } // 单次非流式请求 function _proxyChatOnce(requestOpts, payload, isHttps) { return new Promise((resolve, reject) => { const transport = isHttps ? https : http; const proxyReq = transport.request(requestOpts, (proxyRes) => { // 连接成功 → 切换为读取超时 180s proxyReq.setTimeout(180_000); const chunks = []; proxyRes.on('data', (chunk) => chunks.push(chunk)); proxyRes.on('end', () => { const raw = Buffer.concat(chunks).toString('utf8'); try { resolve({ status: proxyRes.statusCode, data: JSON.parse(raw), headers: proxyRes.headers }); } catch { resolve({ status: proxyRes.statusCode, data: raw, headers: proxyRes.headers }); } }); proxyRes.on('error', reject); }); proxyReq.on('error', reject); proxyReq.on('timeout', () => { proxyReq.destroy(new Error('Claude API 连接超时 (15s)')); }); proxyReq.write(payload); proxyReq.end(); }); } // 流式请求 (不重试) function _proxyChatStream(requestOpts, payload, res, isHttps) { return new Promise((resolve, reject) => { const transport = isHttps ? https : http; const proxyReq = transport.request(requestOpts, (proxyRes) => { // 连接成功 → 切换为读取超时 300s (流式更长) proxyReq.setTimeout(300_000); // 上游返回错误: 不走 SSE, 收集后 JSON 返回 if (proxyRes.statusCode !== 200) { const chunks = []; proxyRes.on('data', (chunk) => chunks.push(chunk)); proxyRes.on('end', () => { const raw = Buffer.concat(chunks).toString('utf8'); try { resolve({ status: proxyRes.statusCode, data: JSON.parse(raw), streamed: false }); } catch { resolve({ status: proxyRes.statusCode, data: { error: raw }, streamed: false }); } }); proxyRes.on('error', reject); return; } // 正常 SSE 透传 res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no', }); // 启动心跳 const heartbeatTimer = startSSEHeartbeat(res); let tokensIn = 0, tokensOut = 0; let fullText = ''; proxyRes.on('data', (chunk) => { // backpressure: 如果客户端消费慢, 暂停上游 const canWrite = res.write(chunk); if (!canWrite) { proxyRes.pause(); res.once('drain', () => proxyRes.resume()); } // 提取用量 + 全文 try { const text = chunk.toString(); for (const line of text.split('\n')) { if (!line.startsWith('data: ') || line === 'data: [DONE]') continue; const obj = JSON.parse(line.slice(6)); if (obj.type === 'message_start' && obj.message?.usage) { tokensIn = obj.message.usage.input_tokens || 0; } else if (obj.type === 'content_block_delta' && obj.delta?.text) { fullText += obj.delta.text; } else if (obj.type === 'message_delta' && obj.usage) { tokensOut = obj.usage.output_tokens || 0; } } } catch { /* 解析失败不影响透传 */ } }); proxyRes.on('end', () => { clearInterval(heartbeatTimer); res.end(); resolve({ streamed: true, status: 200, usage: { tokensIn, tokensOut }, fullText: fullText || undefined }); }); proxyRes.on('error', (err) => { clearInterval(heartbeatTimer); // 尝试发送 SSE 错误事件后关闭 try { res.write(`data: ${JSON.stringify({ type: 'error', error: { type: 'stream_error', message: err.message } })}\n\n`); } catch { /* ignore */ } res.end(); reject(err); }); // 客户端断开时清理 res.on('close', () => { clearInterval(heartbeatTimer); proxyReq.destroy(); }); }); proxyReq.on('error', (err) => { reject(err); }); proxyReq.on('timeout', () => { proxyReq.destroy(new Error('Claude API 连接超时 (15s)')); }); proxyReq.write(payload); proxyReq.end(); }); } module.exports = { proxyChat, validateBaseUrl };