bookworm-boot/patches/llm-router-v2.js

612 lines
20 KiB
JavaScript
Raw Permalink Normal View History

'use strict';
/**
* LLM 路由 v2 稳定性增强版
*
* 改进点:
* 1. SSE 心跳 ( 15s :ping, 防止 Nginx 超时断连)
* 2. backpressure 处理 (暂停上游 when 客户端消费慢)
* 3. 非流式请求瞬态重试 (429/502/503/504 + ECONNRESET 等瞬态网络错误)
* 4. 流式错误发送 SSE error event 而非静默断开
* 5. 连接/读取超时分离 (连接 30s; 读取: 非流式 180s, 流式 300s)
*
* @module src/llm-router
*/
const https = require('https');
const http = require('http');
const { URL } = require('url');
// ─── DNS 预解析缓存 (B2 修复: 最大 100 条, 防内存泄露) ───
const dns = require('dns');
// Cache entries: `${hostname}|${family}` -> { addresses: [{ address, family }, ...], ts }
const _dnsCache = new Map();
const DNS_CACHE_TTL = 300000; // 5 minutes
const DNS_MAX_ENTRIES = 100;
/**
 * Deliver a cached address list to a dns.lookup-style callback in the shape the caller asked for.
 * @param {Array<{address: string, family: number}>} addresses - non-empty address list
 * @param {boolean} all - true: callback(null, list); false: callback(null, address, family) of the first entry
 * @param {Function} callback - dns.lookup-style callback
 */
function _answerLookup(addresses, all, callback) {
  if (all) {
    // Hand out copies so callers cannot mutate the cached entry.
    callback(null, addresses.map(({ address, family }) => ({ address, family })));
    return;
  }
  callback(null, addresses[0].address, addresses[0].family);
}
/**
 * `dns.lookup`-compatible resolver with a small TTL cache, used as the agents' `lookup` option.
 *
 * Successful results are cached for DNS_CACHE_TTL ms, with at most DNS_MAX_ENTRIES entries
 * (oldest insertion evicted first). The full address list (`all: true`) is always resolved and
 * cached per hostname + requested family. Each caller is answered in the shape it asked for:
 * `callback(err, address, family)` by default, or `callback(err, [{ address, family }])` when
 * `options.all` is set. Node's socket layer may call `lookup` with `{ all: true }` (family
 * autoselection), so one entry must be able to serve both shapes.
 *
 * @param {string} hostname - host to resolve
 * @param {object|number|Function} [options] - dns.lookup options, a bare family number, or the callback
 * @param {Function} callback - dns.lookup-style callback
 */
function cachedLookup(hostname, options, callback) {
  if (typeof options === 'function') { callback = options; options = {}; }
  // dns.lookup also accepts a bare family number in place of the options object.
  const opts = typeof options === 'number' ? { family: options } : (options || {});
  const cacheKey = `${hostname}|${opts.family || 0}`;
  const cached = _dnsCache.get(cacheKey);
  if (cached && Date.now() - cached.ts < DNS_CACHE_TTL) {
    return process.nextTick(_answerLookup, cached.addresses, Boolean(opts.all), callback);
  }
  // Always resolve the full list so a single entry can answer both result shapes.
  dns.lookup(hostname, { ...opts, all: true }, (err, addresses) => {
    if (err) return callback(err);
    if (!Array.isArray(addresses) || addresses.length === 0) {
      return callback(Object.assign(new Error(`getaddrinfo ENOTFOUND ${hostname}`), { code: 'ENOTFOUND', hostname }));
    }
    // Re-insert at the tail so Map order reflects freshness (refreshing an expired entry must not
    // evict some other host), then drop the oldest entry if we are at the cap.
    _dnsCache.delete(cacheKey);
    if (_dnsCache.size >= DNS_MAX_ENTRIES) {
      _dnsCache.delete(_dnsCache.keys().next().value);
    }
    _dnsCache.set(cacheKey, { addresses, ts: Date.now() });
    _answerLookup(addresses, Boolean(opts.all), callback);
  });
}
// ─── SSRF guard (B1 fix, synced from proxy.js): hosts a custom base_url may point at ───
const ALLOWED_LLM_HOSTS = new Set([
  'api.anthropic.com', 'api.openai.com',
  'dashscope.aliyuncs.com', 'api.deepseek.com', 'api.moonshot.cn',
  'open.bigmodel.cn', 'ark.cn-beijing.volces.com',
  'api.hunyuan.cloud.tencent.com', 'qianfan.baidubce.com',
  'openrouter.ai',
]);
// Extra hosts from ALLOWED_API_HOSTS (comma-separated). `new URL(...).hostname` is always
// lower-case, so entries are lower-cased as well; otherwise "API.Example.com" could never match.
for (const entry of (process.env.ALLOWED_API_HOSTS || '').split(',')) {
  const host = entry.trim().toLowerCase();
  if (host) ALLOWED_LLM_HOSTS.add(host);
}
/**
 * Heuristic check whether a URL hostname points at a loopback / private / link-local address.
 * Used to choose the error message for rejected base URLs.
 *
 * All checks run on one normalised form (brackets stripped, lower-cased). That includes
 * IPv4-mapped IPv6 in both dotted (`::ffff:127.0.0.1`) and hex (`::ffff:7f00:1`) notation.
 * The hex form is what the WHATWG URL parser produces for `[::ffff:127.0.0.1]`.
 *
 * @param {string} hostname - hostname as returned by `new URL(...).hostname`
 * @returns {boolean} true for private/loopback/link-local/localhost hosts
 */
