Plan: Show Extracted Questions Immediately (Streaming Query Response)
Problem
On the LTT page, when a user submits a question like "what is power of project manager", the "Extracted Questions" section only appears after all three LLM calls complete:
- QueryDecomposer (LLM #1) → extracted_questions (available here)
- RelevanceFilter (LLM #2) → filtered chunks
- ResponseGeneration (LLM #3) → final answer
The user must wait 10-30+ seconds (depending on LLM latency) before seeing the extracted sub-questions, even though they are produced in the first ~3-5 seconds.
Root Cause
The backend /query endpoint is a single synchronous HTTP call that awaits all pipeline steps before returning the complete QueryResponse. The frontend's useMutation receives the entire response at once.
Objective
Show extracted_questions to the user as soon as the first LLM call completes, while the remaining pipeline steps continue in the background. The user sees:
- Extracted questions immediately (after LLM #1)
- A loading indicator for the answer (while LLM #2 and #3 run)
- The final answer and sources when complete
Recommended Approach: SSE Streaming
Convert the /query endpoint to use Server-Sent Events (SSE), the common pattern for streaming LLM responses to a browser. This requires changes to both the backend and the frontend.
Why SSE (not WebSocket or separate endpoint)
| Approach | Pros | Cons | Verdict |
|---|---|---|---|
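| SSE Streaming | Single HTTP connection, simple text format (data: lines carrying JSON) | One-directional (server→client only); the browser's EventSource only supports GET, so a POST needs fetch + ReadableStream and gives up EventSource's automatic reconnect | ✅ Best fit: we only need server→client |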
| WebSocket | Bidirectional, lower overhead | Overkill for this use case, more complex | ❌ Unnecessary complexity |
| Separate /decompose endpoint | Minimal backend change | Two HTTP round-trips, awkward UX (submit twice), harder to maintain state | ❌ Poor UX, more frontend complexity |
Backend Changes
1. New Streaming Response Models
File: backend/app/models/query.py
Add a StreamingQueryEvent union type for SSE events:
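```python
from typing import Literal, Union

from pydantic import BaseModel

# SourceMetadata is the existing source model already defined in this module.
# Defaulting `phase` lets events be built as e.g. DecomposedEvent(extracted_questions=qs).

class DecomposedEvent(BaseModel):
    phase: Literal["decomposed"] = "decomposed"
    extracted_questions: list[str]

class RetrievingEvent(BaseModel):
    phase: Literal["retrieving"] = "retrieving"

class FilteringEvent(BaseModel):
    phase: Literal["filtering"] = "filtering"

class GeneratingEvent(BaseModel):
    phase: Literal["generating"] = "generating"

class CompletedEvent(BaseModel):
    phase: Literal["completed"] = "completed"
    answer: str
    sources: list[SourceMetadata]

class ErrorEvent(BaseModel):
    phase: Literal["error"] = "error"
    message: str

# Every event the /query stream can emit; `phase` tells them apart.
StreamingQueryEvent = Union[
    DecomposedEvent, RetrievingEvent, FilteringEvent, GeneratingEvent,
    CompletedEvent, ErrorEvent,
]
```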
2. Streaming Query Endpoint
File: backend/app/routers/query.py
Convert /query to return StreamingResponse with text/event-stream:
```python
import json

from fastapi import HTTPException
from fastapi.responses import StreamingResponse

# QueryRequest, SourceMetadata, LLMClient, RAGService, QueryDecomposer,
# RelevanceFilter, get_settings, NO_RESULTS_ANSWER, logger and router are the
# names this module already uses for the current /query endpoint.


def _sse(payload: dict) -> str:
    """Format one SSE event: a single `data:` line followed by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


async def _query_stream(request: QueryRequest):
    """Async generator that yields SSE events for the query pipeline."""
    settings = get_settings()
    try:
        llm_client = LLMClient(settings)
        rag = RAGService(llm_client=llm_client, settings=settings)

        # Step 1: Decompose (LLM #1); the client can render these immediately
        decomposer = QueryDecomposer(llm_client)
        extracted_questions = await decomposer.decompose(request.question)
        yield _sse({"phase": "decomposed", "extracted_questions": extracted_questions})

        # Step 2: Retrieve (ChromaDB). rag.retrieve is a synchronous call,
        # so it blocks the event loop while it runs.
        yield _sse({"phase": "retrieving"})
        chunks = rag.retrieve(extracted_questions, n_results=settings.retrieval_n_results)
        if not chunks:
            yield _sse({"phase": "completed", "answer": NO_RESULTS_ANSWER, "sources": []})
            return

        # Step 3: Filter (LLM #2)
        yield _sse({"phase": "filtering"})
        chunks_for_filter = [(text, meta) for text, meta, _dist in chunks]
        relevance_filter = RelevanceFilter(llm_client)
        filtered = await relevance_filter.filter(
            request.question, chunks_for_filter, threshold=settings.relevance_threshold
        )
        if not filtered:
            yield _sse({"phase": "completed", "answer": NO_RESULTS_ANSWER, "sources": []})
            return

        # Step 4: Generate (LLM #3); announced so the UI can say "Generating answer..."
        yield _sse({"phase": "generating"})
        chunk_texts = [chunk for chunk, _meta in filtered]
        chunk_metadata = [meta for _chunk, meta in filtered]
        answer = await rag.generate_response(request.question, chunk_texts, chunk_metadata)
        sources = [
            SourceMetadata(
                filename=meta.get("filename", "unknown"),
                upload_date=meta.get("upload_date", ""),
                content_summary=meta.get("content_summary", ""),
                chunk_index=meta.get("chunk_index", 0),
                page_number=meta.get("page_number"),
                chunk_file_path=meta.get("chunk_file_path"),
            )
            for meta in chunk_metadata
        ]
        yield _sse({
            "phase": "completed",
            "answer": answer,
            "sources": [s.model_dump() for s in sources],
        })
    except Exception as e:
        # The 200 status line is already sent, so failures are reported in-band
        logger.exception("Query stream failed")
        yield _sse({"phase": "error", "message": str(e)})


@router.post("/query")
async def query(request: QueryRequest):
    # Validate before streaming starts, while a 400 status can still be returned
    if not request.question or not request.question.strip():
        raise HTTPException(status_code=400, detail="Question is required")
    return StreamingResponse(
        _query_stream(request),
        media_type="text/event-stream",
        # Ask proxies (e.g. nginx) and browsers not to buffer or cache the stream
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```
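Because a network read can end in the middle of a data: line, any client of this endpoint has to keep the trailing partial line buffered until the next chunk completes it; the frontend's fetch loop does this with its buffer variable. The same logic, sketched in Python so a backend test could reuse it when reading the response incrementally. SSEDataParser is a hypothetical helper name, and it assumes every event is a single data: line, which holds for _query_stream because json.dumps emits no newlines by default.

```python
import json
from typing import Any


class SSEDataParser:
    """Incrementally extract JSON payloads from `data:` lines of an SSE stream.

    Network chunks can end mid-line, so the trailing partial line is kept in a
    buffer until the next chunk completes it. Assumes single-line `data:` fields.
    """

    PREFIX = "data: "

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> list[dict[str, Any]]:
        """Append a decoded chunk and return the events it completed, in order."""
        self._buffer += chunk
        # The last element is an unfinished line ("" if the chunk ended on "\n").
        *lines, self._buffer = self._buffer.split("\n")
        events = []
        for line in lines:
            line = line.rstrip("\r")  # tolerate CRLF line endings
            # Blank separator lines and other SSE fields are ignored.
            if line.startswith(self.PREFIX):
                events.append(json.loads(line[len(self.PREFIX):]))
        return events


# Usage: events arrive split across chunk boundaries.
parser = SSEDataParser()
chunks = [
    'data: {"phase": "decom',
    'posed", "extracted_questions": ["q1"]}\n\ndata: {"phase": "retr',
    'ieving"}\n\n',
]
events = [event for chunk in chunks for event in parser.feed(chunk)]
print([e["phase"] for e in events])  # ['decomposed', 'retrieving']
```

Returning a list from feed (rather than yielding lazily) keeps the buffer update from depending on whether the caller iterates the result.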
3. Keep Old Endpoint (Backward Compatibility)
One option is to add a new /query/stream endpoint and keep the existing /query for backward compatibility, or to version the API. For simplicity, this plan replaces /query, although the implementation could support both.
Decision: Replace /query with streaming. The frontend is the only consumer and will be updated together.
Frontend Changes
1. New Streaming API Client
File: frontend/src/lib/api.ts
Add a streaming query function:
```ts
export interface QueryStreamEvent {
  phase: 'decomposed' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  extracted_questions?: string[]
  answer?: string
  sources?: SourceMetadata[]
  message?: string
}
```
Important: the standard EventSource API only supports GET requests and cannot send a request body, so it cannot POST the question. Instead, use fetch() to POST the question and parse the SSE events from the response body's ReadableStream. An optional AbortSignal lets the caller cancel an in-flight request:
```ts
export const queryDocumentStream = async (
  request: QueryRequest,
  onEvent: (event: QueryStreamEvent) => void,
  signal?: AbortSignal
): Promise<void> => {
  const baseUrl = import.meta.env.VITE_API_BASE_URL ?? 'http://localhost:8000/api/v1'
  const response = await fetch(`${baseUrl}/query`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', Accept: 'text/event-stream' },
    body: JSON.stringify(request),
    signal, // aborting rejects the pending fetch() or reader.read()
  })

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${await response.text()}`)
  }
  if (!response.body) {
    throw new Error('Response has no readable body')
  }

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // A chunk can end mid-line: keep the trailing partial line for the next read
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? ''
    for (const rawLine of lines) {
      const line = rawLine.replace(/\r$/, '') // tolerate CRLF line endings
      // Each event is one "data: <json>" line; blank separator lines are skipped
      if (line.startsWith('data: ')) {
        onEvent(JSON.parse(line.slice('data: '.length)) as QueryStreamEvent)
      }
    }
  }
}
```
2. New React Hook for Streaming
File: frontend/src/lib/queries.tsx
Add a custom hook that manages streaming state:
```tsx
import { useState, useCallback, useRef } from 'react'
// Assumes api.ts exports these types alongside queryDocumentStream
import { queryDocumentStream, type QueryRequest, type SourceMetadata } from './api'

interface QueryStreamState {
  extractedQuestions: string[] | null
  answer: string | null
  sources: SourceMetadata[] | null
  phase: 'idle' | 'decomposing' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  error: Error | null
}

const initialState: QueryStreamState = {
  extractedQuestions: null,
  answer: null,
  sources: null,
  phase: 'idle',
  error: null,
}

export const useQueryDocumentStream = () => {
  const [state, setState] = useState<QueryStreamState>(initialState)
  const abortRef = useRef<AbortController | null>(null)

  const mutate = useCallback(async (request: QueryRequest) => {
    // Cancel a request that is still streaming before starting a new one
    abortRef.current?.abort()
    const controller = new AbortController()
    abortRef.current = controller
    setState({ ...initialState, phase: 'decomposing' })

    let finished = false // set once a 'completed' or 'error' event arrives
    try {
      await queryDocumentStream(
        request,
        (event) => {
          if (controller.signal.aborted) return // ignore events after reset()
          switch (event.phase) {
            case 'decomposed':
              // Show the sub-questions right away; retrieval runs next
              setState(prev => ({
                ...prev,
                extractedQuestions: event.extracted_questions ?? [],
                phase: 'retrieving',
              }))
              break
            case 'retrieving':
              setState(prev => ({ ...prev, phase: 'retrieving' }))
              break
            case 'filtering':
              setState(prev => ({ ...prev, phase: 'filtering' }))
              break
            case 'generating':
              setState(prev => ({ ...prev, phase: 'generating' }))
              break
            case 'completed':
              finished = true
              setState(prev => ({
                ...prev,
                answer: event.answer ?? '',
                sources: event.sources ?? [],
                phase: 'completed',
              }))
              break
            case 'error':
              finished = true
              setState(prev => ({
                ...prev,
                phase: 'error',
                error: new Error(event.message ?? 'Query failed'),
              }))
              break
          }
        },
        controller.signal
      )
      // The stream closed without a terminal event: treat it as a dropped connection
      if (!finished && !controller.signal.aborted) {
        setState(prev => ({ ...prev, phase: 'error', error: new Error('Connection lost') }))
      }
    } catch (err) {
      // reset() or a newer mutate() aborted this request; nothing to report
      if (controller.signal.aborted) return
      setState(prev => ({
        ...prev,
        phase: 'error',
        error: err instanceof Error ? err : new Error(String(err)),
      }))
    }
  }, [])

  const reset = useCallback(() => {
    abortRef.current?.abort()
    setState(initialState)
  }, [])

  return { ...state, mutate, reset }
}
```
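The hook's switch is effectively a reducer from stream events to UI state, and modeling it as a pure function makes the transitions easy to check without rendering React. A Python sketch under these assumptions: StreamState, apply_event and run_stream are hypothetical names, the fields mirror QueryStreamState, and treating a stream that closes without a completed or error event as "Connection lost" is an assumed policy following the "Connection drops mid-stream" row under Risks & Mitigations.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional

TERMINAL_PHASES = {"completed", "error"}


@dataclass(frozen=True)
class StreamState:
    """Mirror of QueryStreamState (sources kept as plain dicts)."""
    phase: str = "idle"
    extracted_questions: Optional[list[str]] = None
    answer: Optional[str] = None
    sources: Optional[list[dict[str, Any]]] = None
    error: Optional[str] = None


def apply_event(state: StreamState, event: dict[str, Any]) -> StreamState:
    """Return the next state for one SSE event, like the hook's switch."""
    phase = event.get("phase")
    if phase == "decomposed":
        # Questions become visible now; retrieval is the next pipeline step.
        return replace(state, extracted_questions=event["extracted_questions"], phase="retrieving")
    if phase in ("retrieving", "filtering", "generating"):
        return replace(state, phase=phase)
    if phase == "completed":
        return replace(state, answer=event["answer"], sources=event["sources"], phase="completed")
    if phase == "error":
        return replace(state, phase="error", error=event["message"])
    return state  # unknown phases are ignored


def run_stream(events: list[dict[str, Any]]) -> StreamState:
    """Fold a whole stream into the final state, starting from 'decomposing'."""
    state = StreamState(phase="decomposing")
    for event in events:
        state = apply_event(state, event)
    if state.phase not in TERMINAL_PHASES:
        # Stream closed without completed/error: assume the connection dropped.
        state = replace(state, phase="error", error="Connection lost")
    return state


happy = run_stream([
    {"phase": "decomposed", "extracted_questions": ["question 1", "question 2"]},
    {"phase": "retrieving"},
    {"phase": "filtering"},
    {"phase": "completed", "answer": "answer text", "sources": []},
])
print(happy.phase, len(happy.extracted_questions))  # completed 2
```

Each case here corresponds to one frontend test of useQueryDocumentStream: feed the same event list through a mocked stream and compare the hook's final state.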
3. Update LTTPage to Use Streaming
File: frontend/src/pages/LTTPage.tsx
Replace useQueryDocument with useQueryDocumentStream, and pass the current phase to ResponsePanel for its phase-aware loading messages:
```tsx
const queryStream = useQueryDocumentStream()

const handleQuerySubmit = (question: string) => {
  queryStream.mutate({ question })
}

// In JSX:
<ExtractedQuestionsDisplay
  extractedQuestions={queryStream.extractedQuestions}
  isLoading={queryStream.phase === 'decomposing'}
/>
<ResponsePanel
  answer={queryStream.answer}
  sources={queryStream.sources}
  isLoading={
    queryStream.phase === 'retrieving' ||
    queryStream.phase === 'filtering' ||
    queryStream.phase === 'generating'
  }
  extractedQuestions={queryStream.extractedQuestions}
  phase={queryStream.phase}
/>
```
4. Update ExtractedQuestionsDisplay
File: frontend/src/components/ExtractedQuestionsDisplay.tsx
The component should already work with the new state shape. Ensure it handles the case where extractedQuestions is available but answer is still loading:
```tsx
// Show questions as soon as they arrive, even if the answer is still loading
if (extractedQuestions && extractedQuestions.length > 0) {
  return (
    <div>
      <h3>Extracted Questions:</h3>
      <ol>
        {extractedQuestions.map((q, i) => <li key={i}>{q}</li>)}
      </ol>
    </div>
  )
}
```
5. Update ResponsePanel Loading States
File: frontend/src/components/ResponsePanel.tsx
Show different loading states based on phase:
- phase === 'retrieving': "Searching documents..."
- phase === 'filtering': "Filtering relevant passages..."
- phase === 'generating': "Generating answer..."
Implementation Order
1. Backend streaming endpoint (backend/app/routers/query.py)
   - Add _query_stream() generator
   - Update /query to return StreamingResponse
   - Test with curl to verify SSE format
2. Frontend streaming API (frontend/src/lib/api.ts)
   - Add queryDocumentStream() function
   - Add QueryStreamEvent type
3. Frontend streaming hook (frontend/src/lib/queries.tsx)
   - Add useQueryDocumentStream() hook
4. Update LTTPage (frontend/src/pages/LTTPage.tsx)
   - Replace useQueryDocument with useQueryDocumentStream
   - Wire up new state to components
5. Update ResponsePanel (frontend/src/components/ResponsePanel.tsx)
   - Add phase-aware loading messages
6. Tests
   - Mock fetch with ReadableStream for testing
   - Test each phase transition
   - Test error handling
Testing Strategy
Backend Tests
- Test SSE event format matches expected JSON structure
- Test each phase event is emitted in correct order
- Test error events on failures
- Test early exit (no chunks) still emits decomposed then completed
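The ordering checks above can share one helper that parses a complete response body and validates the phase sequence. A stdlib-only sketch; parse_phases and check_phase_order are hypothetical names. It treats non-terminal phases as a prefix of the pipeline order, so early exits, errors, and streams with or without a generating event all pass, while skipped or reordered phases fail.

```python
import json

# Non-terminal phases in the order _query_stream emits them.
ORDER = ["decomposed", "retrieving", "filtering", "generating"]
TERMINAL = {"completed", "error"}


def parse_phases(body: str) -> list[str]:
    """Extract the phase of every `data:` event in a complete SSE body."""
    return [
        json.loads(line[len("data: "):])["phase"]
        for line in body.splitlines()
        if line.startswith("data: ")
    ]


def check_phase_order(phases: list[str]) -> None:
    """Raise AssertionError unless `phases` is a valid event sequence.

    Non-terminal phases must be a prefix of ORDER (early exits and errors cut
    the pipeline short), followed by exactly one terminal event. A completed
    stream has always decomposed and retrieved first.
    """
    if not phases:
        raise AssertionError("stream emitted no events")
    *progress, last = phases
    if last not in TERMINAL:
        raise AssertionError(f"stream must end with completed or error, got {last!r}")
    if progress != ORDER[: len(progress)]:
        raise AssertionError(f"unexpected phase sequence: {phases}")
    if last == "completed" and len(progress) < 2:
        raise AssertionError(f"completed before retrieval: {phases}")


# Usage: an early exit (no chunks retrieved) is a valid stream.
body = (
    'data: {"phase": "decomposed", "extracted_questions": ["q"]}\n\n'
    'data: {"phase": "retrieving"}\n\n'
    'data: {"phase": "completed", "answer": "none", "sources": []}\n\n'
)
check_phase_order(parse_phases(body))  # passes silently
```

With FastAPI's TestClient, the body of a POST to the query endpoint is available as response.text, so a test could call check_phase_order(parse_phases(response.text)) after stubbing out the LLM and ChromaDB calls.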
Frontend Tests
- Mock fetch to return a ReadableStream with SSE data
- Test useQueryDocumentStream state transitions:
  - idle → decomposing → decomposed (extractedQuestions set) → retrieving → filtering → completed (answer set)
- Test error handling: stream emits error event → state.error set
- Test abort: calling reset() aborts in-flight request
Risks & Mitigations
| Risk | Mitigation |
|---|---|
| SSE not supported by older browsers | Use fetch + ReadableStream (works in all modern browsers) |
| Proxy/CDN buffering SSE | Set X-Accel-Buffering: no header, use Cache-Control: no-cache |
| CORS with streaming | Ensure Access-Control-Allow-Origin includes frontend origin |
| Connection drops mid-stream | Add retry logic or show "Connection lost" message |
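| TanStack Query cache invalidation | Not needed: the streaming hook bypasses the TanStack Query cache. Because /query now streams, the old useQueryDocument hook can no longer parse its response; keep it only if a separate non-streaming endpoint is kept for it |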
Files to Modify
| File | Change |
|---|---|
| backend/app/models/query.py | Add streaming event models |
| backend/app/routers/query.py | Convert to StreamingResponse |
| frontend/src/lib/api.ts | Add queryDocumentStream() |
| frontend/src/lib/queries.tsx | Add useQueryDocumentStream() hook |
| frontend/src/pages/LTTPage.tsx | Use streaming hook |
| frontend/src/components/ResponsePanel.tsx | Phase-aware loading states |
Success Criteria
- Extracted questions appear within 5 seconds of submitting a query
- Answer appears when generation completes (no UI blocking)
- Loading states clearly indicate which pipeline step is running
- Error handling works (shows error message if any step fails)
- All existing tests pass (or are updated for new behavior)
- New tests cover streaming state transitions