/** * StreamProcessor * 负责处理流式响应 * * 职责: * - 管理流式请求 * - 处理流式数据块 * - 缓冲和批量输出 * - 性能监控 * - 错误处理和取消 */ import type { Conversation } from '../../types/chat' import { modelServiceManager } from '../modelServiceManager' import { mcpClientService } from '../MCPClientService' import { logger } from '../../utils/logger' import { ServiceError, ErrorCode } from '../../utils/error' const log = logger.namespace('StreamProcessor') /** * 流式处理选项 */ export interface StreamOptions { conversation: Conversation model?: string mcpServerId?: string signal?: AbortSignal onChunk: (chunk: string) => void } /** * 流式处理结果 */ export interface StreamResult { success: boolean error?: string toolCalls?: any[] metrics?: { totalTime: number firstChunkDelay: number chunkCount: number } } export class StreamProcessor { private static readonly MAX_CONTEXT_MESSAGES = 20 private static readonly BATCH_SIZE = 3 // 每3个字符输出一次,增强流式效果 /** * 处理流式请求 */ async processStream(options: StreamOptions): Promise { const startTime = performance.now() log.info('开始流式处理') try { // 获取工具列表 const { tools, mcpServerName } = await this.prepareTools(options.mcpServerId) // 准备消息列表 const messages = this.prepareMessages(options.conversation, tools, mcpServerName) // 选择服务和模型 const { service, selectedModel } = this.selectServiceAndModel(options.model) log.info('流式处理配置', { service: service.name, model: selectedModel, mcpServer: options.mcpServerId || '未选择', toolCount: tools.length, messageCount: messages.length }) // 执行流式请求 const result = await this.executeStream( service.id, messages, selectedModel, options.onChunk, tools, options.signal, startTime ) const endTime = performance.now() log.info('流式处理完成', { totalTime: (endTime - startTime).toFixed(2) + 'ms', chunkCount: result.metrics?.chunkCount }) return result } catch (error) { log.error('流式处理失败', error) throw new ServiceError( error instanceof Error ? error.message : '流式请求失败', ErrorCode.STREAMING_ERROR, { conversation: options.conversation.id } ) } } /** * 准备工具列表 */ private async prepareTools(mcpServerId?: string): Promise<{ tools: any[] mcpServerName: string }> { let tools: any[] = [] let mcpServerName = '' if (mcpServerId) { log.debug('获取 MCP 工具', { mcpServerId }) const mcpTools = mcpClientService.getTools(mcpServerId) const serverInfo = mcpClientService.getServerInfo(mcpServerId) mcpServerName = serverInfo?.name || 'mcp' tools = this.convertToolsToOpenAIFormat(mcpTools, mcpServerName) log.info('MCP 工具已准备', { serverName: mcpServerName, toolCount: tools.length }) } else { log.debug('未选择 MCP 服务器,不注入工具') } return { tools, mcpServerName } } /** * 准备消息列表 */ private prepareMessages( conversation: Conversation, tools: any[], mcpServerName: string ): any[] { // 过滤成功的消息 let messages = conversation.messages .filter(m => m.status === 'success' || m.status === 'paused') .map(m => ({ role: m.role, content: m.content })) // 限制上下文 if (messages.length > StreamProcessor.MAX_CONTEXT_MESSAGES) { log.info('限制上下文', { from: messages.length, to: StreamProcessor.MAX_CONTEXT_MESSAGES }) messages = messages.slice(-StreamProcessor.MAX_CONTEXT_MESSAGES) } // 添加工具系统提示词 if (tools.length > 0 && messages.length > 0 && messages[0].role !== 'system') { const systemPrompt = this.createSystemPromptWithTools(tools, mcpServerName) messages = [ { role: 'system', content: systemPrompt }, ...messages ] } log.debug('消息列表已准备', { messageCount: messages.length, hasSystemPrompt: messages[0]?.role === 'system' }) return messages } /** * 选择服务和模型 */ private selectServiceAndModel(requestedModel?: string): { service: any selectedModel: string } { const allServices = modelServiceManager.getAllServices() const services = allServices.filter(s => s.status === 'connected') if (services.length === 0) { throw new ServiceError( '没有可用的模型服务,请先在"模型服务"中添加并连接服务', ErrorCode.MODEL_NOT_AVAILABLE ) } let service = services[0] let selectedModel = requestedModel || service.models?.[0] || 'default' // 如果指定了模型,尝试找到拥有该模型的服务 if (requestedModel) { const foundService = services.find(s => s.models && s.models.includes(requestedModel) ) if (foundService) { service = foundService selectedModel = requestedModel log.debug('找到匹配服务', { service: foundService.name }) } else { log.warn('未找到包含该模型的服务,使用默认服务', { requestedModel }) } } return { service, selectedModel } } /** * 执行流式请求 */ private async executeStream( serviceId: string, messages: any[], model: string, onChunk: (chunk: string) => void, tools: any[], signal?: AbortSignal, startTime?: number ): Promise { const beforeStreamCall = performance.now() let chunkCount = 0 let firstChunkDelay = 0 let buffer = '' log.info('开始流式请求') const result = await modelServiceManager.sendChatRequestStream( serviceId, messages, model, (chunk) => { chunkCount++ // 记录首字延迟 if (chunkCount === 1) { firstChunkDelay = performance.now() - beforeStreamCall log.debug('首字延迟', { delay: firstChunkDelay.toFixed(2) + 'ms' }) } // 批量输出,增强流式效果 buffer += chunk if (buffer.length >= StreamProcessor.BATCH_SIZE) { const output = buffer buffer = '' onChunk(output) } }, tools.length > 0 ? tools : undefined, signal ) // 输出剩余缓冲区内容 if (buffer.length > 0) { onChunk(buffer) } const afterStreamCall = performance.now() const totalTime = startTime ? afterStreamCall - startTime : afterStreamCall - beforeStreamCall log.info('流式请求完成', { chunkCount, totalTime: totalTime.toFixed(2) + 'ms', firstChunkDelay: firstChunkDelay.toFixed(2) + 'ms' }) if (!result.success) { throw new ServiceError( result.error || '流式请求失败', ErrorCode.STREAMING_ERROR ) } return { success: true, toolCalls: result.data?.toolCalls, metrics: { totalTime, firstChunkDelay, chunkCount } } } /** * 转换工具为 OpenAI 格式 */ private convertToolsToOpenAIFormat(mcpTools: any[], serverName: string): any[] { if (!Array.isArray(mcpTools)) { log.warn('工具列表不是数组', { mcpTools }) return [] } return mcpTools.map(tool => ({ type: 'function', function: { name: `${serverName}__${tool.name}`, description: tool.description || tool.name, parameters: tool.inputSchema || { type: 'object', properties: {}, required: [] } } })) } /** * 创建包含工具信息的系统提示词 */ private createSystemPromptWithTools(tools: any[], serverName: string): string { const toolList = tools.map(t => `- ${t.function.name}: ${t.function.description}`).join('\n') return `你是一个智能助手,可以使用以下工具来帮助用户: 可用工具列表: ${toolList} 当需要使用工具时,请按照 OpenAI 的 function calling 格式调用。工具名称格式为:${serverName}__工具名称 注意事项: 1. 仔细阅读工具的描述,确保理解其功能 2. 根据用户需求选择合适的工具 3. 提供准确的参数 4. 可以连续调用多个工具来完成复杂任务 现在,请根据用户的需求提供帮助。` } } // 导出单例 export const streamProcessor = new StreamProcessor()