function _isPrivateHost(hostname) {
  const lower = String(hostname).replace(/^\[|\]$/g, '').toLowerCase();
  if (/^f[cd][0-9a-f]{2}:/.test(lower)) return true; // fc00::/7 unique-local
  if (/^fe[89ab][0-9a-f]:/.test(lower)) return true; // fe80::/10 link-local
  if (lower === '::1' || lower === '::') return true;
  const mapped = /^::ffff:(.+)$/.exec(lower);
  if (mapped) {
    const hex = /^([0-9a-f]{1,4}):([0-9a-f]{1,4})$/.exec(mapped[1]);
    if (hex) {
      const hi = parseInt(hex[1], 16);
      const lo = parseInt(hex[2], 16);
      return _isPrivateHost(`${hi >> 8}.${hi & 255}.${lo >> 8}.${lo & 255}`);
    }
    return _isPrivateHost(mapped[1]);
  }
  const parts = lower.split('.');
  if (parts.length === 4 && parts.every(p => /^\d+$/.test(p))) {
    const [a, b] = parts.map(Number);
    if (a === 10 || a === 127 || a === 0) return true;
    if (a === 172 && b >= 16 && b <= 31) return true;
    if (a === 192 && b === 168) return true;
    if (a === 169 && b === 254) return true;
  }
  // Ignore a single trailing root dot ("localhost.") and treat *.localhost as loopback too.
  const name = lower.replace(/\.$/, '');
  return name === 'localhost' || name.endsWith('.localhost');
}
/**
 * SSRF guard for caller-supplied base URLs. Returns normally when the URL is acceptable.
 *
 * @param {string} [baseUrl] - custom base URL; empty/undefined means "use the provider default"
 * @throws {{status: number, message: string}} 400 when the URL cannot be parsed or is not http(s);
 *   403 when the host is private or not in ALLOWED_LLM_HOSTS
 */
function _validateLLMBaseUrl(baseUrl) {
  if (!baseUrl) return;
  let url;
  try { url = new URL(baseUrl); } catch { throw { status: 400, message: 'base_url 格式无效' }; }
  // Only HTTP(S) is forwarded by the request layer. Any other scheme (ftp:, ws:, file:, ...)
  // would otherwise be sent as plain HTTP on port 80 to an allow-listed host.
  if (url.protocol !== 'https:' && url.protocol !== 'http:') {
    throw { status: 400, message: 'base_url 格式无效' };
  }
  if (ALLOWED_LLM_HOSTS.has(url.hostname)) return;
  if (_isPrivateHost(url.hostname)) throw { status: 403, message: '不允许访问内网地址' };
  throw { status: 403, message: '不允许的 LLM API 地址' };
}
// Startup warm-up: fire one DNS lookup per known provider host when the module loads.
// NOTE(review): these calls go straight to dns.lookup, so nothing is stored in _dnsCache and
// cachedLookup still misses on the first real request; any benefit presumably comes from
// OS-level resolver caching. Confirm whether the cache was meant to be primed here.
const warmupHosts = [
  'dashscope.aliyuncs.com',
  'api.deepseek.com',
  'api.moonshot.cn',
  'open.bigmodel.cn',
  'ark.cn-beijing.volces.com',
  'api.hunyuan.cloud.tencent.com',
  'qianfan.baidubce.com',
];
warmupHosts.forEach((host) => {
  dns.lookup(host, () => {});
});
// ─── HTTP(S) agents: keep-alive socket pools that resolve hosts through cachedLookup ───
const sharedAgentOptions = {
  keepAlive: true,
  keepAliveMsecs: 30000,
  maxSockets: 10,
  maxFreeSockets: 5,
  lookup: cachedLookup,
};
const httpsAgent = new https.Agent({ ...sharedAgentOptions });
const httpAgent = new http.Agent({ ...sharedAgentOptions });
// ─── Retry policy for non-streaming requests ───
const RETRYABLE_STATUS = new Set([429, 502, 503, 504]);
const RETRYABLE_ERRORS = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EPIPE', 'EAI_AGAIN']);
const MAX_RETRIES = 2;
// Fallback for errors that lost their `code` but still describe a transient socket failure.
const TRANSIENT_ERROR_MESSAGE = /socket hang up|ECONNRESET|ETIMEDOUT/i;
/**
 * Decide whether a failed attempt is worth retrying.
 * @param {Error|object|null} [err] - thrown error, if any
 * @param {number} [statusCode] - HTTP status of a completed response, if any
 * @returns {boolean} true for retryable statuses, known transient error codes, or matching messages
 */
