// bookwiz.io / lib / services / chat-stream-handler.ts
import { ToolExecutor, type ToolCall, type ToolResult } from './tool-executor'
import { getOpenRouterModel } from '@/lib/config/models'
import { TOOL_COMPATIBLE_MODELS, DEFAULT_MODEL } from '@/lib/config/models'
import { usageTracker } from './usage-tracker'

// The OpenRouter API key must come from the environment. Never commit a fallback key:
// a literal default both leaks the credential and defeats the "not configured" check
// performed before any request is made. An empty value is treated as "not set".
const OPENROUTER_API_KEY: string | undefined = process.env.OPENROUTER_API_KEY || undefined
// OpenRouter chat-completions endpoint used for every model request.
const OPENROUTER_URL = 'https://openrouter.ai/api/v1/chat/completions'

const MAX_ITERATIONS = 10 // Maximum number of tool call iterations
const MAX_CONTEXT_MESSAGES = 10 // Maximum number of messages to include in context - aligned with frontend

/**
 * One entry of the conversation history that is sent as `messages`
 * in the chat-completions request body.
 */
export interface StreamMessage {
  /** Author of the message. */
  role:
    | 'user'
    | 'assistant'
    | 'system'
    | 'tool'
  /** Text of the message, if any. */
  content?: string | null
  /** Tool invocations requested by the model (set on assistant messages that call tools). */
  tool_calls?: ToolCall[]
  /** Id of the tool call a message answers (presumably only meaningful when role is 'tool'). */
  tool_call_id?: string
  /** Optional name attached to the message (presumably the tool name for 'tool' messages). */
  name?: string
}

/**
 * Payload of a single frame written to the client stream.
 * Each frame is serialized as `data: <json>` followed by a blank line.
 */
export interface EnhancedStreamChunk {
  /** Discriminates what the chunk represents. */
  type:
    | 'thinking'
    | 'planning'
    | 'executing'
    | 'content'
    | 'tool_result'
    | 'error'
    | 'status'
  /** Human-readable text for the chunk (assistant text, progress or error message). */
  content?: string
  /** Name of the tool the chunk refers to. */
  tool_name?: string
  /** Parsed arguments passed to the tool. */
  tool_args?: any
  /** Raw result returned by the tool. */
  tool_result?: any
  /** Lifecycle marker such as 'connected' or 'completed'. */
  status?: string
}

/**
 * Streams chat completions from OpenRouter to the client and runs the
 * model -> tool -> model loop on the server.
 *
 * Every frame written to the returned stream has the form `data: <json>\n\n`,
 * where the JSON payload is an `EnhancedStreamChunk`.
 */
export class ChatStreamHandler {
  /**
   * Control-marker patterns stripped from model output, applied in this order.
   * The order is kept because removing one marker can expose another one.
   */
  private static readonly CONTROL_MARKER_PATTERNS: readonly RegExp[] = [
    /<\|tool_calls_section[^|]*\|>/g, // Tool call section markers
    /<\|tool_calls_sectioall_end\|>/g, // Malformed "sectioall_end" markers
    /<\|tool_calls_section_end\|>/g, // Section end markers
    /<\|tool_calls_[^|]*\|>/g, // Any tool_calls_* markers
    /<\|[^|]*tool[^|]*\|>/g, // Any markers containing "tool"
    /<\|[^|]*section[^|]*\|>/g, // Any markers containing "section"
    /<\|[^|]*end[^|]*\|>/g, // Any markers containing "end"
    /<\|[^|]*\|>/g // Any other control markers
  ]

  private toolExecutor: ToolExecutor
  private encoder: TextEncoder

  /**
   * @param customSupabaseClient Optional client forwarded unchanged to the ToolExecutor.
   */
  constructor(customSupabaseClient?: any) {
    this.toolExecutor = new ToolExecutor(customSupabaseClient)
    this.encoder = new TextEncoder()
  }

  /**
   * Clean streaming content by filtering out tool call markers and unwanted patterns.
   *
   * @param content Raw text received from the model.
   * @returns The text without `<|...|>` control markers and with runs of three or
   *          more line breaks collapsed to a single blank line. Falsy input is
   *          returned unchanged.
   */
  private cleanStreamContent(content: string): string {
    if (!content) return content

    const withoutMarkers = ChatStreamHandler.CONTROL_MARKER_PATTERNS.reduce(
      (text, pattern) => text.replace(pattern, ''),
      content
    )

    // Clean up excessive line breaks
    return withoutMarkers.replace(/\s*\n\s*\n\s*\n/g, '\n\n')
  }

