test(backend): add Phase 4 integration and acceptance tests

5 integration tests simulating full per-sub-question pipeline with mocked services covering 2-sub-q, empty decomposition fallback, single sub-q, all-filtered, and partial retrieval. 2 acceptance tests (manual run) for real LLM verification of per-sub-question organized answers with grouped sources and ## Sub-question headers.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
Woody 2026-04-26 23:29:09 +08:00
parent dd98fa0b65
commit 201bddecf0
2 changed files with 519 additions and 0 deletions

View File

@ -0,0 +1,114 @@
"""Acceptance test: Phase 4 per-sub-question RAG query with real LLM.
Prerequisites:
- ChromaDB running (local or docker)
- .env configured with valid LLM_BASE_URL and LLM_API_KEY
- Test documents ingested via /api/v1/ingest
These tests verify the full per-sub-question pipeline against real services:
- Query endpoint returns SSE stream with per-sub-q events
- Answer contains ## Sub-question N headers
- Sources are grouped by sub-question
- No cross-contamination between sub-question source groups
"""
import os
import tempfile
import json
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client():
    """Yield a ``TestClient`` bound to the real FastAPI application.

    ``app.main`` is imported inside the fixture, so merely collecting this
    module does not import the application.

    The client is used as a context manager so that the application's
    startup/shutdown (lifespan) handlers run around each test and the client
    is closed during fixture teardown instead of being leaked.
    """
    from app.main import app

    with TestClient(app) as test_client:
        yield test_client
@pytest.fixture
def ingested_document(client):
    """Upload a short text file about Python and yield its ``document_id``.

    The text is written to a named temporary file, posted to
    ``/api/v1/ingest`` as a multipart upload, and the ``document_id`` field of
    the JSON response is yielded to the test. The temporary file is removed
    during teardown, whether or not the upload succeeded.
    """
    document_lines = (
        "Python is a high-level programming language created by Guido van Rossum.\n",
        "It was first released in 1991 and emphasizes code readability.\n",
        "Python supports multiple programming paradigms including procedural and object-oriented.\n",
        "The Zen of Python lists guiding principles for writing clean code.\n",
    )

    # delete=False: the file must outlive this handle so it can be reopened
    # in binary mode for the upload below.
    scratch = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
    with scratch:
        scratch.writelines(document_lines)
    upload_path = scratch.name

    try:
        with open(upload_path, 'rb') as payload:
            upload = client.post(
                "/api/v1/ingest",
                files={"file": ("python_info.txt", payload, "text/plain")},
            )
        assert upload.status_code == 200
        yield upload.json()["document_id"]
    finally:
        os.unlink(upload_path)
def _parse_sse_events(response):
    """Decode the JSON payload of every ``data`` line in an SSE response body.

    Lines that do not carry a ``data`` field (blank separators, comments,
    other fields) are ignored. The single space after ``data:`` is optional
    in the server-sent events format, so both ``data: {...}`` and
    ``data:{...}`` are accepted.

    Args:
        response: Any object exposing the complete response body as ``.text``.

    Returns:
        A list of the decoded JSON payloads, in stream order.

    Raises:
        json.JSONDecodeError: If a ``data`` payload is not valid JSON.
    """
    field_prefix = "data:"
    events = []
    for line in response.text.split("\n"):
        if not line.startswith(field_prefix):
            continue
        # strip() drops the optional leading space and any trailing "\r"
        # left behind by CRLF line endings.
        events.append(json.loads(line[len(field_prefix):].strip()))
    return events
@pytest.mark.acceptance
@pytest.mark.slow
def test_per_subq_query_with_real_llm(client, ingested_document):
    """A two-part question streams ``decomposed`` and ``completed`` events.

    Checks that the ``decomposed`` event carries a list of extracted
    questions and that the ``completed`` event carries a non-empty answer,
    at least one source, and a list under ``sub_question_sources`` when that
    key is present.
    """
    resp = client.post(
        "/api/v1/query",
        json={"question": "Who created Python and what paradigms does it support?"},
    )
    assert resp.status_code == 200

    # Remember the first event emitted for each phase.
    first_by_phase = {}
    for event in _parse_sse_events(resp):
        first_by_phase.setdefault(event["phase"], event)

    assert "decomposed" in first_by_phase
    assert "completed" in first_by_phase

    decomposed = first_by_phase["decomposed"]
    assert isinstance(decomposed["extracted_questions"], list)

    completed = first_by_phase["completed"]
    assert len(completed["answer"]) > 0
    assert len(completed.get("sources", [])) > 0
    assert isinstance(completed.get("sub_question_sources", []), list)