function isRetryable(err, statusCode) {
  const statusIsTransient = Boolean(statusCode) && RETRYABLE_STATUS.has(statusCode);
  if (statusIsTransient) return true;
  if (!err) return false;
  return RETRYABLE_ERRORS.has(err.code) || TRANSIENT_ERROR_MESSAGE.test(err.message);
}
/**
 * Exponential back-off with jitter: 1s, 2s, 4s, ... plus up to 0.5s of random jitter.
 * @param {number} attempt - zero-based attempt index
 * @returns {number} delay in milliseconds
 */
function retryDelay(attempt) {
  const exponential = 1000 * 2 ** attempt;
  const jitter = Math.random() * 500;
  return exponential + jitter;
}
/**
 * Promise-based delay.
 * @param {number} ms - milliseconds to wait
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
// ─── SSE heartbeat ───
const SSE_HEARTBEAT_MS = 15000;
/**
 * Periodically write an SSE comment line (`:ping`) so idle proxies keep the stream open.
 * The interval stops itself once the response has ended, been destroyed, or a write throws.
 * The timer is unref'd so it never keeps the process alive on its own.
 * @param {object} res - writable HTTP response carrying the SSE stream
 * @returns {NodeJS.Timeout} interval handle; the caller clears it when the stream finishes
 */
