import { v4 as uuidv4 } from 'uuid'; /** * SSE (Server-Sent Events) 传输层实现 * 用于MCP协议的单向数据流传输 */ export class SSETransport { private eventSource: EventSource | null = null; private url: string; private pendingRequests = new Map(); private listeners = new Map(); private connected = false; constructor(url: string) { this.url = url; } async connect(): Promise { return new Promise(async (resolve, reject) => { try { // 首先建立SSE连接,获取sessionId console.log('📡 连接SSE端点:', this.url); // 第一步:连接SSE获取endpoint信息 this.eventSource = new EventSource(this.url); let resolveTimeout: NodeJS.Timeout; this.eventSource.addEventListener('endpoint', (event: any) => { const endpointData = event.data; console.log('✅ 收到SSE endpoint:', endpointData); // 提取sessionId(格式: /message?sessionId=xxx) const match = endpointData.match(/sessionId=([^&]+)/); if (match) { const sessionId = match[1]; console.log('📝 SSE sessionId:', sessionId); // 保存sessionId以便后续请求使用 (this as any).sessionId = sessionId; } this.connected = true; resolve(); }); this.eventSource.onopen = () => { console.log('📡 SSE连接已打开'); // 设置超时,如果10秒内没有收到endpoint事件则认为失败 resolveTimeout = setTimeout(() => { if (!this.connected) { reject(new Error('SSE连接超时:未收到endpoint')); } }, 10000); }; this.eventSource.onmessage = (event) => { try { const data = JSON.parse(event.data); console.log('📨 收到SSE消息:', data); this.handleMessage(data); } catch (error) { // 如果不是JSON,可能是普通文本消息 console.log('📨 收到SSE文本消息:', event.data); } }; this.eventSource.onerror = (error) => { console.error('❌ SSE连接错误:', error); this.connected = false; this.emit('disconnected'); if (this.eventSource?.readyState === EventSource.CLOSED) { reject(new Error('SSE连接失败')); } }; // 监听message事件(MCP响应) this.eventSource.addEventListener('message', (event: any) => { try { const message = JSON.parse(event.data); console.log('📨 收到MCP消息:', message); this.handleMessage(message); } catch (error) { console.error('❌ MCP消息解析失败:', error, event.data); } }); // 清理resolve超时 this.eventSource.addEventListener('endpoint', () => { if (resolveTimeout) { clearTimeout(resolveTimeout); } }); } catch (error) { console.error('❌ 创建SSE连接失败:', error); reject(error); } }); } async sendRequest(method: string, params?: any): Promise { const id = uuidv4(); const request = { jsonrpc: '2.0', id, method, params: params || {} }; return new Promise(async (resolve, reject) => { // 设置超时 const timeout = setTimeout(() => { this.pendingRequests.delete(id); reject(new Error(`SSE请求超时: ${method}`)); }, 30000); // 30秒超时 this.pendingRequests.set(id, { resolve, reject, timeout }); try { console.log(`📤 发送SSE请求 (${method}):`, request); // 获取sessionId const sessionId = (this as any).sessionId; if (!sessionId) { throw new Error('SSE sessionId未就绪'); } // 根据服务器endpoint构建URL // 例如: http://localhost:3200/message?sessionId=xxx const baseUrl = this.url.replace('/sse', ''); const messageUrl = `${baseUrl}/message?sessionId=${sessionId}`; console.log(`📤 发送到: ${messageUrl}`); // SSE模式:通过HTTP POST发送请求到/message端点,响应通过SSE事件流返回 const response = await fetch(messageUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json', }, body: JSON.stringify(request) }); if (!response.ok) { clearTimeout(timeout); this.pendingRequests.delete(id); const errorText = await response.text(); throw new Error(`HTTP ${response.status}: ${response.statusText} - ${errorText}`); } // 对于某些简单请求,可能直接返回JSON响应 const contentType = response.headers.get('content-type'); if (contentType && contentType.includes('application/json')) { clearTimeout(timeout); this.pendingRequests.delete(id); const result = await response.json(); if (result.error) { reject(new Error(result.error.message || '请求失败')); } else { resolve(result.result); } } // 否则等待SSE响应 } catch (error) { const pending = this.pendingRequests.get(id); if (pending) { clearTimeout(pending.timeout); this.pendingRequests.delete(id); } reject(error); } }); } private handleMessage(message: any): void { if (message.id && this.pendingRequests.has(message.id)) { const pending = this.pendingRequests.get(message.id)!; clearTimeout(pending.timeout); this.pendingRequests.delete(message.id); if (message.error) { pending.reject(new Error(message.error.message || '请求失败')); } else { pending.resolve(message.result); } } else if (!message.id && message.method) { // 处理通知消息 console.log('📢 收到通知:', message.method, message.params); this.emit('notification', message); } } on(event: string, callback: Function): void { if (!this.listeners.has(event)) { this.listeners.set(event, []); } this.listeners.get(event)!.push(callback); } off(event: string, callback: Function): void { const callbacks = this.listeners.get(event) || []; const index = callbacks.indexOf(callback); if (index > -1) { callbacks.splice(index, 1); } } private emit(event: string, data?: any): void { const callbacks = this.listeners.get(event) || []; callbacks.forEach(callback => { try { callback(data); } catch (error) { console.error('❌ SSE事件回调错误:', error); } }); } async disconnect(): Promise { console.log('🔌 断开SSE连接'); if (this.eventSource) { this.eventSource.close(); this.eventSource = null; } this.connected = false; // 清理待处理的请求 this.pendingRequests.forEach(pending => { clearTimeout(pending.timeout); pending.reject(new Error('连接已断开')); }); this.pendingRequests.clear(); this.listeners.clear(); } get isConnected(): boolean { return this.connected && this.eventSource?.readyState === EventSource.OPEN; } }