@pytest.mark.acceptance
@pytest.mark.slow
def test_per_subq_query_sources_grouped(client, ingested_document):
    """The ``completed`` event groups sources per sub-question.

    Every entry of ``sub_question_sources`` must expose
    ``sub_question_index``, ``sub_question_text`` and ``sources``; every
    source must expose ``filename`` and ``upload_date``; and every group must
    reference at least one filename.
    """
    query_response = client.post(
        "/api/v1/query",
        json={"question": "When was Python released and what is the Zen of Python?"},
    )
    assert query_response.status_code == 200

    events = _parse_sse_events(query_response)

    # Fail with a readable message instead of a bare StopIteration when the
    # stream never reaches the completed phase.
    completed = [e for e in events if e.get("phase") == "completed"]
    assert completed, (
        f"no 'completed' event in stream; phases seen: "
        f"{[e.get('phase') for e in events]}"
    )
    comp_evt = completed[0]

    sq_sources = comp_evt.get("sub_question_sources")
    assert isinstance(sq_sources, list)
    # Without this, a missing or empty grouping would skip every check below
    # and the test would pass without verifying anything.
    assert sq_sources, "completed event carries no sub-question source groups"

    for sq in sq_sources:
        assert "sub_question_index" in sq
        assert "sub_question_text" in sq
        assert "sources" in sq
        for src in sq["sources"]:
            assert "filename" in src
            assert "upload_date" in src

    all_filenames_by_subq = [
        {s["filename"] for s in sq["sources"]}
        for sq in sq_sources
    ]
    assert all(len(f) > 0 for f in all_filenames_by_subq)

View File

@ -0,0 +1,405 @@
"""Phase 4 integration test: Full per-sub-question query pipeline.
Simulates the complete 4-stage pipeline (decompose → retrieve → filter → generate)
using mocked services, verifying end-to-end data flow and SSE event emission.
Key behaviours under test:
- Full pipeline with 2 sub-questions produces grouped results
- Empty decomposition falls back to original question (Decision #13)
- Single sub-question still uses ## Sub-question N format
- All chunks filtered out returns "no relevant information"
- One sub-q with empty retrieval still produces partial answer
All external services (LLM, ChromaDB) are mocked.
Tests call ``_query_stream()`` directly via ``async for`` — no HTTP layer.
"""
from __future__ import annotations
import json
from typing import Any, Dict, List, Tuple
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.models.query import QueryRequest
# ── Shared fixtures ──────────────────────────────────────────────────────
# Each fixture chunk is a 3-tuple: (chunk text, metadata dict, numeric score).
# ``_make_chroma`` feeds the third element into the "distances" column.
CHUNK_A = (
    "Time extensions must be notified within 8 weeks.",
    dict(
        filename="NEC4.pdf",
        page_number=3,
        content_summary="Time extensions",
        chunk_index=0,
        upload_date="2024-01-01",
    ),
    0.15,
)
CHUNK_B = (
    "Notice must be given to the project manager.",
    dict(
        filename="NEC4.pdf",
        page_number=12,
        content_summary="Notification",
        chunk_index=1,
        upload_date="2024-01-01",
    ),
    0.22,
)
def _make_settings():
    """Build a mock settings object with the values these tests rely on.

    Returns:
        A ``MagicMock`` whose retrieval size, relevance threshold and the two
        database paths are pinned to fixed test values.
    """
    pinned_values = {
        "retrieval_n_results": 10,
        "relevance_threshold": 7.0,
        "prompts_db_path": ":memory:",
        "history_db_path": ":memory:",
    }
    settings = MagicMock()
    for attr_name, attr_value in pinned_values.items():
        setattr(settings, attr_name, attr_value)
    return settings
