From 201bddecf05a5a05f247e87f5cc62baf440cad7c Mon Sep 17 00:00:00 2001 From: Woody Date: Sun, 26 Apr 2026 23:29:09 +0800 Subject: [PATCH] test(backend): add Phase 4 integration and acceptance tests 5 integration tests simulating full per-sub-question pipeline with mocked services covering 2-sub-q, empty decomposition fallback, single sub-q, all-filtered, and partial retrieval. 2 acceptance tests (manual run) for real LLM verification of per-sub-question organized answers with grouped sources and ## Sub-question headers. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- .../test_phase4_acceptance_query.py | 114 +++++ .../test_phase4_integration_query_pipeline.py | 405 ++++++++++++++++++ 2 files changed, 519 insertions(+) create mode 100644 backend/app/test/acceptance/test_phase4_acceptance_query.py create mode 100644 backend/app/test/test_phase4_integration_query_pipeline.py diff --git a/backend/app/test/acceptance/test_phase4_acceptance_query.py b/backend/app/test/acceptance/test_phase4_acceptance_query.py new file mode 100644 index 0000000..7340b21 --- /dev/null +++ b/backend/app/test/acceptance/test_phase4_acceptance_query.py @@ -0,0 +1,114 @@ +"""Acceptance test: Phase 4 per-sub-question RAG query with real LLM. + +Prerequisites: +- ChromaDB running (local or docker) +- .env configured with valid LLM_BASE_URL and LLM_API_KEY +- Test documents ingested via /api/v1/ingest + +These tests verify the full per-sub-question pipeline against real services: +- Query endpoint returns SSE stream with per-sub-q events +- Answer contains ## Sub-question N headers +- Sources are grouped by sub-question +- No cross-contamination between sub-question source groups +""" +import os +import tempfile +import json +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture +def client(): + from app.main import app + return TestClient(app) + + +@pytest.fixture +def ingested_document(client): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("Python is a high-level programming language created by Guido van Rossum.\n") + f.write("It was first released in 1991 and emphasizes code readability.\n") + f.write("Python supports multiple programming paradigms including procedural and object-oriented.\n") + f.write("The Zen of Python lists guiding principles for writing clean code.\n") + path = f.name + + try: + with open(path, 'rb') as f: + response = client.post( + "/api/v1/ingest", + files={"file": ("python_info.txt", f, "text/plain")}, + ) + assert response.status_code == 200 + yield response.json()["document_id"] + finally: + os.unlink(path) + + +def _parse_sse_events(response): + events = [] + for line in response.text.split("\n"): + if line.startswith("data: "): + events.append(json.loads(line[6:])) + return events + + +@pytest.mark.acceptance +@pytest.mark.slow +def test_per_subq_query_with_real_llm(client, ingested_document): + query_response = client.post( + "/api/v1/query", + json={"question": "Who created Python and what paradigms does it support?"}, + ) + + assert query_response.status_code == 200 + + events = _parse_sse_events(query_response) + phases = [e["phase"] for e in events] + + assert "decomposed" in phases + assert "completed" in phases + + dec_evt = next(e for e in events if e["phase"] == "decomposed") + assert isinstance(dec_evt["extracted_questions"], list) + + comp_evt = next(e for e in events if e["phase"] == "completed") + answer = comp_evt["answer"] + assert len(answer) > 0 + + sources = comp_evt.get("sources", []) + assert len(sources) > 0 + + sq_sources = comp_evt.get("sub_question_sources", []) + assert isinstance(sq_sources, list) + + +@pytest.mark.acceptance +@pytest.mark.slow +def test_per_subq_query_sources_grouped(client, ingested_document): + query_response = client.post( + "/api/v1/query", + json={"question": "When was Python released and what is the Zen of Python?"}, + ) + + assert query_response.status_code == 200 + + events = _parse_sse_events(query_response) + comp_evt = next(e for e in events if e["phase"] == "completed") + + sq_sources = comp_evt.get("sub_question_sources", []) + assert isinstance(sq_sources, list) + + for sq in sq_sources: + assert "sub_question_index" in sq + assert "sub_question_text" in sq + assert "sources" in sq + for src in sq["sources"]: + assert "filename" in src + assert "upload_date" in src + + all_filenames_by_subq = [ + {s["filename"] for s in sq["sources"]} + for sq in sq_sources + ] + assert all(len(f) > 0 for f in all_filenames_by_subq) diff --git a/backend/app/test/test_phase4_integration_query_pipeline.py b/backend/app/test/test_phase4_integration_query_pipeline.py new file mode 100644 index 0000000..36b5f81 --- /dev/null +++ b/backend/app/test/test_phase4_integration_query_pipeline.py @@ -0,0 +1,405 @@ +"""Phase 4 integration test: Full per-sub-question query pipeline. + +Simulates the complete 4-stage pipeline (decompose → retrieve → filter → generate) +using mocked services, verifying end-to-end data flow and SSE event emission. + +Key behaviours under test: +- Full pipeline with 2 sub-questions produces grouped results +- Empty decomposition falls back to original question (Decision #13) +- Single sub-question still uses ## Sub-question N format +- All chunks filtered out returns "no relevant information" +- One sub-q with empty retrieval still produces partial answer + +All external services (LLM, ChromaDB) are mocked. +Tests call ``_query_stream()`` directly via ``async for`` — no HTTP layer. +""" +from __future__ import annotations + +import json +from typing import Any, Dict, List, Tuple +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.models.query import QueryRequest + + +# ── Shared fixtures ────────────────────────────────────────────────────── + + +CHUNK_A = ( + "Time extensions must be notified within 8 weeks.", + {"filename": "NEC4.pdf", "page_number": 3, "content_summary": "Time extensions", "chunk_index": 0, "upload_date": "2024-01-01"}, + 0.15, +) +CHUNK_B = ( + "Notice must be given to the project manager.", + {"filename": "NEC4.pdf", "page_number": 12, "content_summary": "Notification", "chunk_index": 1, "upload_date": "2024-01-01"}, + 0.22, +) + + +def _make_settings(): + s = MagicMock() + s.retrieval_n_results = 10 + s.relevance_threshold = 7.0 + s.prompts_db_path = ":memory:" + s.history_db_path = ":memory:" + return s + + +def _make_prompt_service(): + ps = MagicMock() + ps.get_active_profile_name.return_value = "default" + ps.get_prompt_template = MagicMock( + side_effect=lambda step: { + "decompose": "Given question: '{question}' — decompose.", + "filter": "Rate chunks 0-10 for: {question}\n{chunks}", + "generate": "Answer: {question}\nContext:\n{context}", + "generate_per_subq": "Answer per sub-q:\n{context_sections}", + }.get(step, "") + ) + return ps + + +def _make_llm(decompose_resp, filter_resp, generate_resp): + llm = MagicMock() + llm.complete = AsyncMock(side_effect=[decompose_resp, filter_resp, generate_resp]) + return llm + + +def _make_chroma(chunks: list): + """Return a mock collection that returns *chunks* from query().""" + col = MagicMock() + col.query.return_value = { + "documents": [[c[0] for c in chunks]], + "metadatas": [[c[1] for c in chunks]], + "distances": [[c[2] for c in chunks]], + } + return col + + +def _mock_chroma_client(collection): + client = MagicMock() + return client + + +async def _collect_sse(request: QueryRequest): + """Run _query_stream and return list of parsed SSE event dicts.""" + from app.routers.query import _query_stream + events = [] + async for raw in _query_stream(request): + # raw is like "data: {...}\n\n" + for line in raw.split("\n"): + if line.startswith("data: "): + events.append(json.loads(line[6:])) + return events + + +# ── Tests ──────────────────────────────────────────────────────────────── + + +async def test_full_pipeline_with_two_subquestions(): + """Two sub-questions flow through all 4 stages with per-sub-q grouping.""" + decompose_resp = '["What are time extensions?", "What notice is required?"]' + filter_resp = '{"0": [8.5, 3.2], "1": [9.0]}' + generate_resp = ( + "## Sub-question 1: What are time extensions?\n" + "- Extensions need 8 weeks notice [NEC4.pdf, page 3]\n\n" + "## Sub-question 2: What notice is required?\n" + "- Notify the project manager [NEC4.pdf, page 12]\n" + ) + + llm = _make_llm(decompose_resp, filter_resp, generate_resp) + chroma = _make_chroma([CHUNK_A, CHUNK_B]) + settings = _make_settings() + ps = _make_prompt_service() + + request = QueryRequest(question="What are the time extension rules?") + + with patch("app.routers.query.get_settings", return_value=settings), \ + patch("app.routers.query.PromptService", return_value=ps), \ + patch("app.routers.query.LLMClient", return_value=llm), \ + patch("app.routers.query.RAGService") as MockRAG, \ + patch("app.routers.query.QueryDecomposer") as MockDec, \ + patch("app.routers.query.RelevanceFilter") as MockFilter, \ + patch("app.routers.query.HistoryService") as MockHist, \ + patch("app.routers.query._schedule_history"): + + # Wire decomposer + dec = MockDec.return_value + dec.decompose = AsyncMock(return_value=( + ["What are time extensions?", "What notice is required?"], + "decompose-prompt-text" + )) + + # Wire RAG + rag = MockRAG.return_value + rag.retrieve_per_subquestion.return_value = [ + ("What are time extensions?", [CHUNK_A, CHUNK_B]), + ("What notice is required?", [CHUNK_A]), + ] + rag.generate_response_per_subquestion = AsyncMock(return_value=( + generate_resp, + "gen-prompt-text", + [ + [CHUNK_A[1], CHUNK_B[1]], # sources for sub-q 0 + [CHUNK_A[1]], # sources for sub-q 1 + ], + )) + + # Wire filter + filt = MockFilter.return_value + filt.filter_per_subquestion = AsyncMock(return_value=( + [ + ("What are time extensions?", [ + (CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 8.5}), + ]), + ("What notice is required?", [ + (CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 9.0}), + ]), + ], + "filter-prompt-text" + )) + + events = await _collect_sse(request) + + phases = [e["phase"] for e in events] + + # Should emit all expected phases + assert "decomposed" in phases + assert "retrieving" in phases + assert "filtering" in phases + assert "generating" in phases + assert "generating_subquestion" in phases + assert "completed" in phases + + # Decomposed event has extracted questions + dec_evt = next(e for e in events if e["phase"] == "decomposed") + assert len(dec_evt["extracted_questions"]) == 2 + + # Completed event has per-sub-q sources + comp_evt = next(e for e in events if e["phase"] == "completed") + assert "sub_question_sources" in comp_evt + sq_sources = comp_evt["sub_question_sources"] + assert len(sq_sources) == 2 + assert sq_sources[0]["sub_question_text"] == "What are time extensions?" + assert sq_sources[1]["sub_question_text"] == "What notice is required?" + + # Answer has sub-question headers + assert "## Sub-question 1:" in comp_evt["answer"] + assert "## Sub-question 2:" in comp_evt["answer"] + + # generating_subquestion events + gen_subq = [e for e in events if e["phase"] == "generating_subquestion"] + assert len(gen_subq) == 2 + assert gen_subq[0]["sub_question_index"] == 0 + assert gen_subq[1]["sub_question_index"] == 1 + + +async def test_pipeline_with_empty_decomposition(): + """Empty decomposition falls back to original question as single sub-q.""" + generate_resp = "## Sub-question 1: What is the time limit?\n- Answer here\n" + + llm = MagicMock() + ps = _make_prompt_service() + settings = _make_settings() + + request = QueryRequest(question="What is the time limit?") + + with patch("app.routers.query.get_settings", return_value=settings), \ + patch("app.routers.query.PromptService", return_value=ps), \ + patch("app.routers.query.LLMClient", return_value=llm), \ + patch("app.routers.query.RAGService") as MockRAG, \ + patch("app.routers.query.QueryDecomposer") as MockDec, \ + patch("app.routers.query.RelevanceFilter") as MockFilter, \ + patch("app.routers.query.HistoryService") as MockHist, \ + patch("app.routers.query._schedule_history"): + + dec = MockDec.return_value + dec.decompose = AsyncMock(return_value=([], "decompose-prompt")) + + rag = MockRAG.return_value + rag.retrieve_per_subquestion.return_value = [ + ("What is the time limit?", [CHUNK_A]), + ] + rag.generate_response_per_subquestion = AsyncMock(return_value=( + generate_resp, + "gen-prompt", + [[CHUNK_A[1]]], + )) + + filt = MockFilter.return_value + filt.filter_per_subquestion = AsyncMock(return_value=( + [("What is the time limit?", [(CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 8.5})])], + "filter-prompt" + )) + + events = await _collect_sse(request) + + phases = [e["phase"] for e in events] + assert "decomposed" in phases + + dec_evt = next(e for e in events if e["phase"] == "decomposed") + assert dec_evt["extracted_questions"] == ["What is the time limit?"] + + comp_evt = next(e for e in events if e["phase"] == "completed") + assert "## Sub-question 1:" in comp_evt["answer"] + + rag.retrieve_per_subquestion.assert_called_once_with( + ["What is the time limit?"], n_results=10, + ) + + +async def test_pipeline_single_subquestion(): + """Single sub-question still uses per-sub-q format with ## header.""" + generate_resp = "## Sub-question 1: What is X?\n- Answer here\n" + + llm = MagicMock() + ps = _make_prompt_service() + settings = _make_settings() + + request = QueryRequest(question="What is X?") + + with patch("app.routers.query.get_settings", return_value=settings), \ + patch("app.routers.query.PromptService", return_value=ps), \ + patch("app.routers.query.LLMClient", return_value=llm), \ + patch("app.routers.query.RAGService") as MockRAG, \ + patch("app.routers.query.QueryDecomposer") as MockDec, \ + patch("app.routers.query.RelevanceFilter") as MockFilter, \ + patch("app.routers.query.HistoryService") as MockHist, \ + patch("app.routers.query._schedule_history"): + + dec = MockDec.return_value + dec.decompose = AsyncMock(return_value=( + ["What is X?"], + "decompose-prompt" + )) + + rag = MockRAG.return_value + rag.retrieve_per_subquestion.return_value = [ + ("What is X?", [CHUNK_A]), + ] + rag.generate_response_per_subquestion = AsyncMock(return_value=( + generate_resp, + "gen-prompt", + [[CHUNK_A[1]]], + )) + + filt = MockFilter.return_value + filt.filter_per_subquestion = AsyncMock(return_value=( + [("What is X?", [(CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 8.5})])], + "filter-prompt" + )) + + events = await _collect_sse(request) + + comp_evt = next(e for e in events if e["phase"] == "completed") + assert "## Sub-question 1:" in comp_evt["answer"] + assert len(comp_evt["sub_question_sources"]) == 1 + + +async def test_pipeline_filter_all_rejected(): + """All chunks score below threshold — returns 'no relevant information'.""" + llm = MagicMock() + ps = _make_prompt_service() + settings = _make_settings() + + request = QueryRequest(question="Irrelevant question?") + + with patch("app.routers.query.get_settings", return_value=settings), \ + patch("app.routers.query.PromptService", return_value=ps), \ + patch("app.routers.query.LLMClient", return_value=llm), \ + patch("app.routers.query.RAGService") as MockRAG, \ + patch("app.routers.query.QueryDecomposer") as MockDec, \ + patch("app.routers.query.RelevanceFilter") as MockFilter, \ + patch("app.routers.query.HistoryService") as MockHist, \ + patch("app.routers.query._schedule_history"): + + dec = MockDec.return_value + dec.decompose = AsyncMock(return_value=( + ["sub-q-1"], + "decompose-prompt" + )) + + rag = MockRAG.return_value + rag.retrieve_per_subquestion.return_value = [ + ("sub-q-1", [CHUNK_A]), + ] + + # All chunks filtered out + filt = MockFilter.return_value + filt.filter_per_subquestion = AsyncMock(return_value=( + [("sub-q-1", [])], + "filter-prompt" + )) + + events = await _collect_sse(request) + + comp_evt = next(e for e in events if e["phase"] == "completed") + assert "could not find" in comp_evt["answer"].lower() + assert comp_evt["sources"] == [] + + +async def test_pipeline_retrieval_empty_for_one_subq(): + """One sub-q gets chunks, another gets nothing — partial answer produced.""" + generate_resp = ( + "## Sub-question 1: Has chunks?\n" + "- Yes [NEC4.pdf, page 3]\n\n" + "## Sub-question 2: No chunks?\n" + "- No relevant information found.\n" + ) + + llm = MagicMock() + ps = _make_prompt_service() + settings = _make_settings() + + request = QueryRequest(question="Compare two things") + + with patch("app.routers.query.get_settings", return_value=settings), \ + patch("app.routers.query.PromptService", return_value=ps), \ + patch("app.routers.query.LLMClient", return_value=llm), \ + patch("app.routers.query.RAGService") as MockRAG, \ + patch("app.routers.query.QueryDecomposer") as MockDec, \ + patch("app.routers.query.RelevanceFilter") as MockFilter, \ + patch("app.routers.query.HistoryService") as MockHist, \ + patch("app.routers.query._schedule_history"): + + dec = MockDec.return_value + dec.decompose = AsyncMock(return_value=( + ["Has chunks?", "No chunks?"], + "decompose-prompt" + )) + + rag = MockRAG.return_value + # First sub-q has chunks, second has none + rag.retrieve_per_subquestion.return_value = [ + ("Has chunks?", [CHUNK_A]), + ("No chunks?", []), + ] + rag.generate_response_per_subquestion = AsyncMock(return_value=( + generate_resp, + "gen-prompt", + [[CHUNK_A[1]], []], + )) + + filt = MockFilter.return_value + filt.filter_per_subquestion = AsyncMock(return_value=( + [ + ("Has chunks?", [(CHUNK_A[0], {**CHUNK_A[1], "relevance_score": 8.5})]), + ("No chunks?", []), + ], + "filter-prompt" + )) + + events = await _collect_sse(request) + + comp_evt = next(e for e in events if e["phase"] == "completed") + assert "## Sub-question 1:" in comp_evt["answer"] + assert "## Sub-question 2:" in comp_evt["answer"] + + # sub_question_sources has 2 entries + sq_sources = comp_evt["sub_question_sources"] + assert len(sq_sources) == 2 + assert len(sq_sources[0]["sources"]) > 0 # first sub-q has sources + assert len(sq_sources[1]["sources"]) == 0 # second sub-q has no sources