import { ToolResult } from '@/lib/services/tool-executor'
export interface ParsedStreamData {
type?: string
content?: string
tool_name?: string
tool_args?: any
tool_result?: any
status?: string
contextInfo?: {
filesFound: number
fileNames: string[]
}
toolResult?: ToolResult
}
/**
* Parse a single line from the streaming response
*/
export function parseStreamLine(line: string): ParsedStreamData | null {
if (!line.startsWith('data: ')) {
return null
}
const data = line.slice(6)
try {
return JSON.parse(data)
} catch (e) {
// Skip invalid JSON
return null
}
}
/**
* Process multiple lines from a streaming chunk
*/
export function processStreamChunk(chunk: string): ParsedStreamData[] {
const lines = chunk.split('\n')
const results: ParsedStreamData[] = []
for (const line of lines) {
const parsed = parseStreamLine(line)
if (parsed) {
results.push(parsed)
}
}
return results
}
/**
* Handle legacy streaming protocol (old format)
*/
export function handleLegacyStreamData(
parsed: ParsedStreamData,
setContextInfo: (info: { filesFound: number; fileNames: string[] }) => void,
setToolResults: (setter: (prev: ToolResult[]) => ToolResult[]) => void,
updateContent: (content: string) => void,
accumulatedContent: string
): string {
let newContent = accumulatedContent
// Handle context information
if (parsed.contextInfo) {
setContextInfo(parsed.contextInfo)
}
// Handle legacy tool results
if (parsed.toolResult) {
setToolResults(prev => [...prev, parsed.toolResult!])
}
// Handle legacy content
if (parsed.content) {
newContent += parsed.content
updateContent(newContent)
}
return newContent
}
/**
* Create request body for chat API
*/
export function createChatRequestBody(
messages: any[],
newMessage: any,
selectedModel: string,
includedFiles: string[],
userId: string | null,
bookId?: string,
chatId?: string
): any {
// Limit conversation context to last 10 messages to prevent old context bleeding
const MAX_CONTEXT_MESSAGES = 10
const recentMessages = messages.slice(-MAX_CONTEXT_MESSAGES)
const requestBody: any = {
messages: [...recentMessages, newMessage],
model: selectedModel,
includedFiles: includedFiles,
userId: userId
}
// Include bookId if available
if (bookId) {
requestBody.bookId = bookId
}
// Include chatId if available
if (chatId) {
requestBody.chatId = chatId
}
return requestBody
}
/**
* Read and process stream response
*/
export async function processStreamResponse(
response: Response,
onStreamData: (parsed: ParsedStreamData, fullResponse: string) => void,
abortSignal?: AbortSignal
): Promise<string> {
const reader = response.body?.getReader()
if (!reader) {
throw new Error('No response body')
}
const decoder = new TextDecoder()
let fullResponse = ''
try {
while (true) {
// Check if aborted
if (abortSignal?.aborted) {
reader.releaseLock()
throw new DOMException('Aborted', 'AbortError')
}
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value)
const parsedData = processStreamChunk(chunk)
for (const parsed of parsedData) {
onStreamData(parsed, fullResponse)
}
}
} finally {
reader.releaseLock()
}
return fullResponse
}
/**
* Handle stream errors and create appropriate response
*/
export function handleStreamError(
error: any,
preservedContent: string,
streamingActivity: {
planning: string
currentExecution: { name: string; description: string; args?: any } | null
toolResults: Array<{ name: string; result: string; success: boolean }>
}
): { isAborted: boolean; content: string } {
const isAborted = error.name === 'AbortError'
if (isAborted) {
console.log('AI execution was stopped by user, preserving content:', preservedContent)
// Build comprehensive stopped content that includes all AI activity
let stoppedContent = ''
// Add planning phase if it occurred
if (streamingActivity.planning) {
stoppedContent += `đ **AI Planning:** ${streamingActivity.planning}\n\n`
}
// Add current tool execution if it was in progress
if (streamingActivity.currentExecution) {
stoppedContent += `âī¸ **Was Executing:** ${streamingActivity.currentExecution.description} *(interrupted)*\n\n`
}
// Add completed tool execution results if any
if (streamingActivity.toolResults.length > 0) {
stoppedContent += `đ ī¸ **Actions Performed Before Stop:**\n`
streamingActivity.toolResults.forEach((result, index) => {
const status = result.success ? 'â
' : 'â'
stoppedContent += `${status} **${result.name}:** ${result.result}\n`
})
stoppedContent += '\n'
}
// Add the partial content
if (preservedContent) {
stoppedContent += preservedContent + '\n\n'
}
// Add stop indicator
stoppedContent += 'âšī¸ *Execution stopped by user*'
return { isAborted: true, content: stoppedContent }
}
return { isAborted: false, content: '' }
}