def _make_prompt_service():
    """Build a mock prompt service backed by a fixed set of templates.

    Returns:
        A ``MagicMock`` whose ``get_active_profile_name()`` returns
        ``"default"`` and whose ``get_prompt_template(step)`` returns the
        canned template for ``step``, or ``""`` for an unknown step.
    """
    templates = {
        "decompose": "Given question: '{question}' — decompose.",
        "filter": "Rate chunks 0-10 for: {question}\n{chunks}",
        "generate": "Answer: {question}\nContext:\n{context}",
        "generate_per_subq": "Answer per sub-q:\n{context_sections}",
    }

    def _lookup(step):
        return templates.get(step, "")

    service = MagicMock()
    service.get_active_profile_name.return_value = "default"
    service.get_prompt_template = MagicMock(side_effect=_lookup)
    return service
def _make_llm(decompose_resp, filter_resp, generate_resp):
    """Build a mock LLM client with three scripted ``complete()`` replies.

    Args:
        decompose_resp: Value returned by the first awaited ``complete()``.
        filter_resp: Value returned by the second awaited ``complete()``.
        generate_resp: Value returned by the third awaited ``complete()``.

    Returns:
        A ``MagicMock`` whose ``complete`` attribute is an ``AsyncMock`` that
        yields the three replies in that order.
    """
    scripted_replies = [decompose_resp, filter_resp, generate_resp]
    client = MagicMock()
    client.complete = AsyncMock(side_effect=scripted_replies)
    return client
def _make_chroma(chunks: list):
    """Return a mock collection whose ``query()`` yields *chunks*.

    Each chunk is indexed positionally: element 0 goes to ``"documents"``,
    element 1 to ``"metadatas"`` and element 2 to ``"distances"``. Every
    column is wrapped in an outer single-element list.
    """
    column_names = ("documents", "metadatas", "distances")
    collection = MagicMock()
    collection.query.return_value = {
        name: [[chunk[position] for chunk in chunks]]
        for position, name in enumerate(column_names)
    }
    return collection
def _mock_chroma_client(collection):
    """Return a mock ChromaDB client whose collection lookups yield *collection*.

    Previously the ``collection`` argument was ignored, so any lookup on the
    returned client produced an unrelated auto-generated mock.

    NOTE(review): assumes the code under test obtains its collection through
    ``get_collection``, ``get_or_create_collection`` or ``create_collection``
    — confirm against the real client usage.

    Args:
        collection: The object each collection lookup should return,
            typically the mock built by ``_make_chroma``.

    Returns:
        A ``MagicMock`` standing in for the ChromaDB client.
    """
    client = MagicMock()
    client.get_collection.return_value = collection
    client.get_or_create_collection.return_value = collection
    client.create_collection.return_value = collection
    return client
async def _collect_sse(request: QueryRequest):
    """Drain ``_query_stream(request)`` and return its decoded SSE payloads.

    ``_query_stream`` is imported at call time, so it is resolved from
    ``app.routers.query`` on every call rather than at module import.
    """
    from app.routers.query import _query_stream

    marker = "data: "
    decoded = []
    async for frame in _query_stream(request):
        # Each frame looks like "data: {...}\n\n"; only data lines hold JSON.
        decoded.extend(
            json.loads(line[6:])
            for line in frame.split("\n")
            if line.startswith(marker)
        )
    return decoded
