import { ToolExecutor, type ToolCall, type ToolResult } from './tool-executor'
import { getOpenRouterModel, TOOL_COMPATIBLE_MODELS, DEFAULT_MODEL } from '@/lib/config/models'
import { usageTracker } from './usage-tracker'

// Never fall back to a hardcoded key: if the environment variable is missing,
// handleChatStream throws a clear configuration error instead.
const OPENROUTER_API_KEY = process.env.OPENROUTER_API_KEY
const OPENROUTER_URL = 'https://openrouter.ai/api/v1/chat/completions'
const MAX_ITERATIONS = 10 // Maximum number of tool call iterations
const MAX_CONTEXT_MESSAGES = 10 // Maximum number of messages to include in context - aligned with frontend

export interface StreamMessage {
  role: 'user' | 'assistant' | 'system' | 'tool'
  content?: string | null
  tool_calls?: ToolCall[]
  tool_call_id?: string
  name?: string
}

// Enhanced streaming message types for better UX
export interface EnhancedStreamChunk {
  type: 'thinking' | 'planning' | 'executing' | 'content' | 'tool_result' | 'error' | 'status'
  content?: string
  tool_name?: string
  tool_args?: any
  tool_result?: any
  status?: string
}

export class ChatStreamHandler {
  private toolExecutor: ToolExecutor
  private encoder: TextEncoder
  private decoder: TextDecoder

  constructor(customSupabaseClient?: any) {
    this.toolExecutor = new ToolExecutor(customSupabaseClient)
    this.encoder = new TextEncoder()
    this.decoder = new TextDecoder()
  }

  /**
   * Clean streaming content by filtering out tool call markers and unwanted patterns
   */
  private cleanStreamContent(content: string): string {
    if (!content) return content

    // Filter out tool call markers and other unwanted patterns - be very aggressive.
    // The broad `<\|[^|]*\|>` pattern at the end subsumes the earlier ones, which are
    // kept to document the specific markers observed in the wild.
    const cleaned = content
      .replace(/<\|tool_calls_section[^|]*\|>/g, '') // Remove tool call section markers
      .replace(/<\|tool_calls_sectioall_end\|>/g, '') // Remove malformed "sectioall_end" markers
      .replace(/<\|tool_calls_section_end\|>/g, '') // Remove section end markers
      .replace(/<\|tool_calls_[^|]*\|>/g, '') // Remove any tool_calls_* markers
      .replace(/<\|[^|]*tool[^|]*\|>/g, '') // Remove any markers containing "tool"
      .replace(/<\|[^|]*section[^|]*\|>/g, '') // Remove any markers containing "section"
      .replace(/<\|[^|]*end[^|]*\|>/g, '') // Remove any markers containing "end"
      .replace(/<\|[^|]*\|>/g, '') // Remove any other control markers
      .replace(/\s*\n\s*\n\s*\n/g, '\n\n') // Clean up excessive line breaks

    return cleaned
  }

  // Helper to emit enhanced stream chunks as server-sent events
  private emitStreamChunk(controller: ReadableStreamDefaultController, chunk: EnhancedStreamChunk) {
    controller.enqueue(this.encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`))
  }
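  // What a client sees on the wire for one assistant turn: each EnhancedStreamChunk
  // becomes a single SSE frame via emitStreamChunk above. Values here are
  // illustrative, not captured output:
  //
  //   data: {"type":"status","status":"connected"}
  //
  //   data: {"type":"executing","tool_name":"read_file","tool_args":{"path":"ch1.md"},"content":"Reading ch1.md"}
  //
  //   data: {"type":"content","content":"Here is a summary of chapter one..."}
  //
  //   data: {"type":"status","status":"completed"}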
  async handleChatStream(
    messages: StreamMessage[],
    model: string,
    bookId?: string,
    contextFiles: any[] = [],
    userId?: string,
    abortSignal?: AbortSignal,
    chatId?: string
  ): Promise<ReadableStream> {
    if (!OPENROUTER_API_KEY) {
      throw new Error('OpenRouter API key not configured')
    }

    return new ReadableStream({
      start: async (controller) => {
        // Initialize conversation state.
        // Only include the most recent MAX_CONTEXT_MESSAGES messages.
        const conversationHistoryForAI: StreamMessage[] = messages.slice(-MAX_CONTEXT_MESSAGES)

        // If we had to truncate messages, add a system message to inform the AI
        if (messages.length > MAX_CONTEXT_MESSAGES) {
          conversationHistoryForAI.unshift({
            role: 'system',
            content: `Note: This conversation has been truncated to show only the most recent ${MAX_CONTEXT_MESSAGES} messages for context. There are ${messages.length - MAX_CONTEXT_MESSAGES} older messages not shown.`
          })
        }

        let currentIteration = 1
        let isFirstStreamChunk = true

        // Track cumulative usage for the entire user prompt
        let totalPromptTokens = 0
        let totalCompletionTokens = 0
        let totalTokens = 0
        let hasSuccessfulCall = false
        let lastError: string | undefined

        try {
          // Send initial connection confirmation
          if (isFirstStreamChunk) {
            this.emitStreamChunk(controller, { type: 'status', status: 'connected' })
            isFirstStreamChunk = false
          }

          while (currentIteration <= MAX_ITERATIONS) {
            // Check if the request was aborted
            if (abortSignal?.aborted) {
              this.emitStreamChunk(controller, { type: 'error', content: 'Request cancelled by user' })
              controller.close()
              return
            }

            console.log(`Tool iteration: ${currentIteration}`)

            // Enable tools if we have a bookId (even without context files).
            // This allows the AI to search for files and build context dynamically.
            const shouldIncludeTools = !!bookId
            const currentModel = (shouldIncludeTools && !TOOL_COMPATIBLE_MODELS.includes(getOpenRouterModel(model)))
              ? DEFAULT_MODEL.openRouterModel
              : getOpenRouterModel(model)

            console.log('Model for iteration:', {
              requested: model,
              current: currentModel,
              toolsEnabled: shouldIncludeTools,
              iteration: currentIteration
            })

            if (currentIteration === 1) {
              // Show initial status for the first iteration
              this.emitStreamChunk(controller, {
                type: 'status',
                content: shouldIncludeTools
                  ? 'Preparing AI response with file access capabilities...'
                  : 'Preparing AI response...'
              })
            } else {
              // Show thinking for subsequent iterations
              this.emitStreamChunk(controller, {
                type: 'thinking',
                content: `Analyzing request (iteration ${currentIteration})...`
              })
            }

            const requestBody: any = {
              model: currentModel,
              messages: conversationHistoryForAI,
              stream: true,
              temperature: 0.7,
              max_tokens: 2000
            }

            if (shouldIncludeTools) {
              requestBody.tools = this.toolExecutor.getToolDefinitions()
              requestBody.tool_choice = 'auto'
              console.log(`Tools enabled for iteration ${currentIteration}`)
            }

            // Track usage start time for this request
            const requestStartTime = new Date()

            const response = await fetch(OPENROUTER_URL, {
              method: 'POST',
              headers: {
                'Authorization': `Bearer ${OPENROUTER_API_KEY}`,
                'Content-Type': 'application/json',
                'HTTP-Referer': process.env.NEXT_PUBLIC_SITE_URL || 'http://localhost:3000',
                'X-Title': 'Bookwiz'
              },
              body: JSON.stringify(requestBody),
              signal: abortSignal
            })
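            // The upstream response is an OpenAI-compatible SSE stream. Illustrative
            // shape of the frames processStreamResponse parses (values are examples):
            //
            //   data: {"choices":[{"delta":{"content":"Hel"}}]}
            //   data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_1","function":{"name":"read_file","arguments":"{\"pa"}}]}}]}
            //   data: {"usage":{"prompt_tokens":512,"completion_tokens":128,"total_tokens":640}}
            //   data: [DONE]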
            if (!response.ok) {
              const errorData = await response.text()
              console.error('OpenRouter API error:', errorData)
              lastError = `API error: ${response.status} - ${errorData}`

              // More informative error messages
              let errorMessage = 'Failed to get response from AI model'
              if (response.status === 429) {
                errorMessage = 'AI model is rate limited. Please try again in a moment.'
              } else if (response.status === 401) {
                errorMessage = 'API authentication failed. Please check configuration.'
              } else if (response.status >= 500) {
                errorMessage = 'AI service is temporarily unavailable. Please try again.'
              }

              this.emitStreamChunk(controller, { type: 'error', content: errorMessage })
              controller.close()

              // Track the failed request only once, at the end
              if (userId) {
                await this.trackAIUsage({
                  userId,
                  model,
                  bookId,
                  chatId,
                  success: false,
                  errorMessage: lastError,
                  promptTokens: totalPromptTokens,
                  completionTokens: totalCompletionTokens,
                  totalTokens: totalTokens
                })
              }
              return
            }

            const streamResult = await this.processStreamResponse(response, controller, currentIteration)

            // Accumulate token usage across all iterations
            totalPromptTokens += streamResult.promptTokens || 0
            totalCompletionTokens += streamResult.completionTokens || 0
            totalTokens = totalPromptTokens + totalCompletionTokens
            hasSuccessfulCall = true

            // Add the assistant message to history only if there's content or valid tool calls
            if (streamResult.content?.trim() || streamResult.toolCalls.length > 0) {
              conversationHistoryForAI.push({
                role: 'assistant',
                content: streamResult.content?.trim() || '',
                tool_calls: streamResult.toolCalls.length > 0 ? streamResult.toolCalls : undefined
              })
            }

            if (streamResult.toolCalls.length > 0) {
              // Show planning phase
              this.emitStreamChunk(controller, {
                type: 'planning',
                content: `I need to execute ${streamResult.toolCalls.length} tool${streamResult.toolCalls.length > 1 ? 's' : ''}: ${streamResult.toolCalls.map(tc => tc.function.name).join(', ')}`
              })

              // Execute tools and add the results to the conversation
              const toolResults = await this.executeTools(streamResult.toolCalls, bookId!, controller, userId)
              conversationHistoryForAI.push(...toolResults)

              currentIteration++
              continue
            } else {
              // Check if we need self-reflection (but prevent infinite loops)
              if (currentIteration < MAX_ITERATIONS && !streamResult.content?.trim() && streamResult.toolCalls.length === 0) {
                // Only self-reflect if we haven't done it already for this conversation.
                // The marker must match the injected prompt below, or this check never fires.
                const hasReflected = conversationHistoryForAI.some(msg =>
                  msg.role === 'system' && msg.content?.includes('STRATEGIC ANALYSIS')
                )

                if (!hasReflected) {
                  console.log('Adding self-reflection prompt')
                  this.emitStreamChunk(controller, { type: 'thinking', content: 'Reviewing task completion...' })

                  conversationHistoryForAI.push({
                    role: 'system',
                    content: `STRATEGIC ANALYSIS: You haven't provided a response. Let's think strategically:
1. What did the user originally ask for?
2. What searches/actions have you already tried? Did they work?
3. If search_files returned empty, try smart_search with the same term, or list_files to see what's available.
4. If you found files but haven't read them yet, use read_file.
5. Adapt your approach based on what worked and what didn't.

Be strategic: if one approach failed, try a different one. Don't repeat the same failed action.`
                  })

                  currentIteration++
                  continue
                } else {
                  // We've already reflected once, so just provide a default response
                  this.emitStreamChunk(controller, {
                    type: 'content',
                    content: "I'm having trouble processing your request. Could you please rephrase or provide more specific details about what you'd like me to help you with?"
                  })
                  break
                }
              }

              // Final response received
              break
            }
          }
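          // Shape of a tool result message appended to the history above (illustrative
          // example, not captured output):
          //   { role: 'tool', tool_call_id: 'call_1', name: 'read_file',
          //     content: '{"path":"ch1.md","text":"..."}' }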
          if (currentIteration > MAX_ITERATIONS) {
            console.log('Reached maximum iterations')
          }

          // Track successful AI usage once for the entire user prompt
          if (userId && hasSuccessfulCall) {
            await this.trackAIUsage({
              userId,
              model,
              bookId,
              chatId,
              success: true,
              promptTokens: totalPromptTokens,
              completionTokens: totalCompletionTokens,
              totalTokens: totalTokens
            })
          }

          // Send completion status
          this.emitStreamChunk(controller, { type: 'status', status: 'completed' })
          controller.close()
        } catch (error: any) {
          // Handle the different abort/cancellation errors gracefully
          if (
            error.name === 'AbortError' ||
            error.message?.includes('aborted') ||
            error.message?.includes('ResponseAborted') ||
            error.message === 'Request cancelled by client'
          ) {
            console.log('Chat stream was cancelled:', error.message)
            // Don't try to write to the controller if it's already closed due to abort.
            // The client side will handle showing the stop message.
          } else {
            console.error('Chat stream error:', error)

            // Track the failed request only once for the entire user prompt
            if (userId && !hasSuccessfulCall) {
              await this.trackAIUsage({
                userId,
                model,
                bookId,
                chatId,
                success: false,
                errorMessage: `Stream error: ${error.message}`,
                promptTokens: totalPromptTokens,
                completionTokens: totalCompletionTokens,
                totalTokens: totalTokens
              })
            }

            try {
              this.emitStreamChunk(controller, { type: 'error', content: 'Stream error: ' + error.message })
            } catch (e) {
              /* controller might be closed */
            }
          }

          try {
            controller.close()
          } catch (e) {
            /* already closed, which is expected when aborted */
          }
        }
      }
    })
  }

  /**
   * Track AI usage for billing and analytics
   */
  private async trackAIUsage({
    userId,
    model,
    bookId,
    chatId,
    success = true,
    promptTokens = 0,
    completionTokens = 0,
    totalTokens = 0,
    errorMessage
  }: {
    userId: string
    model: string
    bookId?: string
    chatId?: string
    success?: boolean
    promptTokens?: number
    completionTokens?: number
    totalTokens?: number
    errorMessage?: string
  }) {
    try {
      // Infer the model provider from the model name
      const getModelProvider = (modelName: string): string => {
        const name = modelName.toLowerCase()
        if (name.includes('claude') || name.includes('sonnet')) return 'anthropic'
        if (name.includes('gemini')) return 'google'
        if (name.includes('gpt')) return 'openai'
        return 'unknown'
      }

      await usageTracker.recordUsage({
        user_id: userId,
        book_id: bookId,
        chat_id: chatId,
        model_name: model,
        model_provider: getModelProvider(model),
        prompt_tokens: promptTokens,
        completion_tokens: completionTokens,
        total_tokens: totalTokens || (promptTokens + completionTokens),
        request_type: 'chat',
        success,
        error_message: errorMessage
      })
    } catch (error) {
      // Silent fail - usage tracking shouldn't break the main flow
      console.error('Failed to track AI usage:', error)
    }
  }
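  // Illustrative usage record as passed to usageTracker.recordUsage (field names
  // taken from the call above; the values are examples only):
  //   { user_id: 'user-123', model_name: 'claude-sonnet-4', model_provider: 'anthropic',
  //     prompt_tokens: 512, completion_tokens: 128, total_tokens: 640,
  //     request_type: 'chat', success: true }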
  /**
   * Process the raw streaming response from the OpenRouter API.
   * This is the SINGLE place where raw JSON buffering and parsing happens.
   * The frontend receives clean, structured chunks from this processing.
   */
  private async processStreamResponse(
    response: Response,
    controller: ReadableStreamDefaultController,
    iteration: number
  ): Promise<{ content: string; toolCalls: ToolCall[]; promptTokens?: number; completionTokens?: number; totalTokens?: number }> {
    const reader = response.body!.getReader()
    let assistantContent = ''
    const toolCalls: ToolCall[] = []
    let promptTokens = 0
    let completionTokens = 0
    let totalTokens = 0
    const toolCallsAccumulator = new Map<string, any>()
    const indexToIdMap = new Map<number, string>()
    let buffer = '' // Buffer for incomplete chunks

    // Handle one parsed SSE payload. Declared as a closure over the accumulators
    // above so the main read loop and the final-buffer pass below share a single
    // implementation instead of duplicating it.
    const handleParsedEvent = (parsed: any) => {
      const delta = parsed.choices?.[0]?.delta
      const messageFromChoice = parsed.choices?.[0]?.message

      // Capture usage statistics when available (usually in the last chunk)
      if (parsed.usage) {
        promptTokens = parsed.usage.prompt_tokens || 0
        completionTokens = parsed.usage.completion_tokens || 0
        totalTokens = parsed.usage.total_tokens || 0
      }

      // Handle content streaming with the enhanced protocol
      if (delta?.content) {
        // Filter out tool call markers and other unwanted patterns
        const cleanContent = this.cleanStreamContent(delta.content)
        if (cleanContent) {
          assistantContent += cleanContent
          this.emitStreamChunk(controller, { type: 'content', content: cleanContent })
        }
      }

      // Handle tool calls (streaming deltas)
      if (delta?.tool_calls) {
        for (const toolCallDelta of delta.tool_calls) {
          // Get the correct ID - either from the delta or from our index mapping
          const id = toolCallDelta.id || indexToIdMap.get(toolCallDelta.index)
          const key = id || `index_${toolCallDelta.index}`

          // If this is a new tool call with an ID, store the mapping
          if (toolCallDelta.id && toolCallDelta.index !== undefined) {
            indexToIdMap.set(toolCallDelta.index, toolCallDelta.id)
          }

          if (!toolCallsAccumulator.has(key)) {
            // Initialize a new tool call with the required fields
            toolCallsAccumulator.set(key, {
              id: id || `temp_${Date.now()}_${key}`,
              type: 'function',
              function: { name: '', arguments: '' }
            })
          }

          const existing = toolCallsAccumulator.get(key)

          // Handle name updates
          if (toolCallDelta.function?.name) {
            existing.function.name = toolCallDelta.function.name
          }

          // Handle arguments updates
          if (toolCallDelta.function?.arguments) {
            existing.function.arguments += toolCallDelta.function.arguments

            // Check if this tool call looks complete (has both a name and valid JSON arguments)
            if (existing.function.name && this.isToolCallComplete(existing)) {
              this.validateAndFinalizeToolCall(existing, toolCalls, key)
            }
          }
        }
      }

      // Handle complete tool calls (non-streaming)
      if (messageFromChoice?.tool_calls && !delta?.tool_calls) {
        const validToolCalls = messageFromChoice.tool_calls.filter((tc: ToolCall) => {
          try {
            JSON.parse(tc.function.arguments)
            return true
          } catch (e) {
            console.warn('Skipping tool call with invalid JSON arguments:', tc)
            return false
          }
        })
        if (validToolCalls.length > 0) {
          toolCalls.push(...validToolCalls)
        }
      }
    }

    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) break

        // Decode the chunk and add it to the buffer
        buffer += this.decoder.decode(value, { stream: true })

        // Process complete lines from the buffer,
        // keeping the last line buffered (it might be incomplete)
        const lines = buffer.split('\n')
        buffer = lines.pop() || ''

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue

          const data = line.slice(6).trim()
          // Skip empty data
          if (!data) continue

          if (data === '[DONE]') {
            // Finalize any remaining tool calls that weren't completed during streaming
            Array.from(toolCallsAccumulator.entries()).forEach(([key, toolCall]) => {
              this.validateAndFinalizeToolCall(toolCall, toolCalls, key)
            })

            // Final cleaning pass on accumulated content to catch any missed markers
            const finalCleanedContent = this.cleanStreamContent(assistantContent)
            return { content: finalCleanedContent, toolCalls, promptTokens, completionTokens, totalTokens }
          }

          try {
            handleParsedEvent(JSON.parse(data))
          } catch (e) {
            console.warn('Failed to parse stream data:', {
              error: e instanceof Error ? e.message : String(e),
              data: data.substring(0, 200) + (data.length > 200 ? '...' : ''),
              dataLength: data.length
            })
            // Skip invalid JSON - the line buffering above should make this rare
          }
        }
      }
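      // Why the line buffer matters: a network read can end mid-frame. For example,
      // one read may deliver `data: {"choices":[{"delta":{"content":"Hel` with the
      // rest of the JSON arriving in the next read; splitting on '\n' and keeping the
      // last (possibly partial) line in `buffer` lets the next iteration complete it.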
      // Process any remaining data in the buffer: a final frame that arrived
      // without a trailing newline gets the same handling as in-loop frames.
      if (buffer.startsWith('data: ')) {
        const data = buffer.slice(6).trim()
        if (data && data !== '[DONE]') {
          try {
            handleParsedEvent(JSON.parse(data))
          } catch (e) {
            console.warn('Failed to parse final buffer data:', {
              error: e instanceof Error ? e.message : String(e),
              data: data.substring(0, 200) + (data.length > 200 ? '...' : ''),
              dataLength: data.length
            })
          }
        }
      }
    } finally {
      reader.releaseLock()
    }

    // Final cleaning pass on accumulated content to catch any missed markers
    const finalCleanedContent = this.cleanStreamContent(assistantContent)
    return { content: finalCleanedContent, toolCalls, promptTokens, completionTokens, totalTokens }
  }

  private async executeTools(
    toolCalls: ToolCall[],
    bookId: string,
    controller: ReadableStreamDefaultController,
    userId?: string
  ): Promise<ToolResult[]> {
    const toolResults: ToolResult[] = []

    for (const toolCall of toolCalls) {
      const toolName = toolCall.function.name

      try {
        // Handle empty or invalid tool arguments gracefully
        let toolArgs: any = {}
        if (toolCall.function.arguments && toolCall.function.arguments.trim()) {
          try {
            toolArgs = JSON.parse(toolCall.function.arguments)
          } catch (parseError: any) {
            console.error('Failed to parse tool arguments:', toolCall.function.arguments)
            throw new Error(`Invalid tool arguments format: ${parseError?.message || 'Unknown parsing error'}`)
          }
        }

        // Show tool execution start with enhanced streaming
        this.emitStreamChunk(controller, {
          type: 'executing',
          tool_name: toolName,
          tool_args: toolArgs,
          content: this.toolExecutor.getToolDescription(toolName, toolArgs)
        })

        // Execute the tool
        const result = await this.toolExecutor.executeToolCall(bookId, toolCall, userId)
        console.log('Tool result:', result)

        // Show the tool result with enhanced streaming
        this.emitStreamChunk(controller, {
          type: 'tool_result',
          tool_name: toolName,
          tool_result: result,
          content: this.toolExecutor.formatToolResult(toolName, result)
        })

        toolResults.push({
          role: 'tool',
          tool_call_id: toolCall.id,
          name: toolName,
          content: JSON.stringify(result)
        })
      } catch (error: any) {
        console.error('Tool execution error:', error)

        // Show the error with enhanced streaming
        this.emitStreamChunk(controller, {
          type: 'error',
          tool_name: toolName,
          content: `Error with ${toolName}: ${error.message}`
        })

        toolResults.push({
          role: 'tool',
          tool_call_id: toolCall.id,
          name: toolName,
          content: JSON.stringify({ error: error.message, details: 'Tool execution failed' })
        })
      }
    }

    return toolResults
  }

  /**
   * Check if a tool call looks complete (has a name and potentially valid JSON arguments)
   */
  private isToolCallComplete(toolCall: any): boolean {
    if (!toolCall.function.name || !toolCall.function.arguments) {
      return false
    }

    const args = toolCall.function.arguments.trim()
    if (!args) return false

    // Basic heuristics for JSON completeness
    if (!args.startsWith('{')) return false

    // Count braces to see if they're balanced. This is only a heuristic - braces
    // inside string values can fool it - so validateAndFinalizeToolCall re-parses.
    const openBraces = (args.match(/{/g) || []).length
    const closeBraces = (args.match(/}/g) || []).length

    // If braces are balanced, the call is likely complete
    return openBraces === closeBraces && openBraces > 0
  }

  /**
   * Validate and finalize a tool call, adding it to the final list if valid
   */
  private validateAndFinalizeToolCall(toolCall: any, toolCalls: any[], key: string): boolean {
    // Skip if already finalized
    if (toolCalls.some(tc => tc.id === toolCall.id)) {
      return true
    }

    try {
      // Try to validate/fix the JSON
      const fixedArgs = this.tryFixJsonArguments(toolCall.function.arguments)
      JSON.parse(fixedArgs) // Ensure it's valid

      // Update with the fixed arguments
      toolCall.function.arguments = fixedArgs

      console.log('✅ Tool call completed during streaming:', {
        id: toolCall.id,
        name: toolCall.function.name,
        argsLength: toolCall.function.arguments.length
      })

      toolCalls.push(toolCall)
      return true
    } catch (e) {
      // Not complete or invalid yet - keep accumulating
      return false
    }
  }
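  // Illustrative accumulation sequence (values are examples): three streamed deltas
  //   { index: 0, id: 'call_1', function: { name: 'read_file', arguments: '' } }
  //   { index: 0, function: { arguments: '{"path":' } }
  //   { index: 0, function: { arguments: '"ch1.md"}' } }
  // concatenate to the arguments string '{"path":"ch1.md"}', at which point
  // isToolCallComplete sees balanced braces and validateAndFinalizeToolCall
  // parses and finalizes the call.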
  /**
   * Simple JSON validation and basic fixing.
   * The AI should generate proper JSON - if it doesn't, we log it and skip.
   */
  private tryFixJsonArguments(args: string): string {
    if (!args || args.trim() === '') {
      return '{}'
    }

    const trimmed = args.trim()

    // Try to parse as-is first
    try {
      JSON.parse(trimmed)
      return trimmed
    } catch (e) {
      // Only attempt very basic fixes for common streaming issues

      // 1. Missing closing braces (common in truncated streams)
      const openBraces = (trimmed.match(/{/g) || []).length
      const closeBraces = (trimmed.match(/}/g) || []).length
      if (openBraces > closeBraces) {
        const withBraces = trimmed + '}'.repeat(openBraces - closeBraces)
        try {
          JSON.parse(withBraces)
          console.log('🔧 Fixed incomplete JSON by adding closing braces')
          return withBraces
        } catch (e2) {
          // If adding braces doesn't fix it, give up
        }
      }

      // 2. Unclosed strings (very common in streaming)
      const quotes = (trimmed.match(/"/g) || []).length
      if (quotes % 2 === 1) {
        const withQuote = trimmed + '"'
        try {
          JSON.parse(withQuote)
          console.log('🔧 Fixed incomplete JSON by adding closing quote')
          return withQuote
        } catch (e2) {
          // If adding a quote doesn't fix it, give up
        }
      }

      // If we can't fix it with simple methods, log and throw
      console.warn('❌ Unable to fix malformed JSON arguments:', {
        originalLength: trimmed.length,
        preview: trimmed.substring(0, 150) + (trimmed.length > 150 ? '...' : ''),
        error: e instanceof Error ? e.message : String(e)
      })
      throw new Error(`Invalid JSON arguments: ${e instanceof Error ? e.message : 'Unknown error'}`)
    }
  }

  /**
   * Complex JSON reconstruction was removed - we shouldn't need it
   */
  private reconstructJsonFromBrokenString(broken: string): string {
    // This method is no longer used - the AI should generate proper JSON.
    // If we're hitting this often, we need to fix the streaming buffer logic instead.
    throw new Error('Complex JSON reconstruction should not be needed - check streaming logic')
  }
}
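// Minimal usage sketch (illustrative, assuming a Next.js route handler; the request
// parsing shown here is hypothetical and not part of this module):
//
//   export async function POST(req: Request) {
//     const { messages, model, bookId, chatId } = await req.json()
//     const handler = new ChatStreamHandler()
//     const stream = await handler.handleChatStream(
//       messages, model, bookId, [], undefined, req.signal, chatId
//     )
//     return new Response(stream, {
//       headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache' }
//     })
//   }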