# Plan: Show Extracted Questions Immediately (Streaming Query Response)
## Problem
On the LTT page, when a user submits a question like "what is power of project manager", the "Extracted Questions" section only appears after **all three LLM calls** complete:
1. **QueryDecomposer** (LLM #1) → `extracted_questions` **available here**
2. **RelevanceFilter** (LLM #2) → filtered chunks
3. **ResponseGeneration** (LLM #3) → final answer
The user must wait 10-30+ seconds (depending on LLM latency) before seeing the extracted sub-questions, even though they are produced in the first ~3-5 seconds.
## Root Cause
The backend `/query` endpoint is a single synchronous HTTP call that awaits all pipeline steps before returning the complete `QueryResponse`. The frontend's `useMutation` receives the entire response at once.
## Objective
Show `extracted_questions` to the user **as soon as the first LLM call completes**, while the remaining pipeline steps continue in the background. The user sees:
1. Extracted questions immediately (after LLM #1)
2. A loading indicator for the answer (while LLM #2 and #3 run)
3. The final answer and sources when complete
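The difference between the current behaviour and this objective can be illustrated with a small, self-contained sketch. The step function and delays below are placeholders, not the real pipeline; the point is only that an async generator hands each result to the consumer before the next step has started.

```python
import asyncio


async def fake_step(name: str, delay: float, log: list[str]) -> str:
    """Stand-in for one pipeline step (hypothetical; simulates LLM latency)."""
    await asyncio.sleep(delay)
    log.append(f"ran:{name}")
    return name


async def pipeline_stream(log: list[str]):
    """Yield each step's result as soon as it is ready."""
    for name in ("decompose", "filter", "generate"):
        yield await fake_step(name, 0.01, log)


async def demo() -> list[str]:
    log: list[str] = []
    async for result in pipeline_stream(log):
        # The consumer sees each result before the next step has run.
        log.append(f"seen:{result}")
    return log


order = asyncio.run(demo())
```

In `order`, every `seen:` entry sits directly after its matching `ran:` entry, which is the interleaving the UI needs: the decomposition result is visible while filtering and generation are still pending.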
## Recommended Approach: SSE Streaming
Convert the `/query` endpoint to use **Server-Sent Events (SSE)**, a widely used pattern for streaming LLM responses. This requires changes to both backend and frontend.
### Why SSE (not WebSocket or separate endpoint)
| Approach | Pros | Cons | Verdict |
|----------|------|------|---------|
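| **SSE Streaming** | Single connection, HTTP-based, simple JSON `data:` frames; automatic reconnect only when the browser's native `EventSource` is used (not with the `fetch()` reader this plan relies on) | One-directional (server→client only) | ✅ **Best fit**: we only need server→client |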
| **WebSocket** | Bidirectional, lower overhead | Overkill for this use case, more complex | ❌ Unnecessary complexity |
| **Separate `/decompose` endpoint** | Minimal backend change | Two HTTP round-trips, awkward UX (submit twice), harder to maintain state | ❌ Poor UX, more frontend complexity |
## Backend Changes
### 1. New Streaming Response Models
**File**: `backend/app/models/query.py`
Add a `StreamingQueryEvent` union type for SSE events:
```python
from typing import Literal, Union

from pydantic import BaseModel


class DecomposedEvent(BaseModel):
    phase: Literal["decomposed"]
    extracted_questions: list[str]


class RetrievingEvent(BaseModel):
    phase: Literal["retrieving"]


class FilteringEvent(BaseModel):
    phase: Literal["filtering"]


class CompletedEvent(BaseModel):
    phase: Literal["completed"]
    answer: str
    sources: list[SourceMetadata]  # existing model, assumed to be defined in this module


class ErrorEvent(BaseModel):
    phase: Literal["error"]
    message: str


StreamingQueryEvent = Union[
    DecomposedEvent, RetrievingEvent, FilteringEvent,
    CompletedEvent, ErrorEvent
]
```
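To illustrate how the `phase` field acts as a discriminator for the union, here is a dependency-free sketch that uses plain dataclasses instead of Pydantic. The class and function names (`Decomposed`, `Progress`, `Completed`, `Failed`, `parse_event`) are hypothetical stand-ins, and `sources` is simplified to a list of dicts; the real models would stay as defined above.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Decomposed:
    extracted_questions: list[str]
    phase: str = "decomposed"


@dataclass
class Progress:
    phase: str  # "retrieving" or "filtering"


@dataclass
class Completed:
    answer: str
    sources: list[dict[str, Any]] = field(default_factory=list)
    phase: str = "completed"


@dataclass
class Failed:
    message: str
    phase: str = "error"


def parse_event(payload: dict[str, Any]):
    """Pick the event class from the `phase` discriminator."""
    phase = payload.get("phase")
    if phase == "decomposed":
        return Decomposed(extracted_questions=list(payload["extracted_questions"]))
    if phase in ("retrieving", "filtering"):
        return Progress(phase=phase)
    if phase == "completed":
        return Completed(answer=payload["answer"], sources=list(payload.get("sources", [])))
    if phase == "error":
        return Failed(message=payload["message"])
    raise ValueError(f"unknown phase: {phase!r}")
```

Rejecting unknown phases explicitly makes a mismatch between backend and consumer fail loudly in tests instead of being silently ignored.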
### 2. Streaming Query Endpoint
**File**: `backend/app/routers/query.py`
Convert `/query` to return `StreamingResponse` with `text/event-stream`:
```python
import json

from fastapi import HTTPException
from fastapi.responses import StreamingResponse


async def _query_stream(request: QueryRequest):
    """Async generator that yields SSE events for the query pipeline."""
    settings = get_settings()
    try:
        llm_client = LLMClient(settings)
        rag = RAGService(llm_client=llm_client, settings=settings)

        # Step 1: Decompose (LLM #1)
        decomposer = QueryDecomposer(llm_client)
        extracted_questions = await decomposer.decompose(request.question)
        yield f"data: {json.dumps({'phase': 'decomposed', 'extracted_questions': extracted_questions})}\n\n"

        # Step 2: Retrieve (ChromaDB)
        yield f"data: {json.dumps({'phase': 'retrieving'})}\n\n"
        chunks = rag.retrieve(extracted_questions, n_results=settings.retrieval_n_results)
        if not chunks:
            yield f"data: {json.dumps({'phase': 'completed', 'answer': NO_RESULTS_ANSWER, 'sources': []})}\n\n"
            return

        # Step 3: Filter (LLM #2)
        yield f"data: {json.dumps({'phase': 'filtering'})}\n\n"
        chunks_for_filter = [(text, meta) for text, meta, _dist in chunks]
        relevance_filter = RelevanceFilter(llm_client)
        filtered = await relevance_filter.filter(
            request.question, chunks_for_filter, threshold=settings.relevance_threshold
        )
        if not filtered:
            yield f"data: {json.dumps({'phase': 'completed', 'answer': NO_RESULTS_ANSWER, 'sources': []})}\n\n"
            return

        # Step 4: Generate (LLM #3)
        chunk_texts = [chunk for chunk, _meta in filtered]
        chunk_metadata = [meta for _chunk, meta in filtered]
        answer = await rag.generate_response(request.question, chunk_texts, chunk_metadata)
        sources = [
            SourceMetadata(
                filename=meta.get("filename", "unknown"),
                upload_date=meta.get("upload_date", ""),
                content_summary=meta.get("content_summary", ""),
                chunk_index=meta.get("chunk_index", 0),
                page_number=meta.get("page_number"),
                chunk_file_path=meta.get("chunk_file_path"),
            )
            for meta in chunk_metadata
        ]
        yield f"data: {json.dumps({'phase': 'completed', 'answer': answer, 'sources': [s.model_dump() for s in sources]})}\n\n"
    except Exception as e:
        # The HTTP status is already sent once streaming starts, so failures
        # are reported to the client as an in-band error event.
        logger.exception("Query stream failed: %s", e)
        yield f"data: {json.dumps({'phase': 'error', 'message': str(e)})}\n\n"


@router.post("/query")
async def query(request: QueryRequest):
    # Validate before streaming begins so a bad request still gets a 400.
    if not request.question or not request.question.strip():
        raise HTTPException(status_code=400, detail="Question is required")
    return StreamingResponse(
        _query_stream(request),
        media_type="text/event-stream",
        # Discourage caching and proxy buffering (see Risks & Mitigations).
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```
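Every `yield` in the generator builds the same SSE frame: one `data:` line followed by a blank line. A small helper could keep that format in one place. This is only a sketch; `sse_event` is a hypothetical name, not part of the existing codebase.

```python
import json
from typing import Any


def sse_event(payload: dict[str, Any]) -> str:
    """Serialize one payload as a single SSE frame.

    json.dumps escapes newlines inside strings, so the JSON always fits on
    one `data:` line; the trailing blank line terminates the event.
    """
    return f"data: {json.dumps(payload)}\n\n"


frame = sse_event({"phase": "decomposed", "extracted_questions": ["What is X?"]})
```

With such a helper, a line in the generator would shrink to something like `yield sse_event({"phase": "retrieving"})`, and a multi-line answer cannot accidentally split a frame.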
### 3. Old Endpoint (Backward Compatibility)
One option is to add a new `/query/stream` endpoint and keep the existing `/query` for backward compatibility, or to version the API. For simplicity, this plan replaces `/query` in place; the implementation could still support both.
**Decision**: Replace `/query` with streaming. The frontend is the only consumer and will be updated together.
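If a non-streaming `/query` were ever kept alongside the streaming one, it would not need a second pipeline: it could drain the same event sequence and return only the terminal event. The sketch below shows the idea with a stand-in generator. `collect_final` and `fake_stream` are hypothetical names, and events are plain dicts here rather than serialized SSE frames.

```python
import asyncio
from typing import Any, AsyncIterator


async def collect_final(events: AsyncIterator[dict[str, Any]]) -> dict[str, Any]:
    """Drain a stream of phase events into one non-streaming response."""
    result: dict[str, Any] = {"extracted_questions": []}
    async for event in events:
        phase = event.get("phase")
        if phase == "decomposed":
            result["extracted_questions"] = event["extracted_questions"]
        elif phase == "completed":
            result["answer"] = event["answer"]
            result["sources"] = event["sources"]
            return result
        elif phase == "error":
            raise RuntimeError(event["message"])
    raise RuntimeError("stream ended without a completed event")


async def fake_stream():
    """Stand-in for the real pipeline generator."""
    yield {"phase": "decomposed", "extracted_questions": ["q1", "q2"]}
    yield {"phase": "retrieving"}
    yield {"phase": "filtering"}
    yield {"phase": "completed", "answer": "42", "sources": []}


response = asyncio.run(collect_final(fake_stream()))
```

Progress-only phases are ignored, so the collected result has the same fields the original single-response endpoint returned.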
## Frontend Changes
### 1. New Streaming API Client
**File**: `frontend/src/lib/api.ts`
Add the event type shared by the streaming client and the hook:
```typescript
export interface QueryStreamEvent {
  phase: 'decomposed' | 'retrieving' | 'filtering' | 'completed' | 'error'
  extracted_questions?: string[]
  answer?: string
  sources?: SourceMetadata[]
  message?: string
}
```
**Important**: The browser's standard `EventSource` only supports GET requests and cannot send a request body, so it cannot POST the question. Use `fetch()` with a `ReadableStream` reader to POST the question and parse the SSE events:
```typescript
export const queryDocumentStream = async (
  request: QueryRequest,
  onEvent: (event: QueryStreamEvent) => void
): Promise<void> => {
  const baseUrl = import.meta.env.VITE_API_BASE_URL ?? 'http://localhost:8000/api/v1'
  const response = await fetch(`${baseUrl}/query`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(request),
  })
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${await response.text()}`)
  }
  if (!response.body) {
    throw new Error('Response has no body to stream')
  }

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // A network chunk can end mid-line, so keep the unfinished tail in the buffer
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split('\n')
    buffer = lines.pop() || ''
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        if (data === '[DONE]') return
        onEvent(JSON.parse(data) as QueryStreamEvent)
      }
    }
  }
}
```
### 2. New React Hook for Streaming
**File**: `frontend/src/lib/queries.tsx`
Add a custom hook that manages streaming state:
```typescript
import { useState, useCallback, useRef } from 'react'

interface QueryStreamState {
  extractedQuestions: string[] | null
  answer: string | null
  sources: SourceMetadata[] | null
  phase: 'idle' | 'decomposing' | 'retrieving' | 'filtering' | 'generating' | 'completed' | 'error'
  error: Error | null
}

export const useQueryDocumentStream = () => {
  const [state, setState] = useState<QueryStreamState>({
    extractedQuestions: null,
    answer: null,
    sources: null,
    phase: 'idle',
    error: null,
  })
  const abortRef = useRef<AbortController | null>(null)

  const mutate = useCallback(async (request: QueryRequest) => {
    // Cancel any previous run so its late events cannot overwrite this one
    abortRef.current?.abort()
    const controller = new AbortController()
    abortRef.current = controller
    setState({
      extractedQuestions: null,
      answer: null,
      sources: null,
      phase: 'decomposing',
      error: null,
    })
    try {
      await queryDocumentStream(request, (event) => {
        // queryDocumentStream takes no AbortSignal, so events that arrive
        // after reset() or a newer mutate() are dropped here instead
        if (controller.signal.aborted) return
        switch (event.phase) {
          case 'decomposed':
            setState(prev => ({
              ...prev,
              extractedQuestions: event.extracted_questions ?? [],
              phase: 'retrieving',
            }))
            break
          case 'retrieving':
            setState(prev => ({ ...prev, phase: 'retrieving' }))
            break
          case 'filtering':
            setState(prev => ({ ...prev, phase: 'filtering' }))
            break
          case 'completed':
            setState(prev => ({
              ...prev,
              answer: event.answer ?? '',
              sources: event.sources ?? [],
              phase: 'completed',
            }))
            break
          case 'error':
            setState(prev => ({
              ...prev,
              phase: 'error',
              error: new Error(event.message ?? 'Unknown error'),
            }))
            break
        }
      })
    } catch (err) {
      if (controller.signal.aborted) return
      setState(prev => ({
        ...prev,
        phase: 'error',
        error: err instanceof Error ? err : new Error(String(err)),
      }))
    }
  }, [])

  const reset = useCallback(() => {
    abortRef.current?.abort()
    setState({
      extractedQuestions: null,
      answer: null,
      sources: null,
      phase: 'idle',
      error: null,
    })
  }, [])

  return { ...state, mutate, reset }
}
```
### 3. Update LTTPage to Use Streaming
**File**: `frontend/src/pages/LTTPage.tsx`
Replace `useQueryDocument` with `useQueryDocumentStream`:
```typescript
const queryStream = useQueryDocumentStream()

const handleQuerySubmit = (question: string) => {
  queryStream.mutate({ question })
}

// In JSX:
<ExtractedQuestionsDisplay
  extractedQuestions={queryStream.extractedQuestions}
  isLoading={queryStream.phase === 'decomposing'}
/>
<ResponsePanel
  answer={queryStream.answer}
  sources={queryStream.sources}
  isLoading={
    queryStream.phase === 'retrieving' ||
    queryStream.phase === 'filtering' ||
    queryStream.phase === 'generating'
  }
  extractedQuestions={queryStream.extractedQuestions}
/>
```
### 4. Update ExtractedQuestionsDisplay
**File**: `frontend/src/components/ExtractedQuestionsDisplay.tsx`
The component should already work with the new state shape. Ensure it handles the case where `extractedQuestions` is available but `answer` is still loading:
```typescript
// Show questions as soon as they arrive, even if answer is still loading
if (extractedQuestions && extractedQuestions.length > 0) {
  return (
    <div>
      <h3>Extracted Questions:</h3>
      <ol>
        {extractedQuestions.map((q, i) => <li key={i}>{q}</li>)}
      </ol>
    </div>
  )
}
```
### 5. Update ResponsePanel Loading States
**File**: `frontend/src/components/ResponsePanel.tsx`
Show different loading states based on phase:
- `phase === 'retrieving'`: "Searching documents..."
- `phase === 'filtering'`: "Filtering relevant passages..."
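- `phase === 'generating'`: "Generating answer..." (shown only if the backend emits a `generating` event before LLM #3 and the hook maps it to this phase)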
## Implementation Order
1. **Backend streaming endpoint** (`backend/app/routers/query.py`)
   - Add `_query_stream()` generator
   - Update `/query` to return `StreamingResponse`
   - Test with `curl` to verify SSE format
2. **Frontend streaming API** (`frontend/src/lib/api.ts`)
   - Add `queryDocumentStream()` function
   - Add `QueryStreamEvent` type
3. **Frontend streaming hook** (`frontend/src/lib/queries.tsx`)
   - Add `useQueryDocumentStream()` hook
4. **Update LTTPage** (`frontend/src/pages/LTTPage.tsx`)
   - Replace `useQueryDocument` with `useQueryDocumentStream`
   - Wire up new state to components
5. **Update ResponsePanel** (`frontend/src/components/ResponsePanel.tsx`)
   - Add phase-aware loading messages
6. **Tests**
   - Mock `fetch` with `ReadableStream` for testing
   - Test each phase transition
   - Test error handling
## Testing Strategy
### Backend Tests
- Test SSE event format matches expected JSON structure
- Test each phase event is emitted in correct order
- Test error events on failures
- Test early exit (no chunks) still emits `decomposed` then `completed`
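
For the format and ordering checks in this list, a test needs to turn the raw response body back into events. The following is a minimal sketch of such a parser. `parse_sse_chunks` is a hypothetical helper, and it assumes every event is a single `data:` line carrying JSON, as in the endpoint sketched earlier. It buffers across chunk boundaries because the transport may split a frame anywhere.

```python
import codecs
import json
from typing import Any, Iterable, Iterator


def parse_sse_chunks(chunks: Iterable[bytes]) -> Iterator[dict[str, Any]]:
    """Yield decoded JSON events from an SSE byte stream."""
    # An incremental decoder tolerates multi-byte characters split across chunks.
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        # A blank line ("\n\n") terminates each SSE event.
        while "\n\n" in buffer:
            frame, buffer = buffer.split("\n\n", 1)
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    yield json.loads(line[len("data: "):])


# A body split at awkward points, as a proxy or socket might deliver it.
body = [
    b'data: {"phase": "decom',
    b'posed", "extracted_questions": ["q1"]}\n\ndata: {"phase": "retrieving"}\n',
    b'\ndata: {"phase": "completed", "answer": "No results", "sources": []}\n\n',
]
phases = [event["phase"] for event in parse_sse_chunks(body)]
```

The sample body mirrors the early-exit path: `phases` contains `decomposed`, `retrieving`, and `completed`, with no `filtering` event in between.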
### Frontend Tests
- Mock `fetch` to return a `ReadableStream` with SSE data
- Test `useQueryDocumentStream` state transitions:
  - idle → decomposing → retrieving (on the `decomposed` event, `extractedQuestions` set) → filtering → completed (`answer` set)
- Test error handling: stream emits an error event → `error` is set and `phase` is `'error'`
- Test abort: after `reset()`, the in-flight request no longer updates state
## Risks & Mitigations
| Risk | Mitigation |
|------|-----------|
| SSE not supported by older browsers | Use fetch + ReadableStream (works in all modern browsers) |
| Proxy/CDN buffering SSE | Set `X-Accel-Buffering: no` header, use `Cache-Control: no-cache` |
| CORS with streaming | Ensure `Access-Control-Allow-Origin` includes frontend origin |
| Connection drops mid-stream | Add retry logic or show "Connection lost" message |
| TanStack Query cache invalidation | Not needed: streaming bypasses the cache. The old `useQueryDocument` can stay only if a non-streaming endpoint is also kept |
## Files to Modify
| File | Change |
|------|--------|
| `backend/app/models/query.py` | Add streaming event models |
| `backend/app/routers/query.py` | Convert to StreamingResponse |
| `frontend/src/lib/api.ts` | Add `queryDocumentStream()` |
| `frontend/src/lib/queries.tsx` | Add `useQueryDocumentStream()` hook |
| `frontend/src/pages/LTTPage.tsx` | Use streaming hook |
| `frontend/src/components/ResponsePanel.tsx` | Phase-aware loading states |
| `frontend/src/components/ExtractedQuestionsDisplay.tsx` | Verify questions render while the answer is still loading |
## Success Criteria
- [ ] Extracted questions appear within 5 seconds of submitting a query
- [ ] Answer appears when generation completes (no UI blocking)
- [ ] Loading states clearly indicate which pipeline step is running
- [ ] Error handling works (shows error message if any step fails)
- [ ] All existing tests pass (or are updated for new behavior)
- [ ] New tests cover streaming state transitions