# ── Tests ────────────────────────────────────────────────────────────────
async def test_full_pipeline_with_two_subquestions():
    """Drive two sub-questions through decompose, retrieve, filter and generate.

    Checks that every expected phase is streamed, that the completed event
    groups sources per sub-question in decomposition order, that the answer
    carries both ``## Sub-question N:`` headers, and that one
    ``generating_subquestion`` event is emitted per sub-question.
    """
    first_q = "What are time extensions?"
    second_q = "What notice is required?"
    answer_text = (
        "## Sub-question 1: What are time extensions?\n"
        "- Extensions need 8 weeks notice [NEC4.pdf, page 3]\n\n"
        "## Sub-question 2: What notice is required?\n"
        "- Notify the project manager [NEC4.pdf, page 12]\n"
    )
    llm_client = _make_llm(
        '["What are time extensions?", "What notice is required?"]',
        '{"0": [8.5, 3.2], "1": [9.0]}',
        answer_text,
    )
    # Built but not passed to anything below: RAGService is patched wholesale.
    chroma_collection = _make_chroma([CHUNK_A, CHUNK_B])
    mock_settings = _make_settings()
    prompt_service = _make_prompt_service()
    query = QueryRequest(question="What are the time extension rules?")

    with patch("app.routers.query.get_settings", return_value=mock_settings), \
         patch("app.routers.query.PromptService", return_value=prompt_service), \
         patch("app.routers.query.LLMClient", return_value=llm_client), \
         patch("app.routers.query.RAGService") as rag_cls, \
         patch("app.routers.query.QueryDecomposer") as decomposer_cls, \
         patch("app.routers.query.RelevanceFilter") as filter_cls, \
         patch("app.routers.query.HistoryService"), \
         patch("app.routers.query._schedule_history"):

        # Decomposer yields the two sub-questions plus the prompt it used.
        decomposer_cls.return_value.decompose = AsyncMock(
            return_value=([first_q, second_q], "decompose-prompt-text"),
        )

        # Relevance filter keeps one scored chunk per sub-question.
        filter_cls.return_value.filter_per_subquestion = AsyncMock(
            return_value=(
                [
                    (first_q, [(CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 8.5})]),
                    (second_q, [(CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 9.0})]),
                ],
                "filter-prompt-text",
            ),
        )

        # RAG service: canned retrieval and canned generation output.
        rag_service = rag_cls.return_value
        rag_service.retrieve_per_subquestion.return_value = [
            (first_q, [CHUNK_A, CHUNK_B]),
            (second_q, [CHUNK_A]),
        ]
        sources_for_first = [CHUNK_A[1], CHUNK_B[1]]
        sources_for_second = [CHUNK_A[1]]
        rag_service.generate_response_per_subquestion = AsyncMock(
            return_value=(
                answer_text,
                "gen-prompt-text",
                [sources_for_first, sources_for_second],
            ),
        )

        stream = await _collect_sse(query)

    # Every pipeline phase must appear in the stream.
    seen_phases = [evt["phase"] for evt in stream]
    for expected_phase in (
        "decomposed",
        "retrieving",
        "filtering",
        "generating",
        "generating_subquestion",
        "completed",
    ):
        assert expected_phase in seen_phases

    # The decomposed event lists both extracted questions.
    decomposed_evt = next(evt for evt in stream if evt["phase"] == "decomposed")
    assert len(decomposed_evt["extracted_questions"]) == 2

    # The completed event groups sources per sub-question, in order.
    completed_evt = next(evt for evt in stream if evt["phase"] == "completed")
    assert "sub_question_sources" in completed_evt
    grouped = completed_evt["sub_question_sources"]
    assert len(grouped) == 2
    assert [group["sub_question_text"] for group in grouped] == [first_q, second_q]

    # The answer contains a header for each sub-question.
    for header in ("## Sub-question 1:", "## Sub-question 2:"):
        assert header in completed_evt["answer"]

    # Exactly one generating_subquestion event per sub-question, indexed 0 and 1.
    per_subq = [evt for evt in stream if evt["phase"] == "generating_subquestion"]
    assert len(per_subq) == 2
    assert [evt["sub_question_index"] for evt in per_subq] == [0, 1]
