// lib/utils/streamingUtils.ts
import { ToolResult } from '@/lib/services/tool-executor'

export interface ParsedStreamData {
  type?: string
  content?: string
  tool_name?: string
  tool_args?: any
  tool_result?: any
  status?: string
  // Legacy-protocol fields (consumed by handleLegacyStreamData)
  contextInfo?: {
    filesFound: number
    fileNames: string[]
  }
  toolResult?: ToolResult
}
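
// Illustrative payloads this shape covers (field values are hypothetical,
// not confirmed by the server protocol):
//   { type: 'content', content: 'Once upon a time' }                    // streamed text
//   { type: 'tool_call', tool_name: 'search_files', tool_args: {} }     // tool invocation
//   { contextInfo: { filesFound: 2, fileNames: ['ch1.md', 'ch2.md'] } } // legacy context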

/**
 * Parse a single line from the streaming response
 */
export function parseStreamLine(line: string): ParsedStreamData | null {
  if (!line.startsWith('data: ')) {
    return null
  }

  const data = line.slice(6)
  try {
    return JSON.parse(data)
  } catch {
    // Skip invalid JSON
    return null
  }
}
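
// Example (payload shape is illustrative, matching ParsedStreamData):
//
//   parseStreamLine('data: {"type":"content","content":"Hello"}')
//   // -> { type: 'content', content: 'Hello' }
//   parseStreamLine(': keep-alive')
//   // -> null (not a data line, so it is skipped)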

/**
 * Process multiple lines from a streaming chunk
 */
export function processStreamChunk(chunk: string): ParsedStreamData[] {
  const lines = chunk.split('\n')
  const results: ParsedStreamData[] = []

  for (const line of lines) {
    const parsed = parseStreamLine(line)
    if (parsed) {
      results.push(parsed)
    }
  }

  return results
}
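
// Example (illustrative): one network chunk can carry several SSE lines.
//
//   processStreamChunk('data: {"content":"Hel"}\ndata: {"content":"lo"}\n')
//   // -> [{ content: 'Hel' }, { content: 'lo' }]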

/**
 * Handle legacy streaming protocol (old format)
 */
export function handleLegacyStreamData(
  parsed: ParsedStreamData,
  setContextInfo: (info: { filesFound: number; fileNames: string[] }) => void,
  setToolResults: (setter: (prev: ToolResult[]) => ToolResult[]) => void,
  updateContent: (content: string) => void,
  accumulatedContent: string
): string {
  let newContent = accumulatedContent

  // Handle context information
  if (parsed.contextInfo) {
    setContextInfo(parsed.contextInfo)
  }

  // Handle legacy tool results
  if (parsed.toolResult) {
    setToolResults(prev => [...prev, parsed.toolResult!])
  }

  // Handle legacy content
  if (parsed.content) {
    newContent += parsed.content
    updateContent(newContent)
  }

  return newContent
}
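
// Usage sketch (the setters are assumed to come from the caller's React state,
// e.g. useState; names are illustrative):
//
//   let accumulated = ''
//   for (const parsed of processStreamChunk(chunk)) {
//     accumulated = handleLegacyStreamData(
//       parsed,
//       setContextInfo,
//       setToolResults,
//       content => setStreamedContent(content),  // hypothetical content setter
//       accumulated
//     )
//   }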

/**
 * Create request body for chat API
 */
export function createChatRequestBody(
  messages: any[],
  newMessage: any,
  selectedModel: string,
  includedFiles: string[],
  userId: string | null,
  bookId?: string,
  chatId?: string
): any {
  // Cap conversation context at the last 10 messages so stale history
  // does not bleed into new requests
  const MAX_CONTEXT_MESSAGES = 10
  const recentMessages = messages.slice(-MAX_CONTEXT_MESSAGES)

  const requestBody: any = {
    messages: [...recentMessages, newMessage],
    model: selectedModel,
    includedFiles,
    userId
  }
  
  // Include bookId if available
  if (bookId) {
    requestBody.bookId = bookId
  }

  // Include chatId if available
  if (chatId) {
    requestBody.chatId = chatId
  }

  return requestBody
}
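
// Usage sketch (message shape and model id are illustrative; the real contract
// is defined by the chat API route):
//
//   const body = createChatRequestBody(
//     chatHistory,                                      // prior messages
//     { role: 'user', content: 'Continue the scene' },  // new message
//     'gpt-4o',                                         // hypothetical model id
//     ['chapter-1.md'],                                 // files included as context
//     user?.id ?? null,
//     bookId,
//     chatId
//   )
//   // body.messages holds at most the last 10 prior messages plus the new one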

/**
 * Read and process a streaming response.
 *
 * Buffers partial lines between reads (a network chunk can split an SSE line),
 * accumulates streamed text content, and returns the full response text.
 */
export async function processStreamResponse(
  response: Response,
  onStreamData: (parsed: ParsedStreamData, fullResponse: string) => void,
  abortSignal?: AbortSignal
): Promise<string> {
  const reader = response.body?.getReader()
  if (!reader) {
    throw new Error('No response body')
  }

  const decoder = new TextDecoder()
  let buffer = ''
  let fullResponse = ''

  try {
    while (true) {
      // Check if aborted; the finally block below releases the reader
      if (abortSignal?.aborted) {
        throw new DOMException('Aborted', 'AbortError')
      }

      const { done, value } = await reader.read()
      if (done) break

      // stream: true keeps multi-byte characters that span chunks intact
      buffer += decoder.decode(value, { stream: true })

      // Only parse complete lines; keep any trailing partial line buffered
      const lastNewline = buffer.lastIndexOf('\n')
      if (lastNewline === -1) continue
      const completeLines = buffer.slice(0, lastNewline)
      buffer = buffer.slice(lastNewline + 1)

      for (const parsed of processStreamChunk(completeLines)) {
        // Accumulate streamed text so callers (and the returned promise)
        // always see the response built up so far
        if (parsed.content) {
          fullResponse += parsed.content
        }
        onStreamData(parsed, fullResponse)
      }
    }
  } finally {
    reader.releaseLock()
  }

  return fullResponse
}
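
// Usage sketch (endpoint path and callbacks are illustrative):
//
//   const controller = new AbortController()
//   const response = await fetch('/api/chat', {
//     method: 'POST',
//     headers: { 'Content-Type': 'application/json' },
//     body: JSON.stringify(requestBody),
//     signal: controller.signal,
//   })
//   const finalText = await processStreamResponse(
//     response,
//     (parsed, soFar) => renderPartial(soFar),  // hypothetical render callback
//     controller.signal
//   )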

/**
 * Handle stream errors and create appropriate response
 */
export function handleStreamError(
  error: any,
  preservedContent: string,
  streamingActivity: {
    planning: string
    currentExecution: { name: string; description: string; args?: any } | null
    toolResults: Array<{ name: string; result: string; success: boolean }>
  }
): { isAborted: boolean; content: string } {
  const isAborted = error.name === 'AbortError'
  
  if (isAborted) {
    console.log('AI execution was stopped by user, preserving content:', preservedContent)
    
    // Build comprehensive stopped content that includes all AI activity
    let stoppedContent = ''
    
    // Add planning phase if it occurred
    if (streamingActivity.planning) {
      stoppedContent += `📋 **AI Planning:** ${streamingActivity.planning}\n\n`
    }
    
    // Add current tool execution if it was in progress
    if (streamingActivity.currentExecution) {
      stoppedContent += `⚙️ **Was Executing:** ${streamingActivity.currentExecution.description} *(interrupted)*\n\n`
    }
    
    // Add completed tool execution results if any
    if (streamingActivity.toolResults.length > 0) {
      stoppedContent += `🛠️ **Actions Performed Before Stop:**\n`
      streamingActivity.toolResults.forEach(result => {
        const status = result.success ? '✅' : '❌'
        stoppedContent += `${status} **${result.name}:** ${result.result}\n`
      })
      stoppedContent += '\n'
    }
    
    // Add the partial content
    if (preservedContent) {
      stoppedContent += preservedContent + '\n\n'
    }
    
    // Add stop indicator
    stoppedContent += 'âšī¸ *Execution stopped by user*'
    
    return { isAborted: true, content: stoppedContent }
  }
  
  return { isAborted: false, content: '' }
}
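
// Usage sketch (illustrative): decide how to finalize the assistant message
// when a stream throws.
//
//   try {
//     await processStreamResponse(response, onStreamData, controller.signal)
//   } catch (error) {
//     const { isAborted, content } = handleStreamError(error, partialText, activity)
//     if (isAborted) {
//       finalizeMessage(content)  // hypothetical: commit the stop summary
//     } else {
//       throw error               // real failures should propagate
//     }
//   }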