function startSSEHeartbeat(res) {
  const pulse = () => {
    const closed = res.writableEnded || res.destroyed;
    if (closed) {
      clearInterval(handle);
      return;
    }
    try {
      res.write(':ping\n\n');
    } catch {
      clearInterval(handle);
    }
  };
  const handle = setInterval(pulse, SSE_HEARTBEAT_MS);
  handle.unref?.();
  return handle;
}
// ─── Provider configuration ───
// Each entry: default base URL + endpoint path, auth header style, model-name prefixes used by
// detectProvider, a request-body builder, and a usage extractor for non-streaming responses.
const PROVIDERS = {
  anthropic: {
    baseUrl: 'https://api.anthropic.com',
    pathPrefix: '/v1/messages',
    authHeader: 'x-api-key',
    versionHeader: { 'anthropic-version': process.env.ANTHROPIC_API_VERSION || '2023-06-01' },
    modelPrefixes: ['claude-'],
    buildBody: (opts) => ({
      model: opts.model,
      messages: opts.messages,
      max_tokens: opts.maxTokens || 8192,
      stream: opts.stream || false,
      // The system prompt is a top-level field here, not a message.
      ...(opts.systemPrompt ? { system: opts.systemPrompt } : {}),
    }),
    parseUsage: (data) => ({
      input_tokens: data?.usage?.input_tokens || 0,
      output_tokens: data?.usage?.output_tokens || 0,
    }),
  },
  openai: {
    baseUrl: 'https://api.openai.com',
    pathPrefix: '/v1/chat/completions',
    authHeader: 'Authorization',
    authPrefix: 'Bearer ',
    versionHeader: {},
    modelPrefixes: ['gpt-', 'o1-', 'o3-', 'o4-', 'chatgpt-'],
    buildBody: (opts) => {
      // o-series reasoning models get `max_completion_tokens` instead of `max_tokens`.
      // This matches bare names ("o1", "o3") as well as suffixed ones ("o3-mini").
      const isReasoningModel = /^o[134](?:-|$)/i.test(opts.model);
      const tokenParam = isReasoningModel
        ? { max_completion_tokens: opts.maxTokens || 8192 }
        : { max_tokens: opts.maxTokens || 8192 };
      return {
        model: opts.model,
        messages: [
          ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []),
          ...opts.messages,
        ],
        ...tokenParam,
        stream: opts.stream || false,
      };
    },
    parseUsage: (data) => ({
      input_tokens: data?.usage?.prompt_tokens || 0,
      output_tokens: data?.usage?.completion_tokens || 0,
    }),
  },
  qwen: {
    baseUrl: 'https://dashscope.aliyuncs.com/compatible-mode',
    pathPrefix: '/v1/chat/completions',
    authHeader: 'Authorization',
    authPrefix: 'Bearer ',
    versionHeader: {},
    modelPrefixes: ['qwen'],
    buildBody: (opts) => ({
      model: opts.model,
      messages: [
        ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []),
        ...opts.messages,
      ],
      max_tokens: opts.maxTokens || 8192,
      stream: opts.stream || false,
      enable_thinking: false,
    }),
    parseUsage: (data) => ({
      input_tokens: data?.usage?.prompt_tokens || 0,
      output_tokens: data?.usage?.completion_tokens || 0,
    }),
  },
  deepseek: {
    baseUrl: 'https://api.deepseek.com',
    pathPrefix: '/v1/chat/completions',
    authHeader: 'Authorization',
    authPrefix: 'Bearer ',
    versionHeader: {},
    modelPrefixes: ['deepseek-'],
    buildBody: (opts) => ({
      model: opts.model,
      messages: [
        ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []),
        ...opts.messages,
      ],
      max_tokens: opts.maxTokens || 8192,
      stream: opts.stream || false,
    }),
    // OpenAI-compatible usage block: output is `completion_tokens` (the previous `output_tokens`
    // lookup always yielded 0). `output_tokens` is kept only as a fallback.
    parseUsage: (data) => ({
      input_tokens: data?.usage?.prompt_tokens || 0,
      output_tokens: data?.usage?.completion_tokens || data?.usage?.output_tokens || 0,
    }),
  },
  kimi: {
    baseUrl: 'https://api.moonshot.cn',
    pathPrefix: '/v1/chat/completions',
    authHeader: 'Authorization',
    authPrefix: 'Bearer ',
    versionHeader: {},
    modelPrefixes: ['moonshot-', 'kimi'],
    buildBody: (opts) => ({
      model: opts.model,
      messages: [
        ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []),
        ...opts.messages,
      ],
      max_tokens: opts.maxTokens || 8192,
      stream: opts.stream || false,
    }),
    parseUsage: (data) => ({
      input_tokens: data?.usage?.prompt_tokens || 0,
      output_tokens: data?.usage?.completion_tokens || 0,
    }),
  },
  zhipu: {
    baseUrl: 'https://open.bigmodel.cn/api/paas',
    pathPrefix: '/v4/chat/completions',
    authHeader: 'Authorization',
    authPrefix: 'Bearer ',
    versionHeader: {},
    modelPrefixes: ['glm-'],
    buildBody: (opts) => ({
      model: opts.model,
      messages: [
        ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []),
        ...opts.messages,
      ],
      max_tokens: opts.maxTokens || 4096,
      stream: opts.stream || false,
    }),
    parseUsage: (data) => ({
      input_tokens: data?.usage?.prompt_tokens || 0,
      output_tokens: data?.usage?.completion_tokens || 0,
    }),
  },
  volcengine: {
    baseUrl: 'https://ark.cn-beijing.volces.com/api/v3',
    pathPrefix: '/chat/completions',
    authHeader: 'Authorization',
    authPrefix: 'Bearer ',
    versionHeader: {},
    modelPrefixes: ['doubao-', 'ep-'],
    buildBody: (opts) => ({
      model: opts.model,
      messages: [
        ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []),
        ...opts.messages,
      ],
      max_tokens: opts.maxTokens || 4096,
      stream: opts.stream || false,
    }),
    parseUsage: (data) => ({
      input_tokens: data?.usage?.prompt_tokens || 0,
      output_tokens: data?.usage?.completion_tokens || 0,
    }),
  },
  hunyuan: {
    baseUrl: 'https://api.hunyuan.cloud.tencent.com',
    pathPrefix: '/v1/chat/completions',
    authHeader: 'Authorization',
    authPrefix: 'Bearer ',
    versionHeader: {},
    modelPrefixes: ['hunyuan-'],
    buildBody: (opts) => ({
      model: opts.model,
      messages: [
        ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []),
        ...opts.messages,
      ],
      // Unlike the other providers, max_tokens is only sent when the caller set it.
      ...(opts.maxTokens ? { max_tokens: opts.maxTokens } : {}),
      stream: opts.stream || false,
    }),
    parseUsage: (data) => ({
      input_tokens: data?.usage?.prompt_tokens || 0,
      output_tokens: data?.usage?.completion_tokens || 0,
    }),
  },
  baidu: {
    baseUrl: 'https://qianfan.baidubce.com',
    pathPrefix: '/v2/chat/completions',
    authHeader: 'Authorization',
    authPrefix: 'Bearer ',
    versionHeader: {},
    modelPrefixes: ['ernie-'],
    buildBody: (opts) => ({
      model: opts.model,
      messages: [
        ...(opts.systemPrompt ? [{ role: 'system', content: opts.systemPrompt }] : []),
        ...opts.messages,
      ],
      max_tokens: opts.maxTokens || 4096,
      stream: opts.stream || false,
    }),
    parseUsage: (data) => ({
      input_tokens: data?.usage?.prompt_tokens || 0,
      output_tokens: data?.usage?.completion_tokens || 0,
    }),
  },
};
/**
 * Guess the provider from a model name by case-insensitive prefix match, in PROVIDERS key order.
 * @param {string} [model] - model identifier, e.g. "gpt-4o" or "glm-4"
 * @returns {string} provider key; 'qwen' when the model is empty or no prefix matches
 */
function detectProvider(model) {
  const fallback = 'qwen';
  if (!model) return fallback;
  const normalized = model.toLowerCase();
  const match = Object.keys(PROVIDERS).find((name) =>
    PROVIDERS[name].modelPrefixes.some((prefix) => normalized.startsWith(prefix)));
  return match || fallback;
}
/**
 * Resolve the static configuration of a provider, optionally with a custom base URL.
 * @param {string} providerName - key of PROVIDERS (own properties only)
 * @param {string} [overrideBaseUrl] - custom base URL, validated against the SSRF allow-list
 * @returns {object} shallow copy of the provider config with `baseUrl` set
 * @throws {{status: number, message: string}} 400 for unknown providers; 400/403 from _validateLLMBaseUrl
 */