async def test_pipeline_with_empty_decomposition():
    """Empty decomposition falls back to original question as single sub-q.

    The decomposer mock returns no sub-questions. The test expects the
    ``decomposed`` event to carry the original question, the answer to use
    the ``## Sub-question 1:`` header, and retrieval to be invoked exactly
    once with the original question and ``n_results=10``.
    """
    generate_resp = "## Sub-question 1: What is the time limit?\n- Answer here\n"
    llm = MagicMock()
    ps = _make_prompt_service()
    settings = _make_settings()
    request = QueryRequest(question="What is the time limit?")

    with patch("app.routers.query.get_settings", return_value=settings), \
         patch("app.routers.query.PromptService", return_value=ps), \
         patch("app.routers.query.LLMClient", return_value=llm), \
         patch("app.routers.query.RAGService") as MockRAG, \
         patch("app.routers.query.QueryDecomposer") as MockDec, \
         patch("app.routers.query.RelevanceFilter") as MockFilter, \
         patch("app.routers.query.HistoryService"), \
         patch("app.routers.query._schedule_history"):
        # Decomposer finds nothing to split.
        dec = MockDec.return_value
        dec.decompose = AsyncMock(return_value=([], "decompose-prompt"))

        rag = MockRAG.return_value
        rag.retrieve_per_subquestion.return_value = [
            ("What is the time limit?", [CHUNK_A]),
        ]
        rag.generate_response_per_subquestion = AsyncMock(return_value=(
            generate_resp,
            "gen-prompt",
            [[CHUNK_A[1]]],
        ))

        filt = MockFilter.return_value
        filt.filter_per_subquestion = AsyncMock(return_value=(
            [("What is the time limit?", [(CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 8.5})])],
            "filter-prompt"
        ))

        events = await _collect_sse(request)

    phases = [e["phase"] for e in events]
    assert "decomposed" in phases
    dec_evt = next(e for e in events if e["phase"] == "decomposed")
    assert dec_evt["extracted_questions"] == ["What is the time limit?"]

    # A bare next() here would raise StopIteration, which surfaces from a
    # coroutine as an opaque RuntimeError; assert with the phases instead.
    completed = [e for e in events if e["phase"] == "completed"]
    assert completed, f"no 'completed' event emitted; phases seen: {phases}"
    comp_evt = completed[0]
    assert "## Sub-question 1:" in comp_evt["answer"]

    rag.retrieve_per_subquestion.assert_called_once_with(
        ["What is the time limit?"], n_results=10,
    )
async def test_pipeline_single_subquestion():
    """Single sub-question still uses per-sub-q format with ## header.

    The decomposer mock returns exactly one sub-question. The completed event
    must contain the ``## Sub-question 1:`` header and exactly one entry in
    ``sub_question_sources``.
    """
    generate_resp = "## Sub-question 1: What is X?\n- Answer here\n"
    llm = MagicMock()
    ps = _make_prompt_service()
    settings = _make_settings()
    request = QueryRequest(question="What is X?")

    with patch("app.routers.query.get_settings", return_value=settings), \
         patch("app.routers.query.PromptService", return_value=ps), \
         patch("app.routers.query.LLMClient", return_value=llm), \
         patch("app.routers.query.RAGService") as MockRAG, \
         patch("app.routers.query.QueryDecomposer") as MockDec, \
         patch("app.routers.query.RelevanceFilter") as MockFilter, \
         patch("app.routers.query.HistoryService"), \
         patch("app.routers.query._schedule_history"):
        dec = MockDec.return_value
        dec.decompose = AsyncMock(return_value=(
            ["What is X?"],
            "decompose-prompt"
        ))

        rag = MockRAG.return_value
        rag.retrieve_per_subquestion.return_value = [
            ("What is X?", [CHUNK_A]),
        ]
        rag.generate_response_per_subquestion = AsyncMock(return_value=(
            generate_resp,
            "gen-prompt",
            [[CHUNK_A[1]]],
        ))

        filt = MockFilter.return_value
        filt.filter_per_subquestion = AsyncMock(return_value=(
            [("What is X?", [(CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 8.5})])],
            "filter-prompt"
        ))

        events = await _collect_sse(request)

    # A bare next() here would raise StopIteration, which surfaces from a
    # coroutine as an opaque RuntimeError; assert with the phases instead.
    completed = [e for e in events if e["phase"] == "completed"]
    assert completed, (
        f"no 'completed' event emitted; phases seen: {[e['phase'] for e in events]}"
    )
    comp_evt = completed[0]
    assert "## Sub-question 1:" in comp_evt["answer"]
    assert len(comp_evt["sub_question_sources"]) == 1