  /**
   * Serialize a chunk as one `data: <json>\n\n` frame and enqueue it on the client stream.
   */
  private emitStreamChunk(controller: ReadableStreamDefaultController, chunk: EnhancedStreamChunk) {
    controller.enqueue(this.encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`))
  }

  /**
   * Start a chat completion and return a stream of progress/content frames.
   *
   * The model is called repeatedly (at most MAX_ITERATIONS times): whenever it
   * requests tools they are executed and their results appended to the history
   * before the next call. If the model returns neither text nor tool calls, one
   * self-reflection prompt is injected; a second empty reply produces a fallback
   * message instead of further retries.
   *
   * @param messages     Conversation so far; only the last MAX_CONTEXT_MESSAGES are sent.
   * @param model        Requested model identifier (resolved via getOpenRouterModel).
   * @param bookId       When set, tool definitions are attached to the request.
   * @param contextFiles Currently unused by this method.
   * @param userId       When set, usage is recorded for this user.
   * @param abortSignal  Cancels the upstream request and stops the loop.
   * @param chatId       Recorded alongside usage.
   * @returns A ReadableStream of encoded `data: <json>\n\n` frames.
   * @throws Error when no OpenRouter API key is configured.
   */
  async handleChatStream(
    messages: StreamMessage[],
    model: string,
    bookId?: string,
    contextFiles: any[] = [],
    userId?: string,
    abortSignal?: AbortSignal,
    chatId?: string
  ): Promise<ReadableStream> {
    if (!OPENROUTER_API_KEY) {
      throw new Error('OpenRouter API key not configured')
    }

    return new ReadableStream({
      start: async (controller) => {
        // Only include the most recent MAX_CONTEXT_MESSAGES messages
        const conversationHistoryForAI: StreamMessage[] = messages.slice(-MAX_CONTEXT_MESSAGES)

        // If we had to truncate messages, add a system message to inform the AI
        if (messages.length > MAX_CONTEXT_MESSAGES) {
          conversationHistoryForAI.unshift({
            role: 'system',
            content: `Note: This conversation has been truncated to show only the most recent ${MAX_CONTEXT_MESSAGES} messages for context. There are ${messages.length - MAX_CONTEXT_MESSAGES} older messages not shown.`
          })
        }

        let currentIteration = 1
        // Set once the self-reflection prompt has been injected for this request,
        // so an unresponsive model is nudged exactly one time.
        let hasReflected = false

        // Track cumulative usage for the entire user prompt
        let totalPromptTokens = 0
        let totalCompletionTokens = 0
        let totalTokens = 0
        let hasSuccessfulCall = false
        // Guards against recording usage twice (success path + error path).
        let usageRecorded = false

        try {
          // Send initial connection confirmation
          this.emitStreamChunk(controller, {
            type: 'status',
            status: 'connected'
          })

          while (currentIteration <= MAX_ITERATIONS) {
            // Check if request was aborted
            if (abortSignal?.aborted) {
              this.emitStreamChunk(controller, {
                type: 'error',
                content: 'Request cancelled by user'
              })
              controller.close()
              return
            }

            console.log(`Tool iteration: ${currentIteration}`)

            // Enable tools if we have a bookId (even without context files)
            // This allows the AI to search for files and build context dynamically
            const shouldIncludeTools = !!bookId
            const requestedModel = getOpenRouterModel(model)
            // Fall back to the default model when tools are needed but unsupported
            const currentModel = (shouldIncludeTools && !TOOL_COMPATIBLE_MODELS.includes(requestedModel))
              ? DEFAULT_MODEL.openRouterModel
              : requestedModel

            console.log('Model for iteration:', {
              requested: model,
              current: currentModel,
              toolsEnabled: shouldIncludeTools,
              iteration: currentIteration
            })

            if (currentIteration === 1) {
              // Show initial status for first iteration
              this.emitStreamChunk(controller, {
                type: 'status',
                content: shouldIncludeTools
                  ? 'Preparing AI response with file access capabilities...'
                  : 'Preparing AI response...'
              })
            } else {
              // Show thinking for subsequent iterations
              this.emitStreamChunk(controller, {
                type: 'thinking',
                content: `Analyzing request (iteration ${currentIteration})...`
              })
            }

            const requestBody: any = {
              model: currentModel,
              messages: conversationHistoryForAI,
              stream: true,
              temperature: 0.7,
              max_tokens: 2000
            }

            if (shouldIncludeTools) {
              requestBody.tools = this.toolExecutor.getToolDefinitions()
              requestBody.tool_choice = 'auto'
              console.log('Tools enabled for iteration ' + currentIteration)
            }

            const response = await fetch(OPENROUTER_URL, {
              method: 'POST',
              headers: {
                'Authorization': `Bearer ${OPENROUTER_API_KEY}`,
                'Content-Type': 'application/json',
                'HTTP-Referer': process.env.NEXT_PUBLIC_SITE_URL || 'http://localhost:3000',
                'X-Title': 'Bookwiz'
              },
              body: JSON.stringify(requestBody),
              signal: abortSignal
            })

            if (!response.ok) {
              const errorData = await response.text()
              console.error('OpenRouter API error:', errorData)

              // More informative error messages
              let errorMessage = 'Failed to get response from AI model'
              if (response.status === 429) {
                errorMessage = 'AI model is rate limited. Please try again in a moment.'
              } else if (response.status === 401) {
                errorMessage = 'API authentication failed. Please check configuration.'
              } else if (response.status >= 500) {
                errorMessage = 'AI service is temporarily unavailable. Please try again.'
              }

              this.emitStreamChunk(controller, {
                type: 'error',
                content: errorMessage
              })
              controller.close()

              // Track the failed request, including tokens spent in earlier iterations
              if (userId) {
                usageRecorded = true
                await this.trackAIUsage({
                  userId,
                  model,
                  bookId,
                  chatId,
                  success: false,
                  errorMessage: `API error: ${response.status} - ${errorData}`,
                  promptTokens: totalPromptTokens,
                  completionTokens: totalCompletionTokens,
                  totalTokens: totalTokens
                })
              }
              return
            }

            const streamResult = await this.processStreamResponse(response, controller, currentIteration)

            // Accumulate token usage across all iterations
            totalPromptTokens += streamResult.promptTokens || 0
            totalCompletionTokens += streamResult.completionTokens || 0
            totalTokens = totalPromptTokens + totalCompletionTokens
            hasSuccessfulCall = true

            const replyText = streamResult.content?.trim() || ''
            const requestedTools = streamResult.toolCalls

            // Add assistant message to history only if there's content or valid tool calls
            if (replyText || requestedTools.length > 0) {
              conversationHistoryForAI.push({
                role: 'assistant',
                content: replyText,
                tool_calls: requestedTools.length > 0 ? requestedTools : undefined
              })
            }

            if (requestedTools.length > 0) {
              // Show planning phase
              this.emitStreamChunk(controller, {
                type: 'planning',
                content: `I need to execute ${requestedTools.length} tool${requestedTools.length > 1 ? 's' : ''}: ${requestedTools.map(tc => tc.function.name).join(', ')}`
              })

              // Execute tools and add results to conversation.
              // Tools are only offered when bookId is set, hence the assertion.
              const toolResults = await this.executeTools(
                requestedTools,
                bookId!,
                controller,
                userId
              )
              conversationHistoryForAI.push(...toolResults)
              currentIteration++
              continue
            }

            // No tool calls. An empty reply gets one self-reflection retry.
            if (currentIteration < MAX_ITERATIONS && !replyText) {
              if (!hasReflected) {
                hasReflected = true
                console.log('Adding self-reflection prompt')
                this.emitStreamChunk(controller, {
                  type: 'thinking',
                  content: 'Reviewing task completion...'
                })

                conversationHistoryForAI.push({
                  role: 'system',
                  content: `STRATEGIC ANALYSIS: You haven't provided a response. Let's think strategically:

1. What did the user originally ask for?
2. What searches/actions have you already tried? Did they work?
3. If search_files returned empty, try smart_search with the same term, or list_files to see what's available.
4. If you found files but haven't read them yet, use read_file.
5. Adapt your approach based on what worked and what didn't.

Be strategic: if one approach failed, try a different one. Don't repeat the same failed action.`
                })
                currentIteration++
                continue
              }

              // We've already reflected once, just provide a default response
              this.emitStreamChunk(controller, {
                type: 'content',
                content: "I'm having trouble processing your request. Could you please rephrase or provide more specific details about what you'd like me to help you with?"
              })
            }

            // Final response received
            break
          }

          if (currentIteration > MAX_ITERATIONS) {
            console.log('Reached maximum iterations')
          }

          // Track successful AI usage once for the entire user prompt
          if (userId && hasSuccessfulCall) {
            usageRecorded = true
            await this.trackAIUsage({
              userId,
              model,
              bookId,
              chatId,
              success: true,
              promptTokens: totalPromptTokens,
              completionTokens: totalCompletionTokens,
              totalTokens: totalTokens
            })
          }

          // Send completion status
          this.emitStreamChunk(controller, {
            type: 'status',
            status: 'completed'
          })

          controller.close()

        } catch (error: any) {
          // Anything can be thrown, so normalize before inspecting it.
          const errorName = typeof error?.name === 'string' ? error.name : ''
          const errorText = typeof error?.message === 'string' ? error.message : String(error)

          // Handle different types of abort/cancellation errors gracefully
          const wasCancelled =
            errorName === 'AbortError' ||
            errorText.includes('aborted') ||
            errorText.includes('ResponseAborted') ||
            errorText === 'Request cancelled by client'

          if (wasCancelled) {
            console.log('Chat stream was cancelled:', errorText)
            // Don't try to write to the controller if it's already closed due to abort
            // The client-side will handle showing the stop message
          } else {
            console.error('Chat stream error:', error)

            // Record the failure once, keeping tokens spent by earlier iterations
            if (userId && !usageRecorded) {
              usageRecorded = true
              await this.trackAIUsage({
                userId,
                model,
                bookId,
                chatId,
                success: false,
                errorMessage: `Stream error: ${errorText}`,
                promptTokens: totalPromptTokens,
                completionTokens: totalCompletionTokens,
                totalTokens: totalTokens
              })
            }

            try {
              this.emitStreamChunk(controller, {
                type: 'error',
                content: 'Stream error: ' + errorText
              })
            } catch (e) { /* controller might be closed */ }
          }

          try {
            controller.close()
          } catch (e) { /* already closed, which is expected when aborted */ }
        }
      }
    })
  }

  /**
   * Track AI usage for billing and analytics.
   *
   * Failures of the tracker are logged and swallowed so that usage tracking
   * never breaks the chat flow.
   */
  private async trackAIUsage({
    userId,
    model,
    bookId,
    chatId,
    success = true,
    promptTokens = 0,
    completionTokens = 0,
    totalTokens = 0,
    errorMessage
  }: {
    userId: string
    model: string
    bookId?: string
    chatId?: string
    success?: boolean
    promptTokens?: number
    completionTokens?: number
    totalTokens?: number
    errorMessage?: string
  }) {
    try {
      // Derive the provider from well-known substrings of the model name
      const lowerCaseModel = model.toLowerCase()
      let modelProvider = 'unknown'
      if (lowerCaseModel.includes('claude') || lowerCaseModel.includes('sonnet')) {
        modelProvider = 'anthropic'
      } else if (lowerCaseModel.includes('gemini')) {
        modelProvider = 'google'
      } else if (lowerCaseModel.includes('gpt')) {
        modelProvider = 'openai'
      }

      await usageTracker.recordUsage({
        user_id: userId,
        book_id: bookId,
        chat_id: chatId,
        model_name: model,
        model_provider: modelProvider,
        prompt_tokens: promptTokens,
        completion_tokens: completionTokens,
        total_tokens: totalTokens || (promptTokens + completionTokens),
        request_type: 'chat',
        success,
        error_message: errorMessage
      })
    } catch (error) {
      // Silent fail - usage tracking shouldn't break the main flow
      console.error('Failed to track AI usage:', error)
    }
  }

  /**
   * Process the raw streaming response from OpenRouter API.
   * This is the SINGLE place where raw JSON buffering and parsing happens.
   * The frontend receives clean, structured chunks from this processing.
   *
   * Text deltas are cleaned and forwarded to `controller` as 'content' chunks
   * while they arrive; tool-call deltas are accumulated and returned once their
   * arguments form valid JSON. Tool calls still pending when the stream ends are
   * finalized whether or not a `[DONE]` marker was received.
   *
   * @param response   Successful fetch response whose body is an SSE stream.
   * @param controller Client stream that receives 'content' chunks.
   * @param iteration  Current loop iteration (currently unused).
   * @returns Cleaned assistant text, finalized tool calls and the last reported usage.
   * @throws Error when the response has no body.
   */
  private async processStreamResponse(
    response: Response,
    controller: ReadableStreamDefaultController,
    iteration: number
  ): Promise<{ content: string; toolCalls: ToolCall[]; promptTokens?: number; completionTokens?: number; totalTokens?: number }> {
    if (!response.body) {
      throw new Error('OpenRouter response has no body to stream')
    }

    const reader = response.body.getReader()
    // One decoder per response: a streaming decoder keeps partial multi-byte
    // state, which must not leak between responses or concurrent streams.
    const decoder = new TextDecoder()
    const toolCalls: ToolCall[] = []
    const pendingToolCalls = new Map<string, any>()
    const indexToIdMap = new Map<number, string>()
    let assistantContent = ''
    let promptTokens = 0
    let completionTokens = 0
    let totalTokens = 0
    let buffer = '' // Buffer for incomplete lines

    // Merge one streamed tool-call fragment into its accumulator.
    const accumulateToolCallDelta = (toolCallDelta: any): void => {
      // Get the correct ID - either from the delta or from our index mapping
      const id = toolCallDelta.id || indexToIdMap.get(toolCallDelta.index)
      const key = id || `index_${toolCallDelta.index}`

      // If this is a new tool call with an ID, store the mapping
      if (toolCallDelta.id && toolCallDelta.index !== undefined) {
        indexToIdMap.set(toolCallDelta.index, toolCallDelta.id)
      }

      let pending = pendingToolCalls.get(key)
      if (!pending) {
        // Initialize new tool call with required fields
        pending = {
          id: id || `temp_${Date.now()}_${key}`,
          type: 'function',
          function: {
            name: '',
            arguments: ''
          }
        }
        pendingToolCalls.set(key, pending)
      }

      if (toolCallDelta.function?.name) {
        pending.function.name = toolCallDelta.function.name
      }

      if (toolCallDelta.function?.arguments) {
        pending.function.arguments += toolCallDelta.function.arguments

        // Finalize early when the call looks complete (name + balanced JSON arguments)
        if (pending.function.name && this.isToolCallComplete(pending)) {
          this.validateAndFinalizeToolCall(pending, toolCalls, key)
        }
      }
    }

    // Handle one SSE line. Returns true when the line is the `[DONE]` marker.
    const handleEventLine = (line: string): boolean => {
      if (!line.startsWith('data: ')) return false

      const data = line.slice(6).trim()
      if (!data) return false
      if (data === '[DONE]') return true

      try {
        const parsed = JSON.parse(data)
        const choice = parsed.choices?.[0]
        const delta = choice?.delta
        const messageFromChoice = choice?.message

        // Capture usage statistics when available (usually in the last chunk)
        if (parsed.usage) {
          promptTokens = parsed.usage.prompt_tokens || 0
          completionTokens = parsed.usage.completion_tokens || 0
          totalTokens = parsed.usage.total_tokens || 0
        }

        // Forward text content, with control markers filtered out
        if (delta?.content) {
          const cleanContent = this.cleanStreamContent(delta.content)
          if (cleanContent) {
            assistantContent += cleanContent
            this.emitStreamChunk(controller, {
              type: 'content',
              content: cleanContent
            })
          }
        }

        // Handle tool calls (streaming)
        if (delta?.tool_calls) {
          for (const toolCallDelta of delta.tool_calls) {
            accumulateToolCallDelta(toolCallDelta)
          }
        }

        // Handle complete tool calls (non-streaming)
        if (messageFromChoice?.tool_calls && !delta?.tool_calls) {
          const validToolCalls = messageFromChoice.tool_calls.filter((tc: ToolCall) => {
            try {
              JSON.parse(tc.function.arguments)
              return true
            } catch (e) {
              console.warn('Skipping tool call with invalid JSON arguments:', tc)
              return false
            }
          })

          if (validToolCalls.length > 0) {
            toolCalls.push(...validToolCalls)
          }
        }
      } catch (e) {
        // Skip events that cannot be processed; the stream continues.
        console.warn('Failed to parse stream data:', {
          error: e instanceof Error ? e.message : String(e),
          data: data.substring(0, 200) + (data.length > 200 ? '...' : ''),
          dataLength: data.length
        })
      }

      return false
    }

    try {
      let sawDone = false

      while (!sawDone) {
        const { done, value } = await reader.read()
        if (done) break

        // Decode chunk and add to buffer
        buffer += decoder.decode(value, { stream: true })

        // Process complete lines; keep the last (possibly incomplete) line buffered
        const lines = buffer.split('\n')
        buffer = lines.pop() ?? ''

        for (const line of lines) {
          if (handleEventLine(line)) {
            sawDone = true
            break
          }
        }
      }

      if (sawDone) {
        // Nothing after [DONE] is needed; cancel so the body is not left unread.
        await reader.cancel().catch(() => undefined)
      } else {
        // Stream ended without [DONE]: flush the decoder and process the trailing line.
        buffer += decoder.decode()
        handleEventLine(buffer)
      }
    } finally {
      reader.releaseLock()
    }

    // Finalize any remaining tool calls that weren't completed during streaming
    for (const [key, pending] of pendingToolCalls) {
      this.validateAndFinalizeToolCall(pending, toolCalls, key)
    }

    // Final cleaning pass on accumulated content to catch any missed markers
    const finalCleanedContent = this.cleanStreamContent(assistantContent)
    return { content: finalCleanedContent, toolCalls, promptTokens, completionTokens, totalTokens }
  }

  /**
   * Execute the given tool calls one after another and report progress to the client.
   *
   * For every call an 'executing' chunk is emitted, followed by either a
   * 'tool_result' or an 'error' chunk. A failing tool does not abort the batch:
   * its error is returned to the model as the tool's result.
   *
   * @returns One 'tool' message per tool call, in call order.
   */
  private async executeTools(
    toolCalls: ToolCall[],
    bookId: string,
    controller: ReadableStreamDefaultController,
    userId?: string
  ): Promise<ToolResult[]> {
    const toolResults: ToolResult[] = []

    for (const toolCall of toolCalls) {
      const toolName = toolCall.function.name

      try {
        // Handle empty or invalid tool arguments more gracefully
        let toolArgs: any = {}
        if (toolCall.function.arguments && toolCall.function.arguments.trim()) {
          try {
            toolArgs = JSON.parse(toolCall.function.arguments)
          } catch (parseError: any) {
            console.error('Failed to parse tool arguments:', toolCall.function.arguments)
            throw new Error(`Invalid tool arguments format: ${parseError?.message || 'Unknown parsing error'}`)
          }
        }

        // Show tool execution start with enhanced streaming
        this.emitStreamChunk(controller, {
          type: 'executing',
          tool_name: toolName,
          tool_args: toolArgs,
          content: this.toolExecutor.getToolDescription(toolName, toolArgs)
        })

        // Execute the tool
        const result = await this.toolExecutor.executeToolCall(bookId, toolCall, userId)
        console.log('Tool result:', result)

        // Show tool result with enhanced streaming
        this.emitStreamChunk(controller, {
          type: 'tool_result',
          tool_name: toolName,
          tool_result: result,
          content: this.toolExecutor.formatToolResult(toolName, result)
        })

        toolResults.push({
          role: 'tool',
          tool_call_id: toolCall.id,
          name: toolName,
          content: JSON.stringify(result)
        })

      } catch (error: any) {
        console.error('Tool execution error:', error)
        const errorText = typeof error?.message === 'string' ? error.message : String(error)

        // Show error with enhanced streaming
        this.emitStreamChunk(controller, {
          type: 'error',
          tool_name: toolName,
          content: `Error with ${toolName}: ${errorText}`
        })

        toolResults.push({
          role: 'tool',
          tool_call_id: toolCall.id,
          name: toolName,
          content: JSON.stringify({ error: errorText, details: "Tool execution failed" })
        })
      }
    }

    return toolResults
  }

  /**
   * Check if a tool call looks complete (has name and potentially valid JSON arguments).
   *
   * This is a cheap heuristic: the arguments must start with `{` and contain as
   * many `{` as `}`. Braces inside string values are counted too, so callers
   * still validate the arguments with a real JSON parse.
   */
  private isToolCallComplete(toolCall: any): boolean {
    if (!toolCall.function.name || !toolCall.function.arguments) {
      return false
    }

    const args = toolCall.function.arguments.trim()
    if (!args || !args.startsWith('{')) return false

    // Count braces to see if they're balanced
    const openBraces = (args.match(/{/g) || []).length
    const closeBraces = (args.match(/}/g) || []).length

    // If braces are balanced, it's likely complete
    return openBraces === closeBraces && openBraces > 0
  }

  /**
   * Validate and finalize a tool call, adding it to the final list if valid.
   *
   * @param toolCall  Accumulated tool call; its arguments are replaced by the repaired JSON on success.
   * @param toolCalls Final list the call is appended to.
   * @param key       Accumulator key, used for diagnostics only.
   * @returns true when the call is (or already was) in `toolCalls`, false otherwise.
   */
  private validateAndFinalizeToolCall(toolCall: any, toolCalls: any[], key: string): boolean {
    // Skip if already finalized
    if (toolCalls.some(tc => tc.id === toolCall.id)) {
      return true
    }

    // A call without a function name cannot be executed or echoed back to the model
    if (!toolCall.function?.name) {
      console.warn('Skipping tool call without a function name:', { id: toolCall.id, key })
      return false
    }

    try {
      // Try to validate/fix the JSON
      const fixedArgs = this.tryFixJsonArguments(toolCall.function.arguments)
      JSON.parse(fixedArgs) // Ensure it's valid

      // Update with fixed arguments
      toolCall.function.arguments = fixedArgs

      console.log('✅ Tool call completed during streaming:', {
        id: toolCall.id,
        name: toolCall.function.name,
        argsLength: toolCall.function.arguments.length
      })

      toolCalls.push(toolCall)
      return true
    } catch (e) {
      // Not complete or invalid yet - keep accumulating
      return false
    }
  }

  /**
   * Simple JSON validation and basic fixing.
   * The AI should generate proper JSON - if it doesn't, we log it and skip.
   *
   * @param args Raw arguments string accumulated from the stream.
   * @returns `'{}'` for empty input, the trimmed input when it already parses, or
   *          the input with missing closing braces / a missing closing quote appended.
   * @throws Error when none of the simple repairs yields valid JSON.
   */
  private tryFixJsonArguments(args: string): string {
    if (!args || args.trim() === '') {
      return '{}'
    }

    const trimmed = args.trim()

    // Try to parse as-is first
    try {
      JSON.parse(trimmed)
      return trimmed
    } catch (e) {
      // Only attempt very basic fixes for common streaming issues

      // 1. Missing closing braces (common in truncated streams)
      const openBraces = (trimmed.match(/{/g) || []).length
      const closeBraces = (trimmed.match(/}/g) || []).length

      if (openBraces > closeBraces) {
        const withBraces = trimmed + '}'.repeat(openBraces - closeBraces)
        try {
          JSON.parse(withBraces)
          console.log('🔧 Fixed incomplete JSON by adding closing braces')
          return withBraces
        } catch (e2) {
          // If adding braces doesn't fix it, give up
        }
      }

      // 2. Unclosed strings (very common in streaming)
      const quotes = (trimmed.match(/"/g) || []).length
      if (quotes % 2 === 1) {
        const withQuote = trimmed + '"'
        try {
          JSON.parse(withQuote)
          console.log('🔧 Fixed incomplete JSON by adding closing quote')
          return withQuote
        } catch (e2) {
          // If adding quote doesn't fix it, give up
        }
      }

      // If we can't fix it with simple methods, log and throw
      console.warn('❌ Unable to fix malformed JSON arguments:', {
        originalLength: trimmed.length,
        preview: trimmed.substring(0, 150) + (trimmed.length > 150 ? '...' : ''),
        error: e instanceof Error ? e.message : String(e)
      })

      throw new Error(`Invalid JSON arguments: ${e instanceof Error ? e.message : 'Unknown error'}`)
    }
  }

  /**
   * Placeholder kept from a removed JSON reconstruction routine.
   *
   * @deprecated Not called from this class; always throws.
   * @throws Error unconditionally.
   */
  private reconstructJsonFromBrokenString(broken: string): string {
    // This method is no longer used - the AI should generate proper JSON
    // If we're hitting this often, we need to fix the streaming buffer logic instead
    throw new Error('Complex JSON reconstruction should not be needed - check streaming logic')
  }
}