function getProviderConfig(providerName, overrideBaseUrl) {
  // Own-property check: otherwise names like "toString" or "constructor" resolve to
  // Object.prototype members and yield a bogus config instead of a 400.
  const config = Object.prototype.hasOwnProperty.call(PROVIDERS, providerName)
    ? PROVIDERS[providerName]
    : undefined;
  if (!config) throw { status: 400, message: `不支持的 provider: ${providerName}` };
  // B1 fix: SSRF guard for caller-supplied base URLs.
  if (overrideBaseUrl) _validateLLMBaseUrl(overrideBaseUrl);
  return { ...config, baseUrl: overrideBaseUrl || config.baseUrl };
}
/**
 * Summarise the configured providers for discovery endpoints.
 * @returns {Array<{name: string, models: string[], baseUrl: string}>} one entry per provider, in
 *   PROVIDERS key order; `models` is the provider's model-prefix list (same array reference)
 */
function listProviders() {
  const summary = [];
  for (const name of Object.keys(PROVIDERS)) {
    const { modelPrefixes, baseUrl } = PROVIDERS[name];
    summary.push({ name, models: modelPrefixes, baseUrl });
  }
  return summary;
}
/**
 * Core LLM request, non-streaming: buffers the whole upstream response.
 * Retries are the caller's job (sendLLMRequest).
 *
 * Timeouts: `requestOpts.timeout` is an idle-socket timeout. It is now limited to the connect
 * phase, and once the socket is connected the idle limit is raised to 180s. A non-streaming
 * provider typically sends no bytes at all until the whole completion is ready. Previously the
 * 30s limit also covered that wait and aborted long generations.
 *
 * @param {string|object} provider - unused here; kept for a signature parallel to _sendStream
 * @param {string} apiKey - unused here; the auth header is already in requestOpts.headers
 * @param {object} body - request payload, serialised as JSON
 * @param {boolean} isHttps - selects the https vs http transport
 * @param {object} requestOpts - options for http(s).request
 * @returns {Promise<{status: number, data: any, headers: object}>} `data` is the parsed JSON body,
 *   or the raw text when the body is not JSON
 * Rejects with the transport error, its message prefixed by 'LLM 请求失败: '. Connect-phase
 * timeouts carry `code: 'ETIMEDOUT'` so isRetryable treats them as transient.
 */
function _sendNonStream(provider, apiKey, body, isHttps, requestOpts) {
  const READ_TIMEOUT_MS = 180_000;
  return new Promise((resolve, reject) => {
    const transport = isHttps ? https : http;
    let connected = false;
    const proxyReq = transport.request(requestOpts, (proxyRes) => {
      proxyReq.setTimeout(READ_TIMEOUT_MS);
      const chunks = [];
      proxyRes.on('data', (c) => chunks.push(c));
      proxyRes.on('end', () => {
        const raw = Buffer.concat(chunks).toString('utf8');
        let data;
        try { data = JSON.parse(raw); } catch { data = raw; }
        resolve({ status: proxyRes.statusCode, data, headers: proxyRes.headers });
      });
      proxyRes.on('error', reject);
    });
    // Switch from the connect timeout to the read timeout as soon as the socket is usable.
    // Reused keep-alive sockets are already connected.
    proxyReq.on('socket', (socket) => {
      const onConnected = () => {
        connected = true;
        proxyReq.setTimeout(READ_TIMEOUT_MS);
      };
      if (socket.connecting) {
        socket.once(socket.encrypted ? 'secureConnect' : 'connect', onConnected);
      } else {
        onConnected();
      }
    });
    proxyReq.on('error', (err) => {
      err.message = `LLM 请求失败: ${err.message}`;
      reject(err);
    });
    proxyReq.on('timeout', () => {
      const err = connected
        ? new Error(`LLM API 读取超时 (${READ_TIMEOUT_MS / 1000}s)`)
        : Object.assign(
          new Error(`LLM API 连接超时 (${Math.round((requestOpts.timeout || 0) / 1000)}s)`),
          { code: 'ETIMEDOUT' },
        );
      proxyReq.destroy(err);
    });
    proxyReq.end(JSON.stringify(body));
  });
}
/**
 * Remove `<think>…</think>` spans from a streamed content fragment.
 * An unterminated span is tracked across fragments in `state.inThink`.
 * @param {string} text - delta content fragment
 * @param {{inThink: boolean}} state - mutated: whether we are currently inside a think span
 * @returns {string} the visible part of `text`
 */
function _stripThinkSpans(text, state) {
  let visible = '';
  let rest = text;
  while (rest) {
    if (state.inThink) {
      const end = rest.indexOf('</think>');
      if (end === -1) return visible;
      state.inThink = false;
      rest = rest.slice(end + 8);
    } else {
      const start = rest.indexOf('<think>');
      if (start === -1) return visible + rest;
      visible += rest.slice(0, start);
      state.inThink = true;
      rest = rest.slice(start + 7);
    }
  }
  return visible;
}
/**
 * Fold a usage object seen in the stream into `state.usage`.
 * Accepts OpenAI (`prompt_tokens`/`completion_tokens`) and Anthropic (`input_tokens`/`output_tokens`)
 * field names; later non-zero values overwrite earlier ones.
 * @param {object|undefined|null} usage - usage object from a stream event, if any
 * @param {{usage: object|null}} state - mutated
 */