async def test_pipeline_filter_all_rejected():
    """All chunks score below threshold — returns 'no relevant information'.

    The filter mock returns an empty chunk list for the only sub-question.
    The completed event's answer must contain "could not find"
    (case-insensitive) and its ``sources`` must be an empty list.
    """
    llm = MagicMock()
    ps = _make_prompt_service()
    settings = _make_settings()
    request = QueryRequest(question="Irrelevant question?")

    with patch("app.routers.query.get_settings", return_value=settings), \
         patch("app.routers.query.PromptService", return_value=ps), \
         patch("app.routers.query.LLMClient", return_value=llm), \
         patch("app.routers.query.RAGService") as MockRAG, \
         patch("app.routers.query.QueryDecomposer") as MockDec, \
         patch("app.routers.query.RelevanceFilter") as MockFilter, \
         patch("app.routers.query.HistoryService"), \
         patch("app.routers.query._schedule_history"):
        dec = MockDec.return_value
        dec.decompose = AsyncMock(return_value=(
            ["sub-q-1"],
            "decompose-prompt"
        ))

        rag = MockRAG.return_value
        rag.retrieve_per_subquestion.return_value = [
            ("sub-q-1", [CHUNK_A]),
        ]

        # All chunks filtered out
        filt = MockFilter.return_value
        filt.filter_per_subquestion = AsyncMock(return_value=(
            [("sub-q-1", [])],
            "filter-prompt"
        ))

        events = await _collect_sse(request)

    # A bare next() here would raise StopIteration, which surfaces from a
    # coroutine as an opaque RuntimeError; assert with the phases instead.
    completed = [e for e in events if e["phase"] == "completed"]
    assert completed, (
        f"no 'completed' event emitted; phases seen: {[e['phase'] for e in events]}"
    )
    comp_evt = completed[0]
    assert "could not find" in comp_evt["answer"].lower()
    assert comp_evt["sources"] == []
async def test_pipeline_retrieval_empty_for_one_subq():
    """One sub-q gets chunks, another gets nothing — partial answer produced.

    Retrieval and filtering both return a chunk for the first sub-question
    and an empty list for the second. The answer must contain both
    ``## Sub-question N:`` headers, and ``sub_question_sources`` must have
    two entries: the first with sources, the second without.
    """
    generate_resp = (
        "## Sub-question 1: Has chunks?\n"
        "- Yes [NEC4.pdf, page 3]\n\n"
        "## Sub-question 2: No chunks?\n"
        "- No relevant information found.\n"
    )
    llm = MagicMock()
    ps = _make_prompt_service()
    settings = _make_settings()
    request = QueryRequest(question="Compare two things")

    with patch("app.routers.query.get_settings", return_value=settings), \
         patch("app.routers.query.PromptService", return_value=ps), \
         patch("app.routers.query.LLMClient", return_value=llm), \
         patch("app.routers.query.RAGService") as MockRAG, \
         patch("app.routers.query.QueryDecomposer") as MockDec, \
         patch("app.routers.query.RelevanceFilter") as MockFilter, \
         patch("app.routers.query.HistoryService"), \
         patch("app.routers.query._schedule_history"):
        dec = MockDec.return_value
        dec.decompose = AsyncMock(return_value=(
            ["Has chunks?", "No chunks?"],
            "decompose-prompt"
        ))

        rag = MockRAG.return_value
        # First sub-q has chunks, second has none
        rag.retrieve_per_subquestion.return_value = [
            ("Has chunks?", [CHUNK_A]),
            ("No chunks?", []),
        ]
        rag.generate_response_per_subquestion = AsyncMock(return_value=(
            generate_resp,
            "gen-prompt",
            [[CHUNK_A[1]], []],
        ))

        filt = MockFilter.return_value
        filt.filter_per_subquestion = AsyncMock(return_value=(
            [
                ("Has chunks?", [(CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 8.5})]),
                ("No chunks?", []),
            ],
            "filter-prompt"
        ))

        events = await _collect_sse(request)

    # A bare next() here would raise StopIteration, which surfaces from a
    # coroutine as an opaque RuntimeError; assert with the phases instead.
    completed = [e for e in events if e["phase"] == "completed"]
    assert completed, (
        f"no 'completed' event emitted; phases seen: {[e['phase'] for e in events]}"
    )
    comp_evt = completed[0]
    assert "## Sub-question 1:" in comp_evt["answer"]
    assert "## Sub-question 2:" in comp_evt["answer"]

    # sub_question_sources has 2 entries
    sq_sources = comp_evt["sub_question_sources"]
    assert len(sq_sources) == 2
    assert len(sq_sources[0]["sources"]) > 0  # first sub-q has sources
    assert len(sq_sources[1]["sources"]) == 0  # second sub-q has no sources