# Plan: Show Extracted Questions Immediately (Streaming Query Response)

## Problem

On the LTT page, when a user submits a question like "what is power of project manager", the "Extracted Questions" section only appears after **all three LLM calls** complete:

1. **QueryDecomposer** (LLM #1) → `extracted_questions` (**available here**)
2. **RelevanceFilter** (LLM #2) → filtered chunks
3. **ResponseGeneration** (LLM #3) → final answer

The user must wait 10-30+ seconds (depending on LLM latency) before seeing the extracted sub-questions, even though they are produced in the first ~3-5 seconds.

## Root Cause

The backend `/query` endpoint is a single HTTP request/response that awaits all pipeline steps before returning the complete `QueryResponse`. The frontend's `useMutation` receives the entire response at once.

## Objective

Show `extracted_questions` to the user **as soon as the first LLM call completes**, while the remaining pipeline steps continue on the server. The user sees:

1. Extracted questions immediately (after LLM #1)
2. A loading indicator for the answer (while LLM #2 and #3 run)
3. The final answer and sources when complete

## Recommended Approach: SSE Streaming

Convert the `/query` endpoint to use **Server-Sent Events (SSE)**, a widely used pattern for streaming LLM responses. This requires changes to both backend and frontend.

### Why SSE (not WebSocket or separate endpoint)

| Approach | Pros | Cons | Verdict |
|----------|------|------|---------|
| **SSE Streaming** | Single connection, HTTP-based, simple JSON events | One-directional (server→client only); automatic reconnect only comes with native `EventSource`, which cannot POST and is not used here | ✅ **Best fit**: we only need server→client |
| **WebSocket** | Bidirectional, lower overhead | Overkill for this use case, more complex | ❌ Unnecessary complexity |
| **Separate `/decompose` endpoint** | Minimal backend change | Two HTTP round-trips, awkward UX (submit twice), harder to maintain state | ❌ Poor UX, more frontend complexity |

## Backend Changes

### 1. New Streaming Response Models

**File**: `backend/app/models/query.py`

Add a `StreamingQueryEvent` union type for SSE events. A `generating` event is included so the frontend can show the "Generating answer..." state described under ResponsePanel below:

```python
from typing import Literal, Union

from pydantic import BaseModel


class DecomposedEvent(BaseModel):
    phase: Literal["decomposed"]
    extracted_questions: list[str]


class RetrievingEvent(BaseModel):
    phase: Literal["retrieving"]


class FilteringEvent(BaseModel):
    phase: Literal["filtering"]


class GeneratingEvent(BaseModel):
    phase: Literal["generating"]


class CompletedEvent(BaseModel):
    phase: Literal["completed"]
    answer: str
    sources: list[SourceMetadata]  # SourceMetadata is the existing model in this file


class ErrorEvent(BaseModel):
    phase: Literal["error"]
    message: str


StreamingQueryEvent = Union[
    DecomposedEvent,
    RetrievingEvent,
    FilteringEvent,
    GeneratingEvent,
    CompletedEvent,
    ErrorEvent,
]
```

### 2. Streaming Query Endpoint

**File**: `backend/app/routers/query.py`

Convert `/query` to return `StreamingResponse` with `text/event-stream`:

```python
import json

from fastapi.responses import StreamingResponse


def _sse(payload: dict) -> str:
    """Serialize one event as a single SSE `data:` frame."""
    return f"data: {json.dumps(payload)}\n\n"


async def _query_stream(request: QueryRequest):
    """Async generator that yields SSE events for the query pipeline."""
    settings = get_settings()
    try:
        llm_client = LLMClient(settings)
        rag = RAGService(llm_client=llm_client, settings=settings)

        # Step 1: Decompose (LLM #1)
        decomposer = QueryDecomposer(llm_client)
        extracted_questions = await decomposer.decompose(request.question)
        yield _sse({"phase": "decomposed", "extracted_questions": extracted_questions})

        # Step 2: Retrieve (ChromaDB)
        yield _sse({"phase": "retrieving"})
        chunks = rag.retrieve(extracted_questions, n_results=settings.retrieval_n_results)
        if not chunks:
            yield _sse({"phase": "completed", "answer": NO_RESULTS_ANSWER, "sources": []})
            return

        # Step 3: Filter (LLM #2)
        yield _sse({"phase": "filtering"})
        chunks_for_filter = [(text, meta) for text, meta, _dist in chunks]
        relevance_filter = RelevanceFilter(llm_client)
        filtered = await relevance_filter.filter(
            request.question, chunks_for_filter, threshold=settings.relevance_threshold
        )
        if not filtered:
            yield _sse({"phase": "completed", "answer": NO_RESULTS_ANSWER, "sources": []})
            return

        # Step 4: Generate (LLM #3)
        yield _sse({"phase": "generating"})
        chunk_texts = [chunk for chunk, _meta in filtered]
        chunk_metadata = [meta for _chunk, meta in filtered]
        answer = await rag.generate_response(request.question, chunk_texts, chunk_metadata)
        sources = [
            SourceMetadata(
                filename=meta.get("filename", "unknown"),
                upload_date=meta.get("upload_date", ""),
                content_summary=meta.get("content_summary", ""),
                chunk_index=meta.get("chunk_index", 0),
                page_number=meta.get("page_number"),
                chunk_file_path=meta.get("chunk_file_path"),
            )
            for meta in chunk_metadata
        ]
        yield _sse({
            "phase": "completed",
            "answer": answer,
            "sources": [s.model_dump() for s in sources],
        })
    except Exception as e:
        # The HTTP status is already sent once streaming starts, so report
        # failures as an in-band error event instead of raising.
        logger.exception("Query stream failed")
        yield _sse({"phase": "error", "message": str(e)})


@router.post("/query")
async def query(request: QueryRequest):
    # Validate before streaming starts so a bad request still gets a 400.
    if not request.question or not request.question.strip():
        raise HTTPException(status_code=400, detail="Question is required")
    return StreamingResponse(
        _query_stream(request),
        media_type="text/event-stream",
        # Discourage proxy buffering (see Risks & Mitigations).
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```

### 3. Old Endpoint (Backward Compatibility)

One option is to add a new `/query/stream` endpoint and keep the existing `/query` for backward compatibility, or to version the API. For simplicity, this plan replaces `/query`, although the implementation could support both.

**Decision**: Replace `/query` with streaming. The frontend is the only consumer and will be updated together with the backend.

## Frontend Changes

### 1. New Streaming API Client

**File**: `frontend/src/lib/api.ts`

Add the event type for the stream:

```typescript
export interface QueryStreamEvent {
  phase: 'decomposed' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  extracted_questions?: string[]
  answer?: string
  sources?: SourceMetadata[]
  message?: string
}
```

**Important**: Standard `EventSource` only supports GET and cannot send a request body, so it cannot be used for this endpoint. Use `fetch()` with a `ReadableStream` to POST the question and read the SSE events:

```typescript
export const queryDocumentStream = async (
  request: QueryRequest,
  onEvent: (event: QueryStreamEvent) => void,
  signal?: AbortSignal
): Promise<void> => {
  const baseUrl = import.meta.env.VITE_API_BASE_URL ?? 'http://localhost:8000/api/v1'
  const response = await fetch(`${baseUrl}/query`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(request),
    signal, // lets the caller abort an in-flight request
  })

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${await response.text()}`)
  }
  if (!response.body) {
    throw new Error('Response has no body to stream')
  }

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    // A network chunk can end mid-line, so keep the trailing partial line buffered.
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? ''

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        onEvent(JSON.parse(line.slice(6)))
      }
    }
  }
}
```

### 2. New React Hook for Streaming

**File**: `frontend/src/lib/queries.tsx`

Add a custom hook that manages streaming state:

```typescript
import { useState, useCallback, useRef } from 'react'

interface QueryStreamState {
  extractedQuestions: string[] | null
  answer: string | null
  sources: SourceMetadata[] | null
  phase: 'idle' | 'decomposing' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  error: Error | null
}

const initialState: QueryStreamState = {
  extractedQuestions: null,
  answer: null,
  sources: null,
  phase: 'idle',
  error: null,
}

export const useQueryDocumentStream = () => {
  const [state, setState] = useState<QueryStreamState>(initialState)
  const abortRef = useRef<AbortController | null>(null)

  const mutate = useCallback(async (request: QueryRequest) => {
    // Cancel any previous request before starting a new one.
    abortRef.current?.abort()
    const controller = new AbortController()
    abortRef.current = controller
    setState({ ...initialState, phase: 'decomposing' })

    try {
      await queryDocumentStream(
        request,
        (event) => {
          switch (event.phase) {
            case 'decomposed':
              setState(prev => ({
                ...prev,
                extractedQuestions: event.extracted_questions ?? [],
                phase: 'retrieving',
              }))
              break
            case 'retrieving':
            case 'filtering':
            case 'generating':
              setState(prev => ({ ...prev, phase: event.phase }))
              break
            case 'completed':
              setState(prev => ({
                ...prev,
                answer: event.answer ?? null,
                sources: event.sources ?? [],
                phase: 'completed',
              }))
              break
            case 'error':
              setState(prev => ({
                ...prev,
                phase: 'error',
                error: new Error(event.message ?? 'Unknown error'),
              }))
              break
          }
        },
        controller.signal
      )
    } catch (err) {
      // An abort is a deliberate cancel (reset or a newer request), not an error.
      if (controller.signal.aborted) return
      setState(prev => ({
        ...prev,
        phase: 'error',
        error: err instanceof Error ? err : new Error(String(err)),
      }))
    }
  }, [])

  const reset = useCallback(() => {
    abortRef.current?.abort()
    setState(initialState)
  }, [])

  return { ...state, mutate, reset }
}
```

### 3. Update LTTPage to Use Streaming

**File**: `frontend/src/pages/LTTPage.tsx`

Replace `useQueryDocument` with `useQueryDocumentStream`:

```typescript
const queryStream = useQueryDocumentStream()

const handleQuerySubmit = (question: string) => {
  queryStream.mutate({ question })
}

// In JSX: pass queryStream.extractedQuestions, answer, sources, phase, and error
// to the existing display components.
```

### 4. Update ExtractedQuestionsDisplay

**File**: `frontend/src/components/ExtractedQuestionsDisplay.tsx`

The component should already work with the new state shape. Ensure it handles the case where `extractedQuestions` is available but `answer` is still loading:

```typescript
// Show questions as soon as they arrive, even if the answer is still loading.
// Element choices here are illustrative; keep the component's existing markup.
if (extractedQuestions && extractedQuestions.length > 0) {
  return (
    <div>
      <h3>Extracted Questions:</h3>
      <ol>
        {extractedQuestions.map((q, i) => (
          <li key={i}>{q}</li>
        ))}
      </ol>
    </div>
  )
}
```

### 5. Update ResponsePanel Loading States

**File**: `frontend/src/components/ResponsePanel.tsx`

Show different loading states based on phase:

- `phase === 'retrieving'`: "Searching documents..."
- `phase === 'filtering'`: "Filtering relevant passages..."
- `phase === 'generating'`: "Generating answer..."

The `generating` phase is set when the backend emits its `generating` event, just before LLM #3 runs.

## Implementation Order

1. **Backend streaming endpoint** (`backend/app/routers/query.py`)
   - Add `_query_stream()` generator
   - Update `/query` to return `StreamingResponse`
   - Test with `curl` to verify SSE format
2. **Frontend streaming API** (`frontend/src/lib/api.ts`)
   - Add `queryDocumentStream()` function
   - Add `QueryStreamEvent` type
3. **Frontend streaming hook** (`frontend/src/lib/queries.tsx`)
   - Add `useQueryDocumentStream()` hook
4. **Update LTTPage** (`frontend/src/pages/LTTPage.tsx`)
   - Replace `useQueryDocument` with `useQueryDocumentStream`
   - Wire up new state to components
5. **Update ResponsePanel** (`frontend/src/components/ResponsePanel.tsx`)
   - Add phase-aware loading messages
6. **Tests**
   - Mock `fetch` with `ReadableStream` for testing
   - Test each phase transition
   - Test error handling

## Testing Strategy

### Backend Tests

- Test SSE event format matches expected JSON structure
- Test each phase event is emitted in correct order
- Test error events on failures
- Test early exit (no chunks) still emits `decomposed`, `retrieving`, then `completed`

### Frontend Tests

- Mock `fetch` to return a `ReadableStream` with SSE data
- Test `useQueryDocumentStream` state transitions:
  - idle → decomposing → retrieving (extractedQuestions set) → filtering → generating → completed (answer set)
- Test error handling: stream emits error event → `state.error` set
- Test abort: calling `reset()` aborts the in-flight request

## Risks & Mitigations

| Risk | Mitigation |
|------|-----------|
| SSE (`EventSource`) not supported by older browsers | Use fetch + ReadableStream (works in all modern browsers) |
| Proxy/CDN buffering SSE | Set `X-Accel-Buffering: no` header, use `Cache-Control: no-cache` |
| CORS with streaming | Ensure `Access-Control-Allow-Origin` includes frontend origin |
| Connection drops mid-stream | Add retry logic or show "Connection lost" message |
| TanStack Query cache invalidation | Not needed: streaming bypasses the cache. The old `useQueryDocument` can only stay if a non-streaming endpoint is retained |

## Files to Modify

| File | Change |
|------|--------|
| `backend/app/models/query.py` | Add streaming event models |
| `backend/app/routers/query.py` | Convert to StreamingResponse |
| `frontend/src/lib/api.ts` | Add `queryDocumentStream()` |
| `frontend/src/lib/queries.tsx` | Add `useQueryDocumentStream()` hook |
| `frontend/src/pages/LTTPage.tsx` | Use streaming hook |
| `frontend/src/components/ExtractedQuestionsDisplay.tsx` | Verify it renders before the answer arrives |
| `frontend/src/components/ResponsePanel.tsx` | Phase-aware loading states |

## Success Criteria

- [ ] Extracted questions appear within 5 seconds of submitting a query
- [ ] Answer appears when generation completes (no UI blocking)
- [ ] Loading states clearly indicate which pipeline step is running
- [ ] Error handling works (shows error message if any step fails)
- [ ] All existing tests pass (or are updated for new behavior)
- [ ] New tests cover streaming state transitions
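
## Appendix: Sketch of an SSE Format Check

The backend items under Testing Strategy (event format, phase order, and the early-exit path) could be checked with a small frame parser. The following is a minimal sketch, not the project's test code: `format_sse`, `parse_sse`, and the sample payloads are hypothetical names and made-up data, and the parser assumes the single-line `data:` frames that this plan emits.

```python
import json
from typing import Iterable, Iterator


def format_sse(payload: dict) -> str:
    """Serialize one event as a single-line SSE `data:` frame (hypothetical helper)."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield decoded events from text chunks that may split frames arbitrarily."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A blank line terminates each frame; any partial frame stays buffered.
        while "\n\n" in buffer:
            frame, buffer = buffer.split("\n\n", 1)
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    yield json.loads(line[len("data: "):])


# Sample stream for the early-exit path (no chunks retrieved); the data is made up.
frames = "".join([
    format_sse({"phase": "decomposed", "extracted_questions": ["q1", "q2"]}),
    format_sse({"phase": "retrieving"}),
    format_sse({"phase": "completed", "answer": "No results.", "sources": []}),
])

# Re-chunk at awkward boundaries to mimic network fragmentation.
chunks = [frames[i:i + 7] for i in range(0, len(frames), 7)]
events = list(parse_sse(chunks))
phases = [event["phase"] for event in events]
print(phases)  # ['decomposed', 'retrieving', 'completed']
```

In a real test, the chunks would come from the response body of a test client call to `/query` rather than from `format_sse`; the ordering and payload assertions would stay the same.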