function _mergeStreamUsage(usage, state) {
  if (!usage || typeof usage !== 'object') return;
  const input = usage.prompt_tokens || usage.input_tokens || 0;
  const output = usage.completion_tokens || usage.output_tokens || 0;
  state.usage = {
    input_tokens: input || state.usage?.input_tokens || 0,
    output_tokens: output || state.usage?.output_tokens || 0,
  };
}
/**
 * Rewrite one complete upstream SSE line before it is forwarded to the client.
 * - drops `delta.reasoning_content`
 * - strips `<think>…</think>` from string `delta.content`, accumulating the visible text
 * - records usage from `usage` or `message.usage` (Anthropic message_start)
 * Non-`data:` lines, `data: [DONE]` and non-JSON payloads are returned unchanged.
 * @param {string} line - one SSE line without its line terminator
 * @param {{inThink: boolean, fullText: string, usage: object|null}} state - mutated
 * @returns {string} the line to forward
 */
function _rewriteSSELine(line, state) {
  if (!line.startsWith('data: ') || line === 'data: [DONE]') return line;
  let parsed;
  try {
    parsed = JSON.parse(line.slice(6));
  } catch {
    return line;
  }
  if (parsed === null || typeof parsed !== 'object') return line;
  const delta = parsed.choices?.[0]?.delta;
  if (delta) {
    if (delta.reasoning_content !== undefined) delete delta.reasoning_content;
    if (typeof delta.content === 'string' && delta.content) {
      const visible = _stripThinkSpans(delta.content, state);
      delta.content = visible;
      if (visible) state.fullText += visible;
    }
  }
  _mergeStreamUsage(parsed.usage, state);
  _mergeStreamUsage(parsed.message?.usage, state);
  return 'data: ' + JSON.stringify(parsed);
}
/**
 * Core LLM request, streaming (no retries). The upstream SSE stream is relayed to `res` with a
 * heartbeat and backpressure handling.
 *
 * - Non-200 upstream status: nothing is written to `res`; resolves
 *   `{ status, data, streamed: false }` so the caller can answer the client.
 * - 200: SSE headers are sent, events are rewritten (see _rewriteSSELine) and forwarded; resolves
 *   `{ streamed: true, status: 200, usage, fullText }`.
 * - Any failure after the SSE headers went out (upstream error, timeout, truncated response)
 *   writes an SSE error event, ends `res` and rejects.
 * - A client disconnect cancels the upstream request and rejects.
 *
 * Timeouts: `requestOpts.timeout` covers the connect phase only; afterwards the socket may stay
 * idle for up to 300s.
 *
 * @param {string|object} provider - unused here; kept for a signature parallel to _sendNonStream
 * @param {string} apiKey - unused here; the auth header is already in requestOpts.headers
 * @param {object} body - request payload, serialised as JSON
 * @param {object} res - client HTTP response the SSE stream is written to
 * @param {boolean} isHttps - selects the https vs http transport
 * @param {object} requestOpts - options for http(s).request
 * @returns {Promise<object>} see above
 */
