# Plan: Show Extracted Questions Immediately (Streaming Query Response)

## Problem

On the LTT page, when a user submits a question like "what is power of project manager", the "Extracted Questions" section only appears after **all three LLM calls** complete:

1. **QueryDecomposer** (LLM #1) → `extracted_questions` (**available here**)
2. **RelevanceFilter** (LLM #2) → filtered chunks
3. **ResponseGeneration** (LLM #3) → final answer

The user must wait 10-30+ seconds (depending on LLM latency) before seeing the extracted sub-questions, even though they are produced in the first ~3-5 seconds.

## Root Cause

The backend `/query` endpoint is a single synchronous HTTP call that awaits all pipeline steps before returning the complete `QueryResponse`. The frontend's `useMutation` receives the entire response at once.

## Objective

Show `extracted_questions` to the user **as soon as the first LLM call completes**, while the remaining pipeline steps continue in the background. The user sees:

1. Extracted questions immediately (after LLM #1)
2. A loading indicator for the answer (while LLM #2 and #3 run)
3. The final answer and sources when complete

## Recommended Approach: SSE Streaming

Convert the `/query` endpoint to use **Server-Sent Events (SSE)**, a widely used pattern for streaming LLM responses. This requires changes to both backend and frontend.

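To make the wire format concrete, here is a minimal sketch of how one pipeline event could be framed as an SSE message and read back. The helper names `format_sse` and `parse_sse` are hypothetical and are not part of the plan; the only assumption taken from the plan is that each event is a single JSON object sent on a `data:` line and terminated by a blank line.

```python
import json


def format_sse(payload: dict) -> str:
    """Serialize one event as an SSE `data:` frame terminated by a blank line."""
    # json.dumps escapes newlines inside strings, so the payload stays on one line.
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(frame: str) -> dict:
    """Inverse of format_sse for a single-line `data:` frame."""
    line = frame.strip()
    if not line.startswith("data: "):
        raise ValueError(f"not a data frame: {frame!r}")
    return json.loads(line[len("data: "):])


frame = format_sse({"phase": "decomposed", "extracted_questions": ["What is X?"]})
print(repr(frame))
```

Because the JSON encoder escapes embedded newlines, an answer containing line breaks still fits on one `data:` line, which keeps client-side parsing simple.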
### Why SSE (not WebSocket or separate endpoint)

| Approach | Pros | Cons | Verdict |
|----------|------|------|---------|
| **SSE Streaming** | Single connection, HTTP-based, simple JSON lines, automatic reconnect (only when consumed through the browser `EventSource` API, which this plan does not use) | One-directional (server→client only) | ✅ **Best fit**: we only need server→client |
| **WebSocket** | Bidirectional, lower overhead | Overkill for this use case, more complex | ❌ Unnecessary complexity |
| **Separate `/decompose` endpoint** | Minimal backend change | Two HTTP round-trips, awkward UX (submit twice), harder to maintain state | ❌ Poor UX, more frontend complexity |

## Backend Changes

### 1. New Streaming Response Models

**File**: `backend/app/models/query.py`

Add a `StreamingQueryEvent` union type for SSE events:

```python
from pydantic import BaseModel
from typing import Literal, Union

class DecomposedEvent(BaseModel):
    phase: Literal["decomposed"]
    extracted_questions: list[str]

class RetrievingEvent(BaseModel):
    phase: Literal["retrieving"]

class FilteringEvent(BaseModel):
    phase: Literal["filtering"]

class CompletedEvent(BaseModel):
    phase: Literal["completed"]
    answer: str
    sources: list[SourceMetadata]

class ErrorEvent(BaseModel):
    phase: Literal["error"]
    message: str

StreamingQueryEvent = Union[
    DecomposedEvent, RetrievingEvent, FilteringEvent,
    CompletedEvent, ErrorEvent
]
```

### 2. Streaming Query Endpoint

**File**: `backend/app/routers/query.py`

Convert `/query` to return `StreamingResponse` with `text/event-stream`:

```python
from fastapi.responses import StreamingResponse
import json

async def _query_stream(request: QueryRequest):
    """Generator that yields SSE events for the query pipeline."""
    settings = get_settings()

    try:
        llm_client = LLMClient(settings)
        rag = RAGService(llm_client=llm_client, settings=settings)

        # Step 1: Decompose (LLM #1)
        decomposer = QueryDecomposer(llm_client)
        extracted_questions = await decomposer.decompose(request.question)
        yield f"data: {json.dumps({'phase': 'decomposed', 'extracted_questions': extracted_questions})}\n\n"

        # Step 2: Retrieve (ChromaDB)
        yield f"data: {json.dumps({'phase': 'retrieving'})}\n\n"
        chunks = rag.retrieve(extracted_questions, n_results=settings.retrieval_n_results)

        if not chunks:
            yield f"data: {json.dumps({'phase': 'completed', 'answer': NO_RESULTS_ANSWER, 'sources': []})}\n\n"
            return

        # Step 3: Filter (LLM #2)
        yield f"data: {json.dumps({'phase': 'filtering'})}\n\n"
        chunks_for_filter = [(text, meta) for text, meta, _dist in chunks]
        relevance_filter = RelevanceFilter(llm_client)
        filtered = await relevance_filter.filter(
            request.question, chunks_for_filter, threshold=settings.relevance_threshold
        )

        if not filtered:
            yield f"data: {json.dumps({'phase': 'completed', 'answer': NO_RESULTS_ANSWER, 'sources': []})}\n\n"
            return

        # Step 4: Generate (LLM #3)
        chunk_texts = [chunk for chunk, _meta in filtered]
        chunk_metadata = [meta for _chunk, meta in filtered]
        answer = await rag.generate_response(request.question, chunk_texts, chunk_metadata)

        sources = [
            SourceMetadata(
                filename=meta.get("filename", "unknown"),
                upload_date=meta.get("upload_date", ""),
                content_summary=meta.get("content_summary", ""),
                chunk_index=meta.get("chunk_index", 0),
                page_number=meta.get("page_number"),
                chunk_file_path=meta.get("chunk_file_path"),
            )
            for meta in chunk_metadata
        ]

        yield f"data: {json.dumps({'phase': 'completed', 'answer': answer, 'sources': [s.model_dump() for s in sources]})}\n\n"

    except Exception as e:
        logger.error("Query stream failed: %s", str(e))
        yield f"data: {json.dumps({'phase': 'error', 'message': str(e)})}\n\n"

@router.post("/query")
async def query(request: QueryRequest):
    if not request.question or not request.question.strip():
        raise HTTPException(status_code=400, detail="Question is required")

    return StreamingResponse(
        _query_stream(request),
        media_type="text/event-stream",
    )
```

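The key idea in the endpoint is that an async generator can hand an event to the client between two awaited steps. The following standalone sketch illustrates that ordering using only the standard library. `fake_decompose` and `fake_answer` are hypothetical stand-ins for the real pipeline steps, and no FastAPI or LLM client is involved.

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_decompose(question: str) -> list[str]:
    await asyncio.sleep(0)  # stands in for LLM #1
    return [question.strip()]


async def fake_answer(questions: list[str]) -> str:
    await asyncio.sleep(0)  # stands in for retrieval, LLM #2 and LLM #3
    return f"answered {len(questions)} question(s)"


async def query_stream(question: str) -> AsyncIterator[str]:
    questions = await fake_decompose(question)
    # Yielded before the slower steps start, so the client can render it early.
    yield f"data: {json.dumps({'phase': 'decomposed', 'extracted_questions': questions})}\n\n"
    answer = await fake_answer(questions)
    yield f"data: {json.dumps({'phase': 'completed', 'answer': answer, 'sources': []})}\n\n"


async def collect(question: str) -> list[str]:
    return [frame async for frame in query_stream(question)]


frames = asyncio.run(collect("what is power of project manager"))
phases = [json.loads(f[len("data: "):])["phase"] for f in frames]
print(phases)  # ['decomposed', 'completed']
```

In the real endpoint the consumer is the HTTP response rather than `collect`, so each frame is written to the socket as soon as the generator yields it.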
### 3. Keep Old Endpoint (Backward Compatibility)

One option is to add a new `/query/stream` endpoint and keep the existing `/query` for backward compatibility, or to version the API. This plan takes the simpler route of replacing `/query`, although the implementation could support both.

**Decision**: Replace `/query` with streaming. The frontend is the only consumer and will be updated together with the backend.

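If a non-streaming `/query` were kept alongside the streaming one, it could reuse the same event generator and fold the events into a single response. The sketch below shows that folding step on plain dictionaries. `collect_final_response` and `demo_events` are hypothetical names, and the three-field result shape is an assumption about what the existing `QueryResponse` contains.

```python
import asyncio
from typing import AsyncIterator


async def collect_final_response(events: AsyncIterator[dict]) -> dict:
    """Fold phase events into the shape of a single non-streaming response."""
    result = {"extracted_questions": [], "answer": None, "sources": []}
    async for event in events:
        phase = event.get("phase")
        if phase == "decomposed":
            result["extracted_questions"] = event["extracted_questions"]
        elif phase == "completed":
            result["answer"] = event["answer"]
            result["sources"] = event["sources"]
            return result
        elif phase == "error":
            raise RuntimeError(event["message"])
    # The generator finished without a terminal event.
    raise RuntimeError("stream ended before a completed event")


async def demo_events() -> AsyncIterator[dict]:
    # Hypothetical event sequence for illustration only.
    yield {"phase": "decomposed", "extracted_questions": ["q1"]}
    yield {"phase": "retrieving"}
    yield {"phase": "filtering"}
    yield {"phase": "completed", "answer": "a", "sources": []}


response = asyncio.run(collect_final_response(demo_events()))
print(response)
```

This keeps one implementation of the pipeline and makes the non-streaming path a thin adapter over it.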
## Frontend Changes

### 1. New Streaming API Client

**File**: `frontend/src/lib/api.ts`

Add a streaming query function. A first attempt based on `EventSource` looks like this, but it cannot work as written:

```typescript
export interface QueryStreamEvent {
  phase: 'decomposed' | 'retrieving' | 'filtering' | 'completed' | 'error'
  extracted_questions?: string[]
  answer?: string
  sources?: SourceMetadata[]
  message?: string
}

export const queryDocumentStream = (
  request: QueryRequest,
  onEvent: (event: QueryStreamEvent) => void,
  onError: (error: Error) => void,
  onComplete: () => void
): (() => void) => {
  const baseUrl = import.meta.env.VITE_API_BASE_URL ?? 'http://localhost:8000/api/v1'
  const url = `${baseUrl}/query`

  const eventSource = new EventSource(url, {
    // EventSource doesn't support POST with body natively
    // We need to use fetch + ReadableStream or a polyfill
  })

  // ... implementation using fetch + ReadableStream

  return () => eventSource.close() // cleanup function
}
```

**Important**: Standard `EventSource` only supports GET, not POST. We need to use `fetch()` with `ReadableStream` to POST the question and read SSE events. The version below also accepts an optional `AbortSignal` so that the hook can actually cancel an in-flight request.

```typescript
export const queryDocumentStream = async (
  request: QueryRequest,
  onEvent: (event: QueryStreamEvent) => void,
  signal?: AbortSignal
): Promise<void> => {
  const baseUrl = import.meta.env.VITE_API_BASE_URL ?? 'http://localhost:8000/api/v1'
  const response = await fetch(`${baseUrl}/query`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(request),
    signal, // lets the caller cancel the request and the stream read
  })

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${await response.text()}`)
  }
  if (!response.body) {
    throw new Error('Response has no body to stream')
  }

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() || '' // keep the trailing partial line for the next chunk

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        // Optional sentinel; the backend sketch in this plan does not send it
        if (data === '[DONE]') return
        onEvent(JSON.parse(data))
      }
    }
  }
}
```

### 2. New React Hook for Streaming

**File**: `frontend/src/lib/queries.tsx`

Add a custom hook that manages streaming state:

```typescript
import { useState, useCallback, useRef } from 'react'

interface QueryStreamState {
  extractedQuestions: string[] | null
  answer: string | null
  sources: SourceMetadata[] | null
  phase: 'idle' | 'decomposing' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  error: Error | null
}

export const useQueryDocumentStream = () => {
  const [state, setState] = useState<QueryStreamState>({
    extractedQuestions: null,
    answer: null,
    sources: null,
    phase: 'idle',
    error: null,
  })
  const abortRef = useRef<AbortController | null>(null)

  const mutate = useCallback(async (request: QueryRequest) => {
    setState({
      extractedQuestions: null,
      answer: null,
      sources: null,
      phase: 'decomposing',
      error: null,
    })

    // Cancel any request that is still running before starting a new one
    abortRef.current?.abort()
    const controller = new AbortController()
    abortRef.current = controller

    try {
      await queryDocumentStream(request, (event) => {
        switch (event.phase) {
          case 'decomposed':
            setState(prev => ({
              ...prev,
              extractedQuestions: event.extracted_questions!,
              phase: 'retrieving',
            }))
            break
          case 'retrieving':
            setState(prev => ({ ...prev, phase: 'retrieving' }))
            break
          case 'filtering':
            setState(prev => ({ ...prev, phase: 'filtering' }))
            break
          case 'completed':
            setState(prev => ({
              ...prev,
              answer: event.answer!,
              sources: event.sources!,
              phase: 'completed',
            }))
            break
          case 'error':
            setState(prev => ({
              ...prev,
              phase: 'error',
              error: new Error(event.message!),
            }))
            break
        }
      }, controller.signal)
    } catch (err) {
      // An aborted request is a deliberate cancel, not an error to display
      if (controller.signal.aborted) return
      setState(prev => ({
        ...prev,
        phase: 'error',
        error: err instanceof Error ? err : new Error(String(err)),
      }))
    }
  }, [])

  const reset = useCallback(() => {
    abortRef.current?.abort()
    setState({
      extractedQuestions: null,
      answer: null,
      sources: null,
      phase: 'idle',
      error: null,
    })
  }, [])

  return { ...state, mutate, reset }
}
```

### 3. Update LTTPage to Use Streaming

**File**: `frontend/src/pages/LTTPage.tsx`

Replace `useQueryDocument` with `useQueryDocumentStream`:

```typescript
const queryStream = useQueryDocumentStream()

const handleQuerySubmit = (question: string) => {
  queryStream.mutate({ question })
}

// In JSX:
<ExtractedQuestionsDisplay
  extractedQuestions={queryStream.extractedQuestions}
  isLoading={queryStream.phase === 'decomposing'}
/>

<ResponsePanel
  answer={queryStream.answer}
  sources={queryStream.sources}
  isLoading={queryStream.phase === 'retrieving' || queryStream.phase === 'filtering' || queryStream.phase === 'generating'}
  extractedQuestions={queryStream.extractedQuestions}
/>
```

### 4. Update ExtractedQuestionsDisplay

**File**: `frontend/src/components/ExtractedQuestionsDisplay.tsx`

The component should already work with the new state shape. Ensure it handles the case where `extractedQuestions` is available but `answer` is still loading:

```typescript
// Show questions as soon as they arrive, even if answer is still loading
if (extractedQuestions && extractedQuestions.length > 0) {
  return (
    <div>
      <h3>Extracted Questions:</h3>
      <ol>
        {extractedQuestions.map((q, i) => <li key={i}>{q}</li>)}
      </ol>
    </div>
  )
}
```

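### 5. Update ResponsePanel Loading States

**File**: `frontend/src/components/ResponsePanel.tsx`

Show different loading states based on phase:
- `phase === 'retrieving'`: "Searching documents..."
- `phase === 'filtering'`: "Filtering relevant passages..."
- `phase === 'generating'`: "Generating answer..."

The backend sketch above emits no `generating` event, so the hook stays in `filtering` until `completed` arrives. To show "Generating answer...", either emit a `generating` event before LLM #3 and handle it in the hook, or drop this state. `ResponsePanel` also needs to receive `phase` as a prop, since the `LTTPage` snippet passes only `isLoading`.
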
## Implementation Order

1. **Backend streaming endpoint** (`backend/app/routers/query.py`)
   - Add `_query_stream()` generator
   - Update `/query` to return `StreamingResponse`
   - Test with `curl` to verify SSE format

2. **Frontend streaming API** (`frontend/src/lib/api.ts`)
   - Add `queryDocumentStream()` function
   - Add `QueryStreamEvent` type

3. **Frontend streaming hook** (`frontend/src/lib/queries.tsx`)
   - Add `useQueryDocumentStream()` hook

4. **Update LTTPage** (`frontend/src/pages/LTTPage.tsx`)
   - Replace `useQueryDocument` with `useQueryDocumentStream`
   - Wire up new state to components

5. **Update ResponsePanel** (`frontend/src/components/ResponsePanel.tsx`)
   - Add phase-aware loading messages

6. **Tests**
   - Mock `fetch` with `ReadableStream` for testing
   - Test each phase transition
   - Test error handling

## Testing Strategy

### Backend Tests
- Test SSE event format matches expected JSON structure
- Test each phase event is emitted in correct order
- Test error events on failures
- Test early exit (no chunks) still emits `decomposed` and `retrieving` before `completed`

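A backend test could capture the full response body and check the event order on the decoded events. The sketch below shows one possible shape for such a check. `parse_sse_body` and `assert_valid_phase_order` are hypothetical helpers, and the sample body is made up to mirror the early-exit case. How the body is obtained (for example from a test client) is left out.

```python
import json


def parse_sse_body(body: str) -> list[dict]:
    """Split a full SSE response body into decoded JSON events."""
    events = []
    for frame in body.split("\n\n"):
        for line in frame.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events


def assert_valid_phase_order(events: list[dict]) -> None:
    """Check that phases follow pipeline order and end with a terminal event."""
    phases = [e["phase"] for e in events]
    assert phases, "stream produced no events"
    assert phases[-1] in {"completed", "error"}, "stream must end with a terminal event"
    if phases[-1] == "completed":
        assert phases[0] == "decomposed", "decomposed must come first"
    # Phases must appear in pipeline order, with no repeats.
    order = ["decomposed", "retrieving", "filtering", "completed", "error"]
    ranks = [order.index(p) for p in phases]
    assert ranks == sorted(set(ranks)), f"unexpected phase order: {phases}"


# Made-up body for the early-exit case (retrieval returned no chunks).
early_exit_body = (
    'data: {"phase": "decomposed", "extracted_questions": ["q"]}\n\n'
    'data: {"phase": "retrieving"}\n\n'
    'data: {"phase": "completed", "answer": "none", "sources": []}\n\n'
)
early_exit_events = parse_sse_body(early_exit_body)
assert_valid_phase_order(early_exit_events)
```

Checking order on decoded events rather than raw strings keeps the test independent of JSON key order and whitespace.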
### Frontend Tests
- Mock `fetch` to return a `ReadableStream` with SSE data
- Test `useQueryDocumentStream` state transitions:
  - idle → decomposing → retrieving (extractedQuestions set by the `decomposed` event) → filtering → completed (answer set)
- Test error handling: stream emits error event → state.error set
- Test abort: calling reset() aborts in-flight request

## Risks & Mitigations

| Risk | Mitigation |
|------|-----------|
| SSE not supported by older browsers | Use fetch + ReadableStream (works in all modern browsers) |
| Proxy/CDN buffering SSE | Set `X-Accel-Buffering: no` header, use `Cache-Control: no-cache` |
| CORS with streaming | Ensure `Access-Control-Allow-Origin` includes frontend origin |
| Connection drops mid-stream | Add retry logic or show "Connection lost" message |
| TanStack Query cache invalidation | Not needed: streaming bypasses cache, keep old `useQueryDocument` for non-streaming use |

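The buffering mitigation amounts to sending two extra response headers with the stream. A minimal sketch, assuming they are passed through the `headers` argument of `StreamingResponse`; `SSE_HEADERS` and `sse_response_kwargs` are hypothetical names introduced here for illustration.

```python
# Response headers commonly used to keep intermediaries from buffering an SSE stream.
SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "X-Accel-Buffering": "no",  # honoured by nginx; other proxies may ignore it
}


def sse_response_kwargs() -> dict:
    """Keyword arguments intended for StreamingResponse(generator, **kwargs)."""
    # Copy the dict so callers cannot mutate the shared constant.
    return {"media_type": "text/event-stream", "headers": dict(SSE_HEADERS)}


kwargs = sse_response_kwargs()
print(kwargs["headers"])
```

The endpoint would then return `StreamingResponse(_query_stream(request), **sse_response_kwargs())`. Whether a particular proxy or CDN respects these headers should be verified in the target deployment.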
## Files to Modify

| File | Change |
|------|--------|
| `backend/app/models/query.py` | Add streaming event models |
| `backend/app/routers/query.py` | Convert to StreamingResponse |
| `frontend/src/lib/api.ts` | Add `queryDocumentStream()` |
| `frontend/src/lib/queries.tsx` | Add `useQueryDocumentStream()` hook |
| `frontend/src/pages/LTTPage.tsx` | Use streaming hook |
| `frontend/src/components/ResponsePanel.tsx` | Phase-aware loading states |

## Success Criteria

- [ ] Extracted questions appear within 5 seconds of submitting a query
- [ ] Answer appears when generation completes (no UI blocking)
- [ ] Loading states clearly indicate which pipeline step is running
- [ ] Error handling works (shows error message if any step fails)
- [ ] All existing tests pass (or are updated for new behavior)
- [ ] New tests cover streaming state transitions