'use strict'; /** * 多 LLM 路由 v2 — 稳定性增强版 * * 改进点: * 1. SSE 心跳 (每 15s :ping, 防止 Nginx 超时断连) * 2. backpressure 处理 (暂停上游 when 客户端消费慢) * 3. 非流式请求瞬态重试 (429/502/503 + ECONNRESET) * 4. 流式错误发送 SSE error event 而非静默断开 * 5. 连接/读取超时分离 (15s/180s) * * @module src/llm-router */ const https = require('https'); const http = require('http'); const { URL } = require('url'); // ─── DNS 预解析缓存 (B2 修复: 最大 100 条, 防内存泄露) ─── const dns = require('dns'); const _dnsCache = new Map(); const DNS_CACHE_TTL = 300000; // 5 分钟 const DNS_MAX_ENTRIES = 100; function cachedLookup(hostname, options, callback) { if (typeof options === 'function') { callback = options; options = {}; } const cached = _dnsCache.get(hostname); if (cached && Date.now() - cached.ts < DNS_CACHE_TTL) { return process.nextTick(() => callback(null, cached.address, cached.family)); } dns.lookup(hostname, options, (err, address, family) => { if (!err) { // 超限时淘汰最旧条目 if (_dnsCache.size >= DNS_MAX_ENTRIES) { const oldest = _dnsCache.keys().next().value; // Map 保持插入顺序 _dnsCache.delete(oldest); } _dnsCache.set(hostname, { address, family, ts: Date.now() }); } callback(err, address, family); }); } // ─── SSRF 防护 (B1 修复: 从 proxy.js 同步) ─── const ALLOWED_LLM_HOSTS = new Set([ 'api.anthropic.com', 'api.openai.com', 'dashscope.aliyuncs.com', 'api.deepseek.com', 'api.moonshot.cn', 'open.bigmodel.cn', 'ark.cn-beijing.volces.com', 'api.hunyuan.cloud.tencent.com', 'qianfan.baidubce.com', 'openrouter.ai', ]); if (process.env.ALLOWED_API_HOSTS) { for (const h of process.env.ALLOWED_API_HOSTS.split(',')) { if (h.trim()) ALLOWED_LLM_HOSTS.add(h.trim()); } } function _isPrivateHost(hostname) { const lower = hostname.replace(/^\[|\]$/g, '').toLowerCase(); if (/^f[cd][0-9a-f]{2}:/.test(lower)) return true; if (/^fe[89ab][0-9a-f]:/.test(lower)) return true; if (lower === '::1' || lower === '::') return true; if (hostname.startsWith('[::ffff:')) return _isPrivateHost(hostname.slice(8, -1)); const parts = hostname.split('.'); if (parts.length === 4 && parts.every(p => /^\d+$/.test(p))) { const [a, b] = parts.map(Number); if (a === 10 || a === 127 || a === 0) return true; if (a === 172 && b >= 16 && b <= 31) return true; if (a === 192 && b === 168) return true; if (a === 169 && b === 254) return true; } return hostname === 'localhost'; } function _validateLLMBaseUrl(baseUrl) { if (!baseUrl) return; let url; try { url = new URL(baseUrl); } catch { throw { status: 400, message: 'base_url 格式无效' }; } if (ALLOWED_LLM_HOSTS.has(url.hostname)) return; if (_isPrivateHost(url.hostname)) throw { status: 403, message: '不允许访问内网地址' }; throw { status: 403, message: '不允许的 LLM API 地址' }; } // 启动预热 for (const h of ['dashscope.aliyuncs.com', 'api.deepseek.com', 'api.moonshot.cn', 'open.bigmodel.cn', 'ark.cn-beijing.volces.com', 'api.hunyuan.cloud.tencent.com', 'qianfan.baidubce.com']) { dns.lookup(h, () => {}); } // ─── HTTP Agent: keepAlive 连接池 + DNS 缓存 ─── const httpsAgent = new https.Agent({ keepAlive: true, maxSockets: 10, maxFreeSockets: 5, keepAliveMsecs: 30000, lookup: cachedLookup, }); const httpAgent = new http.Agent({ keepAlive: true, maxSockets: 10, maxFreeSockets: 5, keepAliveMsecs: 30000, lookup: cachedLookup, }); // ─── 重试配置 ─── const RETRYABLE_STATUS = new Set([429, 502, 503, 504]); const RETRYABLE_ERRORS = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EPIPE', 'EAI_AGAIN']); const MAX_RETRIES = 2; function isRetryable(err, statusCode) { if (statusCode && RETRYABLE_STATUS.has(statusCode)) return true; if (err && RETRYABLE_ERRORS.has(err.code)) return true; if (err && /socket hang up|ECONNRESET|ETIMEDOUT/i.test(err.message)) return true; return false; } function retryDelay(attempt) { return 1000 * Math.pow(2, attempt) + Math.random() * 500; } function sleep(ms) { return new Promise(r => setTimeout(r, ms)); } // ─── SSE 心跳 ─── const SSE_HEARTBEAT_MS = 15000; function startSSEHeartbeat(res) { const timer = setInterval(() => { if (res.writableEnded || res.destroyed) { clearInterval(timer); return; } try { res.write(':ping\n\n'); } catch { clearInterval(timer); } }, SSE_HEARTBEAT_MS); if (timer.unref) timer.unref(); return timer; } // ─── Provider 配置 ─── const PROVIDERS = { anthropic: { baseUrl: 'https://api.anthropic.com', pathPrefix: '/v1/messages', authHeader: 'x-api-key', versionHeader: { 'anthropic-version': process.env.ANTHROPIC_API_VERSION || '2023-06-01' }, modelPrefixes: ['claude-'], buildBody: (opts) => ({ model: opts.model, messages: opts.messages, max_tokens: opts.maxTokens || 8192, stream: opts.stream || false, ...(opts.systemPrompt ? { system: opts.systemPrompt } : {}), }), parseUsage: (data) => ({ input_tokens: data?.usage?.input_tokens || 0, output_tokens: data?.usage?.output_tokens || 0, }), }, openai: { baseUrl: 'https://api.openai.com', pathPrefix: '/v1/chat/completions', authHeader: 'Authorization', authPrefix: 'Bearer ', versionHeader: {}, modelPrefixes: ['gpt-', 'o1-', 'o3-', 'o4-', 'chatgpt-'], buildBody: (opts) => { const isReasoningModel = /^(o1|o3|o4)-/i.test(opts.model); const tokenParam = isReasoningModel ? { max_completion_tokens: opts.maxTokens || 8192 } : { max_tokens: opts.maxTokens || 8192 }; return { model: opts.model, messages: [ ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []), ...opts.messages, ], ...tokenParam, stream: opts.stream || false, }; }, parseUsage: (data) => ({ input_tokens: data?.usage?.prompt_tokens || 0, output_tokens: data?.usage?.completion_tokens || 0, }), }, qwen: { baseUrl: 'https://dashscope.aliyuncs.com/compatible-mode', pathPrefix: '/v1/chat/completions', authHeader: 'Authorization', authPrefix: 'Bearer ', versionHeader: {}, modelPrefixes: ['qwen'], buildBody: (opts) => ({ model: opts.model, messages: [ ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []), ...opts.messages, ], max_tokens: opts.maxTokens || 8192, stream: opts.stream || false, enable_thinking: false, }), parseUsage: (data) => ({ input_tokens: data?.usage?.prompt_tokens || 0, output_tokens: data?.usage?.completion_tokens || 0, }), }, deepseek: { baseUrl: 'https://api.deepseek.com', pathPrefix: '/v1/chat/completions', authHeader: 'Authorization', authPrefix: 'Bearer ', versionHeader: {}, modelPrefixes: ['deepseek-'], buildBody: (opts) => ({ model: opts.model, messages: [ ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []), ...opts.messages, ], max_tokens: opts.maxTokens || 8192, stream: opts.stream || false, }), parseUsage: (data) => ({ input_tokens: data?.usage?.prompt_tokens || 0, output_tokens: data?.usage?.output_tokens || 0, }), }, kimi: { baseUrl: 'https://api.moonshot.cn', pathPrefix: '/v1/chat/completions', authHeader: 'Authorization', authPrefix: 'Bearer ', versionHeader: {}, modelPrefixes: ['moonshot-', 'kimi'], buildBody: (opts) => ({ model: opts.model, messages: [ ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []), ...opts.messages, ], max_tokens: opts.maxTokens || 8192, stream: opts.stream || false, }), parseUsage: (data) => ({ input_tokens: data?.usage?.prompt_tokens || 0, output_tokens: data?.usage?.completion_tokens || 0, }), }, zhipu: { baseUrl: 'https://open.bigmodel.cn/api/paas', pathPrefix: '/v4/chat/completions', authHeader: 'Authorization', authPrefix: 'Bearer ', versionHeader: {}, modelPrefixes: ['glm-'], buildBody: (opts) => ({ model: opts.model, messages: [ ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []), ...opts.messages, ], max_tokens: opts.maxTokens || 4096, stream: opts.stream || false, }), parseUsage: (data) => ({ input_tokens: data?.usage?.prompt_tokens || 0, output_tokens: data?.usage?.completion_tokens || 0, }), }, volcengine: { baseUrl: 'https://ark.cn-beijing.volces.com/api/v3', pathPrefix: '/chat/completions', authHeader: 'Authorization', authPrefix: 'Bearer ', versionHeader: {}, modelPrefixes: ['doubao-', 'ep-'], buildBody: (opts) => ({ model: opts.model, messages: [ ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []), ...opts.messages, ], max_tokens: opts.maxTokens || 4096, stream: opts.stream || false, }), parseUsage: (data) => ({ input_tokens: data?.usage?.prompt_tokens || 0, output_tokens: data?.usage?.completion_tokens || 0, }), }, hunyuan: { baseUrl: 'https://api.hunyuan.cloud.tencent.com', pathPrefix: '/v1/chat/completions', authHeader: 'Authorization', authPrefix: 'Bearer ', versionHeader: {}, modelPrefixes: ['hunyuan-'], buildBody: (opts) => ({ model: opts.model, messages: [ ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []), ...opts.messages, ], ...(opts.maxTokens ? { max_tokens: opts.maxTokens } : {}), stream: opts.stream || false, }), parseUsage: (data) => ({ input_tokens: data?.usage?.prompt_tokens || 0, output_tokens: data?.usage?.completion_tokens || 0, }), }, baidu: { baseUrl: 'https://qianfan.baidubce.com', pathPrefix: '/v2/chat/completions', authHeader: 'Authorization', authPrefix: 'Bearer ', versionHeader: {}, modelPrefixes: ['ernie-'], buildBody: (opts) => ({ model: opts.model, messages: [ ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []), ...opts.messages, ], max_tokens: opts.maxTokens || 4096, stream: opts.stream || false, }), parseUsage: (data) => ({ input_tokens: data?.usage?.prompt_tokens || 0, output_tokens: data?.usage?.completion_tokens || 0, }), }, }; function detectProvider(model) { if (!model) return 'qwen'; const lower = model.toLowerCase(); for (const [name, config] of Object.entries(PROVIDERS)) { if (config.modelPrefixes.some(prefix => lower.startsWith(prefix))) { return name; } } return 'qwen'; } function getProviderConfig(providerName, overrideBaseUrl) { const config = PROVIDERS[providerName]; if (!config) throw { status: 400, message: `不支持的 provider: ${providerName}` }; // B1 修复: SSRF 防护 if (overrideBaseUrl) _validateLLMBaseUrl(overrideBaseUrl); return { ...config, baseUrl: overrideBaseUrl || config.baseUrl }; } function listProviders() { return Object.entries(PROVIDERS).map(([name, config]) => ({ name, models: config.modelPrefixes, baseUrl: config.baseUrl, })); } /** * 核心 LLM 请求 — 非流式 (支持重试) */ function _sendNonStream(provider, apiKey, body, isHttps, requestOpts) { return new Promise((resolve, reject) => { const transport = isHttps ? https : http; const proxyReq = transport.request(requestOpts, (proxyRes) => { proxyReq.setTimeout(180_000); const chunks = []; proxyRes.on('data', c => chunks.push(c)); proxyRes.on('end', () => { const raw = Buffer.concat(chunks).toString('utf8'); try { resolve({ status: proxyRes.statusCode, data: JSON.parse(raw), headers: proxyRes.headers }); } catch { resolve({ status: proxyRes.statusCode, data: raw, headers: proxyRes.headers }); } }); proxyRes.on('error', reject); }); proxyReq.on('error', (err) => { err.message = `LLM 请求失败: ${err.message}`; reject(err); }); proxyReq.on('timeout', () => { proxyReq.destroy(new Error('LLM API 连接超时 (30s)')); }); proxyReq.write(JSON.stringify(body)); proxyReq.end(); }); } /** * 核心 LLM 请求 — 流式 (不重试, 带心跳+backpressure) */ function _sendStream(provider, apiKey, body, res, isHttps, requestOpts) { return new Promise((resolve, reject) => { const transport = isHttps ? https : http; const proxyReq = transport.request(requestOpts, (proxyRes) => { proxyReq.setTimeout(300_000); // 流式读取 5 分钟 if (proxyRes.statusCode !== 200) { const chunks = []; proxyRes.on('data', c => chunks.push(c)); proxyRes.on('end', () => { const raw = Buffer.concat(chunks).toString('utf8'); try { resolve({ status: proxyRes.statusCode, data: JSON.parse(raw), streamed: false }); } catch { resolve({ status: proxyRes.statusCode, data: { error: raw }, streamed: false }); } }); proxyRes.on('error', reject); return; } // SSE headers res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no', }); // 启动心跳 const heartbeatTimer = startSSEHeartbeat(res); let lastDataLine = ''; let inThink = false; let fullText = ''; proxyRes.on('data', c => { const text = c.toString(); const lines = text.split('\n'); const outLines = []; for (const line of lines) { if (line.startsWith('data: ') && line !== 'data: [DONE]') { const jsonStr = line.slice(6); lastDataLine = jsonStr; try { const parsed = JSON.parse(jsonStr); const delta = parsed.choices?.[0]?.delta; if (delta) { if (delta.reasoning_content !== undefined) { delete delta.reasoning_content; } if (delta.content) { let content = delta.content; if (inThink) { const endIdx = content.indexOf(''); if (endIdx !== -1) { inThink = false; content = content.slice(endIdx + 8); } else content = ''; } if (!inThink && content.includes('')) { const startIdx = content.indexOf(''); const endIdx = content.indexOf('', startIdx); if (endIdx !== -1) { content = content.slice(0, startIdx) + content.slice(endIdx + 8); } else { content = content.slice(0, startIdx); inThink = true; } } delta.content = content; if (content) fullText += content; } } outLines.push('data: ' + JSON.stringify(parsed)); } catch { outLines.push(line); } } else { outLines.push(line); } } // backpressure 处理 const canWrite = res.write(outLines.join('\n')); if (!canWrite) { proxyRes.pause(); res.once('drain', () => proxyRes.resume()); } }); proxyRes.on('end', () => { clearInterval(heartbeatTimer); res.end(); let usage = null; try { const parsed = JSON.parse(lastDataLine); if (parsed.usage) { usage = { input_tokens: parsed.usage.prompt_tokens || parsed.usage.input_tokens || 0, output_tokens: parsed.usage.completion_tokens || parsed.usage.output_tokens || 0, }; } } catch { /* 无 usage */ } resolve({ streamed: true, status: 200, usage, fullText: fullText || undefined }); }); proxyRes.on('error', (err) => { clearInterval(heartbeatTimer); // 发送 SSE 错误事件 try { res.write(`data: ${JSON.stringify({ error: { message: err.message, type: 'stream_error' } })}\n\n`); } catch { /* ignore */ } res.end(); reject(err); }); res.on('close', () => { clearInterval(heartbeatTimer); proxyReq.destroy(); }); }); proxyReq.on('error', (err) => { err.message = `LLM 流式请求失败: ${err.message}`; reject(err); }); proxyReq.on('timeout', () => { proxyReq.destroy(new Error('LLM API 连接超时 (30s)')); }); proxyReq.write(JSON.stringify(body)); proxyReq.end(); }); } /** * 通用 LLM 请求 — 自动选择流式/非流式, 非流式带重试 */ async function sendLLMRequest(provider, apiKey, body, res, stream) { const config = PROVIDERS[provider.name || provider] || PROVIDERS.qwen; const base = provider.baseUrl || config.baseUrl; let baseUrl; try { baseUrl = new URL(base); } catch { throw { status: 400, message: `base_url 格式无效: ${base}` }; } const fullPath = baseUrl.pathname.replace(/\/$/, '') + config.pathPrefix; const url = new URL(fullPath, base); const isHttps = url.protocol === 'https:'; const headers = { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(JSON.stringify(body)), ...config.versionHeader, }; if (config.authHeader === 'Authorization') { headers['Authorization'] = (config.authPrefix || '') + apiKey; } else { headers[config.authHeader] = apiKey; } const requestOpts = { hostname: url.hostname, port: url.port || (isHttps ? 443 : 80), path: url.pathname, method: 'POST', headers, agent: isHttps ? httpsAgent : httpAgent, timeout: 30_000, // 连接超时 30s }; if (stream && res) { return _sendStream(provider, apiKey, body, res, isHttps, requestOpts); } // 非流式: 带重试 let lastError = null; for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { try { const result = await _sendNonStream(provider, apiKey, body, isHttps, requestOpts); if (RETRYABLE_STATUS.has(result.status) && attempt < MAX_RETRIES) { await sleep(retryDelay(attempt)); continue; } return result; } catch (err) { lastError = err; if (isRetryable(err) && attempt < MAX_RETRIES) { await sleep(retryDelay(attempt)); continue; } throw err; } } throw lastError; } // --- Circuit Breaker + EWMA hooks --- let _circuitBreaker = null; let _providerHealth = null; function initPerformanceHooks(opts) { _circuitBreaker = opts.circuitBreaker || null; _providerHealth = opts.providerHealth || null; } function sendLLMRequestWithCB(provider, apiKey, body, res, stream) { const providerName = provider.name || provider; const startMs = Date.now(); if (_circuitBreaker && !_circuitBreaker.canRequest(providerName)) { return Promise.reject(Object.assign(new Error('Circuit Breaker OPEN: ' + providerName), { status: 503 })); } return sendLLMRequest(provider, apiKey, body, res, stream).then( (result) => { const elapsed = Date.now() - startMs; const success = result.streamed ? true : (result.status >= 200 && result.status < 500); if (_circuitBreaker && success) _circuitBreaker.recordSuccess(providerName); if (_providerHealth && _providerHealth.recordRequestLatency) _providerHealth.recordRequestLatency(providerName, elapsed, success); return result; }, (err) => { const elapsed = Date.now() - startMs; if (_circuitBreaker) _circuitBreaker.recordFailure(providerName); if (_providerHealth && _providerHealth.recordRequestLatency) _providerHealth.recordRequestLatency(providerName, elapsed, false); throw err; } ); } module.exports = { detectProvider, getProviderConfig, listProviders, sendLLMRequest: sendLLMRequestWithCB, sendLLMRequestRaw: sendLLMRequest, initPerformanceHooks, PROVIDERS, };