function _sendStream(provider, apiKey, body, res, isHttps, requestOpts) {
  const READ_TIMEOUT_MS = 300_000;
  return new Promise((resolve, reject) => {
    const transport = isHttps ? https : http;
    let settled = false;
    let connected = false;
    let heartbeatTimer = null;

    // Single exit point: stop the heartbeat, close an already-started SSE response (with an error
    // event when failing), and settle the promise exactly once.
    function finish(err, result) {
      if (settled) return;
      settled = true;
      if (heartbeatTimer) clearInterval(heartbeatTimer);
      res.removeListener('close', onClientClose);
      if (!err) {
        resolve(result);
        return;
      }
      if (res.headersSent && !res.writableEnded && !res.destroyed) {
        try {
          res.write(`data: ${JSON.stringify({ error: { message: err.message, type: 'stream_error' } })}\n\n`);
          res.end();
        } catch { /* client connection already unusable */ }
      }
      reject(err);
    }

    // The client went away (before or during streaming): stop the upstream generation.
    function onClientClose() {
      if (settled) return;
      proxyReq.destroy();
      finish(new Error('LLM 流式请求已取消: 客户端断开连接'));
    }

    const proxyReq = transport.request(requestOpts, (proxyRes) => {
      proxyReq.setTimeout(READ_TIMEOUT_MS);
      proxyRes.on('error', (err) => finish(err));
      // A response that closes before it is complete would otherwise never settle.
      proxyRes.on('close', () => {
        if (!proxyRes.complete) finish(new Error('LLM 流式响应意外中断'));
      });

      if (proxyRes.statusCode !== 200) {
        const chunks = [];
        proxyRes.on('data', (c) => chunks.push(c));
        proxyRes.on('end', () => {
          const raw = Buffer.concat(chunks).toString('utf8');
          let data;
          try { data = JSON.parse(raw); } catch { data = { error: raw }; }
          finish(null, { status: proxyRes.statusCode, data, streamed: false });
        });
        return;
      }

      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'X-Accel-Buffering': 'no',
      });
      heartbeatTimer = startSSEHeartbeat(res);
      // Streaming UTF-8 decode: a multi-byte character (e.g. Chinese text) split across two TCP
      // chunks would otherwise become U+FFFD on both sides of the split.
      proxyRes.setEncoding('utf8');

      const state = { inThink: false, fullText: '', usage: null };
      let pendingLine = ''; // partial line not yet terminated by '\n'
      let openEvent = ''; // rewritten lines of the SSE event still waiting for its blank line
      let awaitingDrain = false;

      // Write to the client, pausing the upstream while the client socket is saturated.
      const forward = (text) => {
        if (!text || res.writableEnded || res.destroyed) return;
        if (!res.write(text) && !awaitingDrain) {
          awaitingDrain = true;
          proxyRes.pause();
          res.once('drain', () => {
            awaitingDrain = false;
            proxyRes.resume();
          });
        }
      };
      const rewrite = (line) => _rewriteSSELine(line.endsWith('\r') ? line.slice(0, -1) : line, state);

      proxyRes.on('data', (chunk) => {
        pendingLine += chunk;
        const lines = pendingLine.split('\n');
        pendingLine = lines.pop();
        let completeEvents = '';
        for (const line of lines) {
          const out = rewrite(line);
          openEvent += out + '\n';
          // A blank line terminates an SSE event. Only whole events are written, so the heartbeat
          // comment can never land inside an event (e.g. between its `event:` and `data:` lines).
          if (out === '') {
            completeEvents += openEvent;
            openEvent = '';
          }
        }
        forward(completeEvents);
      });
      proxyRes.on('end', () => {
        if (settled) return;
        forward(openEvent + (pendingLine ? rewrite(pendingLine) : ''));
        res.end();
        finish(null, { streamed: true, status: 200, usage: state.usage, fullText: state.fullText || undefined });
      });
    });

    res.on('close', onClientClose);
    // Switch from the connect timeout to the read timeout once the socket is usable.
    proxyReq.on('socket', (socket) => {
      const onConnected = () => {
        connected = true;
        proxyReq.setTimeout(READ_TIMEOUT_MS);
      };
      if (socket.connecting) {
        socket.once(socket.encrypted ? 'secureConnect' : 'connect', onConnected);
      } else {
        onConnected();
      }
    });
    proxyReq.on('error', (err) => {
      err.message = `LLM 流式请求失败: ${err.message}`;
      finish(err);
    });
    proxyReq.on('timeout', () => {
      const err = connected
        ? new Error(`LLM API 读取超时 (${READ_TIMEOUT_MS / 1000}s)`)
        : new Error(`LLM API 连接超时 (${Math.round((requestOpts.timeout || 0) / 1000)}s)`);
      proxyReq.destroy(err);
    });
    proxyReq.end(JSON.stringify(body));
  });
}
/**
 * Parse an HTTP `Retry-After` header (delta-seconds or HTTP-date) into milliseconds.
 * @param {object|undefined} headers - Node response headers (keys are lower-case)
 * @returns {number|null} delay in ms (>= 0), or null when absent or unparseable
 */
function _retryAfterMs(headers) {
  const raw = headers ? headers['retry-after'] : undefined;
  if (raw === undefined || raw === null) return null;
  const text = String(raw).trim();
  if (/^\d+$/.test(text)) return Number(text) * 1000;
  const at = Date.parse(text);
  return Number.isNaN(at) ? null : Math.max(0, at - Date.now());
}
/**
 * Generic LLM request: streams into `res` when `stream && res`, otherwise buffers with retries.
 *
 * Non-streaming retries: up to MAX_RETRIES extra attempts for errors accepted by isRetryable and
 * for responses with a RETRYABLE_STATUS. For those statuses a `Retry-After` header is honoured,
 * and the wait is never shorter than the exponential back-off. When the header asks for more than
 * MAX_RETRY_AFTER_MS, the response is returned immediately instead of blocking the caller.
 *
 * @param {string|{name?: string, baseUrl?: string}} provider - provider key, or an object with a
 *   `name` and/or `baseUrl` override; unknown names fall back to the qwen config
 * @param {string} apiKey - provider API key
 * @param {object} body - provider-specific payload (JSON-serialised)
 * @param {object} [res] - client response to stream SSE into
 * @param {boolean} [stream] - stream when true and `res` is given
 * @returns {Promise<object>} the result of _sendStream or _sendNonStream
 * @throws {{status: number, message: string}} 400 when the base URL cannot be parsed
 */
