
Plan: Show Extracted Questions Immediately (Streaming Query Response)

Problem

On the LTT page, when a user submits a question like "what is power of project manager", the "Extracted Questions" section only appears after all three LLM calls complete:

  1. QueryDecomposer (LLM #1) → extracted_questions available here
  2. RelevanceFilter (LLM #2) → filtered chunks
  3. ResponseGeneration (LLM #3) → final answer

The user must wait 10-30+ seconds (depending on LLM latency) before seeing the extracted sub-questions, even though they are produced in the first ~3-5 seconds.

Root Cause

The backend /query endpoint is a single synchronous HTTP call that awaits all pipeline steps before returning the complete QueryResponse. The frontend's useMutation receives the entire response at once.

Objective

Show extracted_questions to the user as soon as the first LLM call completes, while the remaining pipeline steps continue in the background. The user sees:

  1. Extracted questions immediately (after LLM #1)
  2. A loading indicator for the answer (while LLM #2 and #3 run)
  3. The final answer and sources when complete

Convert the /query endpoint to use Server-Sent Events (SSE), a widely used pattern for streaming LLM responses. This requires changes to both the backend and the frontend.

Why SSE (not WebSocket or separate endpoint)

| Approach | Pros | Cons | Verdict |
|---|---|---|---|
| SSE streaming | Single HTTP connection, plain-text frames carrying JSON, works with standard HTTP infrastructure | One-directional (server→client only); native EventSource auto-reconnect is unavailable because the request is a POST read via fetch | Best fit: we only need server→client |
| WebSocket | Bidirectional, low per-message overhead | Overkill for this use case, more complex | Unnecessary complexity |
| Separate /decompose endpoint | Minimal backend change | Two HTTP round-trips, awkward UX (submit twice), harder to maintain state | Poor UX, more frontend complexity |

Backend Changes

1. New Streaming Response Models

File: backend/app/models/query.py

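Add one Pydantic model per SSE event and combine them into a StreamingQueryEvent union that is discriminated by phase. Each phase field has a default, so callers need not repeat the literal. A generating event marks the start of LLM #3:

from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field

# SourceMetadata is assumed to be defined earlier in this module.


class DecomposedEvent(BaseModel):
    phase: Literal["decomposed"] = "decomposed"
    extracted_questions: list[str]


class RetrievingEvent(BaseModel):
    phase: Literal["retrieving"] = "retrieving"


class FilteringEvent(BaseModel):
    phase: Literal["filtering"] = "filtering"


class GeneratingEvent(BaseModel):
    phase: Literal["generating"] = "generating"


class CompletedEvent(BaseModel):
    phase: Literal["completed"] = "completed"
    answer: str
    sources: list[SourceMetadata]


class ErrorEvent(BaseModel):
    phase: Literal["error"] = "error"
    message: str


# Discriminated on `phase`, so validation selects the matching model directly
StreamingQueryEvent = Annotated[
    Union[
        DecomposedEvent, RetrievingEvent, FilteringEvent,
        GeneratingEvent, CompletedEvent, ErrorEvent,
    ],
    Field(discriminator="phase"),
]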
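The frontend needs the same contract. The sketch below mirrors these models as a TypeScript discriminated union, so that `switch (event.phase)` narrows each case without non-null assertions. It includes a generating phase for LLM #3, which the "Generating answer..." loading state in ResponsePanel depends on. The SourceMetadata fields are copied from the router's SourceMetadata construction. The helper names parseStreamEvent and describeEvent are hypothetical, and the parser checks only the phase field.

```typescript
// Field names mirror the SourceMetadata objects built in the /query router
interface SourceMetadata {
  filename: string
  upload_date: string
  content_summary: string
  chunk_index: number
  page_number: number | null
  chunk_file_path: string | null
}

// One member per event phase, discriminated by `phase`
type StreamingQueryEvent =
  | { phase: 'decomposed'; extracted_questions: string[] }
  | { phase: 'retrieving' }
  | { phase: 'filtering' }
  | { phase: 'generating' }
  | { phase: 'completed'; answer: string; sources: SourceMetadata[] }
  | { phase: 'error'; message: string }

const KNOWN_PHASES: ReadonlySet<string> = new Set([
  'decomposed', 'retrieving', 'filtering', 'generating', 'completed', 'error',
])

// Parse one SSE `data:` payload; only `phase` is validated, the other fields are trusted
function parseStreamEvent(data: string): StreamingQueryEvent {
  const value: unknown = JSON.parse(data)
  const phase =
    typeof value === 'object' && value !== null ? (value as { phase?: unknown }).phase : undefined
  if (typeof phase !== 'string' || !KNOWN_PHASES.has(phase)) {
    throw new Error(`Unexpected stream event: ${data}`)
  }
  return value as StreamingQueryEvent
}

// `switch` narrows `event` in each case, so the phase-specific fields are typed
function describeEvent(event: StreamingQueryEvent): string {
  switch (event.phase) {
    case 'decomposed':
      return `${event.extracted_questions.length} extracted questions`
    case 'completed':
      return `answer with ${event.sources.length} sources`
    case 'error':
      return `error: ${event.message}`
    default:
      return event.phase
  }
}
```

Rejecting unknown phases turns a backend/frontend contract mismatch into a visible error instead of an event that is silently ignored.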

2. Streaming Query Endpoint

File: backend/app/routers/query.py

Convert /query to return StreamingResponse with text/event-stream:

import json

from fastapi import HTTPException
from fastapi.responses import StreamingResponse

# Assumed to exist already in this router module: router, logger, get_settings, LLMClient,
# RAGService, QueryDecomposer, RelevanceFilter, QueryRequest, SourceMetadata, NO_RESULTS_ANSWER


def _sse(payload: dict) -> str:
    """Serialize one event as an SSE frame: a `data:` line terminated by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


async def _query_stream(request: QueryRequest):
    """Async generator that yields one SSE event per pipeline phase."""
    settings = get_settings()

    try:
        llm_client = LLMClient(settings)
        rag = RAGService(llm_client=llm_client, settings=settings)

        # Step 1: Decompose (LLM #1); the client can render these immediately
        decomposer = QueryDecomposer(llm_client)
        extracted_questions = await decomposer.decompose(request.question)
        yield _sse({"phase": "decomposed", "extracted_questions": extracted_questions})

        # Step 2: Retrieve (ChromaDB)
        yield _sse({"phase": "retrieving"})
        chunks = rag.retrieve(extracted_questions, n_results=settings.retrieval_n_results)

        if not chunks:
            yield _sse({"phase": "completed", "answer": NO_RESULTS_ANSWER, "sources": []})
            return

        # Step 3: Filter (LLM #2)
        yield _sse({"phase": "filtering"})
        chunks_for_filter = [(text, meta) for text, meta, _dist in chunks]
        relevance_filter = RelevanceFilter(llm_client)
        filtered = await relevance_filter.filter(
            request.question, chunks_for_filter, threshold=settings.relevance_threshold
        )

        if not filtered:
            yield _sse({"phase": "completed", "answer": NO_RESULTS_ANSWER, "sources": []})
            return

        # Step 4: Generate (LLM #3); announce it so the UI can show "Generating answer..."
        yield _sse({"phase": "generating"})
        chunk_texts = [chunk for chunk, _meta in filtered]
        chunk_metadata = [meta for _chunk, meta in filtered]
        answer = await rag.generate_response(request.question, chunk_texts, chunk_metadata)

        sources = [
            SourceMetadata(
                filename=meta.get("filename", "unknown"),
                upload_date=meta.get("upload_date", ""),
                content_summary=meta.get("content_summary", ""),
                chunk_index=meta.get("chunk_index", 0),
                page_number=meta.get("page_number"),
                chunk_file_path=meta.get("chunk_file_path"),
            )
            for meta in chunk_metadata
        ]

        yield _sse({"phase": "completed", "answer": answer, "sources": [s.model_dump() for s in sources]})

    except Exception as e:
        # The 200 status line and headers are already sent, so failures are reported in-band
        logger.exception("Query stream failed")
        yield _sse({"phase": "error", "message": str(e)})


@router.post("/query")
async def query(request: QueryRequest):
    # Validate before streaming starts so invalid input still gets a normal 400 response
    if not request.question or not request.question.strip():
        raise HTTPException(status_code=400, detail="Question is required")

    return StreamingResponse(
        _query_stream(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # ask nginx-style proxies not to buffer the stream
        },
    )
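To check the wire format (for example with curl -N -X POST http://localhost:8000/api/v1/query -H 'Content-Type: application/json' -d '{"question": "what is power of project manager"}'), the body should be a series of data: lines, each followed by a blank line. The sketch below parses a complete captured body into its JSON payloads using the SSE framing rules. An event ends at a blank line. Lines starting with "data:" carry the payload, and one space after the colon is dropped. Multiple data lines in one event are joined with "\n", and an unterminated trailing event is discarded, as a browser would do. parseSseBody is a hypothetical test helper, and the sample payload is illustrative only.

```typescript
// Split a complete SSE response body into the JSON payloads of its events
export function parseSseBody(body: string): unknown[] {
  // Normalize CRLF/CR line endings, then split on the blank line that ends each event
  const blocks = body.replace(/\r\n?/g, '\n').split('\n\n')
  blocks.pop() // text after the last blank line is either empty or an unfinished event
  const events: unknown[] = []
  for (const block of blocks) {
    const data = block
      .split('\n')
      .filter(line => line.startsWith('data:'))
      .map(line => line.slice('data:'.length).replace(/^ /, ''))
    if (data.length > 0) {
      events.push(JSON.parse(data.join('\n')))
    }
  }
  return events
}

// Illustrative body in the shape produced by json.dumps in the router
const sample =
  'data: {"phase": "decomposed", "extracted_questions": ["What powers does a project manager have?"]}\n\n' +
  'data: {"phase": "retrieving"}\n\n'

console.log(parseSseBody(sample).length) // 2
```

Each backend frame is a single data: line, so a line-based reader on the client is enough. The multi-line join only matters if a proxy or future event reformats the payload.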

3. Backward Compatibility

An alternative is to add a new /query/stream endpoint and keep the existing JSON /query endpoint (or to version the API). For simplicity, this plan replaces /query, although the implementation could support both.

Decision: Replace /query with streaming. The frontend is the only consumer and will be updated together.

Frontend Changes

1. New Streaming API Client

File: frontend/src/lib/api.ts

Add the stream event type and a streaming query function:

export interface QueryStreamEvent {
  phase: 'decomposed' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  extracted_questions?: string[]
  answer?: string
  sources?: SourceMetadata[]
  message?: string
}

Important: Standard EventSource only supports GET and cannot send a request body, so it cannot POST the question. Instead, use fetch() and read the SSE frames from response.body (a ReadableStream). An optional AbortSignal lets the caller cancel an in-flight request.

export const queryDocumentStream = async (
  request: QueryRequest,
  onEvent: (event: QueryStreamEvent) => void,
  signal?: AbortSignal
): Promise<void> => {
  const baseUrl = import.meta.env.VITE_API_BASE_URL ?? 'http://localhost:8000/api/v1'
  const response = await fetch(`${baseUrl}/query`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', Accept: 'text/event-stream' },
    body: JSON.stringify(request),
    signal,
  })

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${await response.text()}`)
  }
  if (!response.body) {
    throw new Error('Response has no body to stream')
  }

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  // Each backend event is a single `data: <json>` line followed by a blank line
  const handleLine = (rawLine: string) => {
    const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine
    if (line.startsWith('data: ')) {
      onEvent(JSON.parse(line.slice(6)) as QueryStreamEvent)
    }
  }

  // The stream ends when the server closes the connection after 'completed' or 'error'
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? '' // keep the trailing partial line for the next read
    lines.forEach(handleLine)
  }

  // Flush any bytes or partial line left when the connection closed
  buffer += decoder.decode()
  if (buffer) handleLine(buffer)
}

2. New React Hook for Streaming

File: frontend/src/lib/queries.tsx

Add a custom hook that manages streaming state. It aborts any previous request when a new one starts. It ignores the AbortError caused by reset() or unmount, and it reports an error if the stream ends without a completed or error event:

import { useState, useCallback, useEffect, useRef } from 'react'
import { queryDocumentStream } from './api'
import type { QueryRequest, SourceMetadata } from './api' // adjust if these types live elsewhere

interface QueryStreamState {
  extractedQuestions: string[] | null
  answer: string | null
  sources: SourceMetadata[] | null
  phase: 'idle' | 'decomposing' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  error: Error | null
}

const initialState: QueryStreamState = {
  extractedQuestions: null,
  answer: null,
  sources: null,
  phase: 'idle',
  error: null,
}

export const useQueryDocumentStream = () => {
  const [state, setState] = useState<QueryStreamState>(initialState)
  const abortRef = useRef<AbortController | null>(null)

  // Abort any in-flight stream when the component unmounts
  useEffect(() => () => abortRef.current?.abort(), [])

  const mutate = useCallback(async (request: QueryRequest) => {
    abortRef.current?.abort() // cancel a previous request before starting a new one
    const controller = new AbortController()
    abortRef.current = controller
    setState({ ...initialState, phase: 'decomposing' })

    let finished = false // set once a terminal ('completed' or 'error') event arrives

    try {
      await queryDocumentStream(request, (event) => {
        if (controller.signal.aborted) return
        switch (event.phase) {
          case 'decomposed':
            setState(prev => ({
              ...prev,
              extractedQuestions: event.extracted_questions ?? [],
              phase: 'retrieving',
            }))
            break
          case 'retrieving':
          case 'filtering':
          case 'generating': {
            const phase = event.phase // narrowed to the in-progress phases
            setState(prev => ({ ...prev, phase }))
            break
          }
          case 'completed':
            finished = true
            setState(prev => ({
              ...prev,
              answer: event.answer ?? '',
              sources: event.sources ?? [],
              phase: 'completed',
            }))
            break
          case 'error':
            finished = true
            setState(prev => ({
              ...prev,
              phase: 'error',
              error: new Error(event.message ?? 'Query failed'),
            }))
            break
        }
      }, controller.signal)

      // The server closed the stream without a terminal event (e.g. a dropped connection)
      if (!finished && !controller.signal.aborted) {
        setState(prev => ({ ...prev, phase: 'error', error: new Error('Connection lost before the answer arrived') }))
      }
    } catch (err) {
      // reset(), unmount, or a newer mutate() aborted this request: not an error for the user
      if (controller.signal.aborted) return
      setState(prev => ({
        ...prev,
        phase: 'error',
        error: err instanceof Error ? err : new Error(String(err)),
      }))
    }
  }, [])

  const reset = useCallback(() => {
    abortRef.current?.abort()
    abortRef.current = null
    setState(initialState)
  }, [])

  return { ...state, mutate, reset }
}

3. Update LTTPage to Use Streaming

File: frontend/src/pages/LTTPage.tsx

Replace useQueryDocument with useQueryDocumentStream:

const queryStream = useQueryDocumentStream()

const handleQuerySubmit = (question: string) => {
  queryStream.mutate({ question })
}

// In JSX:
<ExtractedQuestionsDisplay
  extractedQuestions={queryStream.extractedQuestions}
  isLoading={queryStream.phase === 'decomposing'}
/>

<ResponsePanel
  answer={queryStream.answer}
  sources={queryStream.sources}
  phase={queryStream.phase}
  isLoading={queryStream.phase === 'retrieving' || queryStream.phase === 'filtering' || queryStream.phase === 'generating'}
  extractedQuestions={queryStream.extractedQuestions}
/>

4. Update ExtractedQuestionsDisplay

File: frontend/src/components/ExtractedQuestionsDisplay.tsx

The component should already work with the new state shape. Ensure it handles the case where extractedQuestions is available but answer is still loading:

// Show questions as soon as they arrive, even if answer is still loading
if (extractedQuestions && extractedQuestions.length > 0) {
  return (
    <div>
      <h3>Extracted Questions:</h3>
      <ol>
        {extractedQuestions.map((q, i) => <li key={i}>{q}</li>)}
      </ol>
    </div>
  )
}

5. Update ResponsePanel Loading States

File: frontend/src/components/ResponsePanel.tsx

Show different loading states based on phase:

  • phase === 'retrieving': "Searching documents..."
  • phase === 'filtering': "Filtering relevant passages..."
  • phase === 'generating': "Generating answer..."
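To choose a message, ResponsePanel needs the current phase, for example passed in as a phase prop. Below is a minimal sketch of the mapping. LOADING_MESSAGES, loadingMessageFor and isAnswerLoading are hypothetical names, and QueryStreamPhase copies the hook's phase union:

```typescript
type QueryStreamPhase =
  | 'idle' | 'decomposing' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'

// Record<...> makes the compiler flag any phase added to the union without a message entry
const LOADING_MESSAGES: Record<QueryStreamPhase, string | null> = {
  idle: null,
  decomposing: null, // ExtractedQuestionsDisplay shows its own loading state during decomposition
  retrieving: 'Searching documents...',
  filtering: 'Filtering relevant passages...',
  generating: 'Generating answer...',
  completed: null,
  error: null,
}

export const loadingMessageFor = (phase: QueryStreamPhase): string | null => LOADING_MESSAGES[phase]

// True while the answer panel should show a loading indicator
export const isAnswerLoading = (phase: QueryStreamPhase): boolean => loadingMessageFor(phase) !== null
```

isAnswerLoading returns true for exactly the retrieving, filtering and generating phases. It could therefore replace the three-way isLoading comparison in LTTPage, keeping the message text and the loading flag in one place.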

Implementation Order

  1. Backend streaming endpoint (backend/app/routers/query.py)

    • Add _query_stream() generator
    • Update /query to return StreamingResponse
    • Test with curl -N (which disables curl's output buffering) to confirm that events arrive one at a time in SSE format
  2. Frontend streaming API (frontend/src/lib/api.ts)

    • Add queryDocumentStream() function
    • Add QueryStreamEvent type
  3. Frontend streaming hook (frontend/src/lib/queries.tsx)

    • Add useQueryDocumentStream() hook
  4. Update LTTPage (frontend/src/pages/LTTPage.tsx)

    • Replace useQueryDocument with useQueryDocumentStream
    • Wire up new state to components
  5. Update ResponsePanel (frontend/src/components/ResponsePanel.tsx)

    • Add phase-aware loading messages
  6. Tests

    • Mock fetch with ReadableStream for testing
    • Test each phase transition
    • Test error handling
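To mock fetch with a ReadableStream, the fake response should deliver the body in small pieces, so that tests exercise SSE lines split across reads. The sketch below assumes a runtime with the standard Response, ReadableStream and TextEncoder globals (modern browsers, Node 18+, or jsdom with fetch support). mockSseResponse is a hypothetical helper name.

```typescript
// Build a fake streaming Response whose body emits `body` in fixed-size byte chunks.
// Chunking at the byte level can split both lines and multi-byte UTF-8 characters.
export function mockSseResponse(body: string, chunkSize = 16): Response {
  if (chunkSize < 1) throw new RangeError('chunkSize must be at least 1')
  const bytes = new TextEncoder().encode(body)
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      for (let offset = 0; offset < bytes.length; offset += chunkSize) {
        controller.enqueue(bytes.slice(offset, offset + chunkSize))
      }
      controller.close()
    },
  })
  return new Response(stream, {
    status: 200,
    headers: { 'Content-Type': 'text/event-stream' },
  })
}
```

If the project uses Vitest (an assumption), vi.spyOn(globalThis, 'fetch').mockImplementation(async () => mockSseResponse(body, 5)) feeds queryDocumentStream 5-byte reads. Using mockImplementation rather than a single shared value creates a fresh Response per call, since a body can only be read once.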

Testing Strategy

Backend Tests

  • Test SSE event format matches expected JSON structure
  • Test each phase event is emitted in correct order
  • Test error events on failures
  • Test early exit (no chunks) still emits decomposed, then retrieving, then completed with an empty sources list
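The ordering rules these tests check can be stated as one predicate over the received phases. It is written in TypeScript to match the other sketches here, and the same logic ports directly to a pytest helper. It encodes the behaviour of _query_stream as described in this plan. Progress events form a prefix of decomposed → retrieving → filtering → generating. An error may end the stream at any step, and completed requires at least decomposition and retrieval (the no-chunks early exit). It accepts streams with or without a separate generating event. isValidPhaseSequence is a hypothetical name.

```typescript
type Phase = 'decomposed' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'

// Non-terminal phases in the order the pipeline runs them
const PIPELINE_ORDER: readonly Phase[] = ['decomposed', 'retrieving', 'filtering', 'generating']

export function isValidPhaseSequence(phases: Phase[]): boolean {
  if (phases.length === 0) return false
  const terminal = phases[phases.length - 1]
  const progress = phases.slice(0, -1)
  // Progress events must follow the pipeline order with no gaps, repeats, or early terminals
  if (!progress.every((phase, i) => phase === PIPELINE_ORDER[i])) return false
  if (terminal === 'error') return true // a failure can end the stream at any step
  // 'completed' needs at least decomposition and retrieval (the no-chunks early exit)
  return terminal === 'completed' && progress.length >= 2
}
```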

Frontend Tests

  • Mock fetch to return a ReadableStream with SSE data
  • Test useQueryDocumentStream state transitions:
    • idle → decomposing → retrieving (extractedQuestions set by the decomposed event) → filtering → generating → completed (answer set)
  • Test error handling: stream emits error event → state.error set
  • Test abort: calling reset() aborts in-flight request
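The state transitions are easier to test if the hook's switch statement is pulled out into a pure reducer. The hook could then call setState(prev => applyStreamEvent(prev, event)) or useReducer. In this minimal sketch, applyStreamEvent and startState are hypothetical names, and sources are omitted for brevity:

```typescript
type Phase = 'idle' | 'decomposing' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'

interface StreamEvent {
  phase: 'decomposed' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  extracted_questions?: string[]
  answer?: string
  message?: string
}

interface StreamState {
  phase: Phase
  extractedQuestions: string[] | null
  answer: string | null
  error: string | null
}

// State right after mutate() is called
export const startState: StreamState = {
  phase: 'decomposing',
  extractedQuestions: null,
  answer: null,
  error: null,
}

// Pure (state, event) -> next state; mirrors the hook's switch statement
export function applyStreamEvent(state: StreamState, event: StreamEvent): StreamState {
  switch (event.phase) {
    case 'decomposed':
      return { ...state, phase: 'retrieving', extractedQuestions: event.extracted_questions ?? [] }
    case 'retrieving':
    case 'filtering':
    case 'generating':
      return { ...state, phase: event.phase }
    case 'completed':
      return { ...state, phase: 'completed', answer: event.answer ?? '' }
    case 'error':
      return { ...state, phase: 'error', error: event.message ?? 'Query failed' }
    default:
      return state // ignore phases this client does not know about
  }
}
```

A test can then fold a recorded event list with events.reduce(applyStreamEvent, startState) and assert on each intermediate state, without rendering a component or mocking fetch.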

Risks & Mitigations

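| Risk | Mitigation |
|---|---|
| SSE (EventSource) not supported by older browsers, and EventSource cannot POST | Use fetch + ReadableStream (supported in all modern browsers) |
| Proxy/CDN buffering SSE | Send the X-Accel-Buffering: no and Cache-Control: no-cache headers |
| CORS with streaming | Ensure Access-Control-Allow-Origin includes the frontend origin |
| Connection drops mid-stream | Add retry logic, or show a "Connection lost" message if the stream ends without a completed or error event |
| TanStack Query cache invalidation | Not needed: the streaming hook keeps its own state outside TanStack Query. Because /query no longer returns JSON, the old useQueryDocument must be removed (or pointed at a separate non-streaming endpoint) rather than kept as-is |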

Files to Modify

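| File | Change |
|---|---|
| backend/app/models/query.py | Add streaming event models |
| backend/app/routers/query.py | Convert to StreamingResponse |
| frontend/src/lib/api.ts | Add queryDocumentStream() |
| frontend/src/lib/queries.tsx | Add useQueryDocumentStream() hook |
| frontend/src/pages/LTTPage.tsx | Use streaming hook |
| frontend/src/components/ExtractedQuestionsDisplay.tsx | Verify questions render while the answer is loading (likely no change) |
| frontend/src/components/ResponsePanel.tsx | Phase-aware loading states |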

Success Criteria

  • Extracted questions appear within 5 seconds of submitting a query
  • Answer appears when generation completes (no UI blocking)
  • Loading states clearly indicate which pipeline step is running
  • Error handling works (shows error message if any step fails)
  • All existing tests pass (or are updated for new behavior)
  • New tests cover streaming state transitions