async function sendLLMRequest(provider, apiKey, body, res, stream) {
  const MAX_RETRY_AFTER_MS = 10_000;
  const providerKey = provider.name || provider;
  // Own-property check so names like "toString" fall back to qwen instead of a prototype member.
  const config = Object.prototype.hasOwnProperty.call(PROVIDERS, providerKey)
    ? PROVIDERS[providerKey]
    : PROVIDERS.qwen;
  const base = provider.baseUrl || config.baseUrl;
  let baseUrl;
  try { baseUrl = new URL(base); }
  catch { throw { status: 400, message: `base_url 格式无效: ${base}` }; }
  // Keep any path segment of the base URL (e.g. "/compatible-mode") in front of the endpoint path.
  const fullPath = baseUrl.pathname.replace(/\/$/, '') + config.pathPrefix;
  const url = new URL(fullPath, base);
  const isHttps = url.protocol === 'https:';
  const headers = {
    'Content-Type': 'application/json',
    'Content-Length': Buffer.byteLength(JSON.stringify(body)),
    ...config.versionHeader,
  };
  if (config.authHeader === 'Authorization') {
    headers['Authorization'] = (config.authPrefix || '') + apiKey;
  } else {
    headers[config.authHeader] = apiKey;
  }
  const requestOpts = {
    hostname: url.hostname,
    port: url.port || (isHttps ? 443 : 80),
    path: url.pathname,
    method: 'POST',
    headers,
    agent: isHttps ? httpsAgent : httpAgent,
    timeout: 30_000, // connect timeout 30s
  };
  if (stream && res) {
    return _sendStream(provider, apiKey, body, res, isHttps, requestOpts);
  }
  // Non-streaming: with retries.
  let lastError = null;
  for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
    let result;
    try {
      result = await _sendNonStream(provider, apiKey, body, isHttps, requestOpts);
    } catch (err) {
      lastError = err;
      if (isRetryable(err) && attempt < MAX_RETRIES) {
        await sleep(retryDelay(attempt));
        continue;
      }
      throw err;
    }
    if (!RETRYABLE_STATUS.has(result.status) || attempt >= MAX_RETRIES) return result;
    const retryAfterMs = _retryAfterMs(result.headers);
    // The provider wants a longer pause than we are willing to block for: surface its answer now.
    if (retryAfterMs !== null && retryAfterMs > MAX_RETRY_AFTER_MS) return result;
    await sleep(Math.max(retryDelay(attempt), retryAfterMs || 0));
  }
  throw lastError;
}
// --- Circuit breaker + latency (EWMA) hooks ---
let _circuitBreaker = null;
let _providerHealth = null;
/**
 * Install the optional performance hooks used by sendLLMRequestWithCB.
 * @param {{circuitBreaker?: object, providerHealth?: object}} [opts] - missing/falsy hooks disable that feature
 */
function initPerformanceHooks(opts) {
  const hooks = opts || {};
  _circuitBreaker = hooks.circuitBreaker || null;
  _providerHealth = hooks.providerHealth || null;
}
/**
 * sendLLMRequest wrapped with the circuit breaker and latency reporting.
 *
 * - Rejects immediately with status 503 when the breaker refuses requests for the provider.
 * - Streamed results and responses with status 200-499 count as success. 4xx responses are caller
 *   errors (bad key, bad input) and must not trip the breaker.
 * - Any other resolved status (e.g. 5xx after retries) and every rejection count as failure and
 *   are reported via `recordFailure`.
 *
 * @param {string|{name?: string, baseUrl?: string}} provider - provider key or override object
 * @param {string} apiKey - provider API key
 * @param {object} body - provider-specific payload
 * @param {object} [res] - client response for streaming
 * @param {boolean} [stream] - stream when true and `res` is given
 * @returns {Promise<object>} the sendLLMRequest result
 */
function sendLLMRequestWithCB(provider, apiKey, body, res, stream) {
  const providerName = provider.name || provider;
  const startMs = Date.now();
  if (_circuitBreaker && !_circuitBreaker.canRequest(providerName)) {
    return Promise.reject(Object.assign(new Error('Circuit Breaker OPEN: ' + providerName), { status: 503 }));
  }
  const recordLatency = (success) => {
    if (_providerHealth && _providerHealth.recordRequestLatency) {
      _providerHealth.recordRequestLatency(providerName, Date.now() - startMs, success);
    }
  };
  return sendLLMRequest(provider, apiKey, body, res, stream).then(
    (result) => {
      const success = result.streamed ? true : (result.status >= 200 && result.status < 500);
      if (_circuitBreaker) {
        // Previously only successes were recorded here, so 5xx responses never tripped the breaker.
        if (success) _circuitBreaker.recordSuccess(providerName);
        else _circuitBreaker.recordFailure(providerName);
      }
      recordLatency(success);
      return result;
    },
    (err) => {
      if (_circuitBreaker) _circuitBreaker.recordFailure(providerName);
      recordLatency(false);
      throw err;
    },
  );
}
module.exports = {
detectProvider, getProviderConfig, listProviders,
sendLLMRequest: sendLLMRequestWithCB,
sendLLMRequestRaw: sendLLMRequest,
initPerformanceHooks,
PROVIDERS,
};