From 57a130dc962f28b1ce7ec58084f16a9e9a3ff227 Mon Sep 17 00:00:00 2001 From: Woody Date: Sun, 26 Apr 2026 23:27:50 +0800 Subject: [PATCH] feat(services): add per-sub-question retrieval, filtering, and response generation Add retrieve_per_subquestion() that queries ChromaDB independently per sub-question instead of joining all sub-qs into one query string. Add filter_per_subquestion() that evaluates each chunk against its own originating sub-question in a single LLM call with a redesigned grouped prompt. Add generate_response_per_subquestion() that produces markdown sections per sub-question with grouped sources and {context_sections} template support. All existing methods preserved for backward compatibility. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- backend/app/services/rag.py | 128 +++++++++++++++++++++++ backend/app/services/relevance_filter.py | 111 ++++++++++++++++++++ 2 files changed, 239 insertions(+) diff --git a/backend/app/services/rag.py b/backend/app/services/rag.py index ecab11d..e361fb5 100644 --- a/backend/app/services/rag.py +++ b/backend/app/services/rag.py @@ -63,6 +63,35 @@ class RAGService: return document_id + def retrieve_per_subquestion( + self, + sub_questions: List[str], + n_results: int = 10, + ) -> List[Tuple[str, List[Tuple[str, Dict[str, Any], float]]]]: + """Retrieve chunks for each sub-question independently. + + Calls retrieve() once per sub-question to get chunks specifically + relevant to each decomposed question, rather than joining all + sub-questions into a single query string. + + Args: + sub_questions: List of decomposed sub-questions from QueryDecomposer. + n_results: Number of chunks to retrieve per sub-question. + + Returns: + List of (sub_question, chunks) tuples. Each chunks list contains + (text, metadata, distance) tuples in the standard retrieve() format. + Returns empty list if sub_questions is empty. + """ + if not sub_questions: + return [] + + results: List[Tuple[str, List[Tuple[str, Dict[str, Any], float]]]] = [] + for sub_q in sub_questions: + chunks = self.retrieve([sub_q], n_results=n_results) + results.append((sub_q, chunks)) + return results + def retrieve( self, query_keywords: List[str], @@ -142,6 +171,105 @@ class RAGService: result = await self.llm_client.complete(prompt=prompt, temperature=0.3, step_name="ResponseGeneration") return result, prompt + async def generate_response_per_subquestion( + self, + sub_questions: List[str], + sub_chunks: List[List[str]], + sub_metadata: List[List[Dict[str, Any]]], + ) -> Tuple[str, str, List[List[Dict[str, Any]]]]: + """Generate sub-question-organized RAG response. + + Builds context sections for each sub-question and asks the LLM to + answer each one using only its own document chunks. Returns the + full markdown answer plus sources organized by sub-question. + + Args: + sub_questions: List of decomposed sub-questions. + sub_chunks: List of chunk text lists (one per sub-question). + sub_metadata: List of metadata dict lists (one per sub-question). + Must be same length as sub_chunks, with inner lists matching. + + Returns: + Tuple of (answer, prompt, grouped_sources). + answer: Markdown string with ## Sub-question N: sections. + prompt: The rendered LLM prompt string. + grouped_sources: List of metadata dict lists (one per sub-question), + each metadata dict is a SourceMetadata-compatible dict. + """ + if not sub_questions: + return ( + "I could not find any relevant information to answer your question.", + "", + [], + ) + + has_chunks = any(len(c) > 0 for c in sub_chunks) + if not has_chunks: + return ( + "I could not find any relevant information to answer your question.", + "", + [], + ) + + if self.llm_client is None: + return ("LLM client not configured.", "", []) + + context_parts = [] + for idx, (sq, chunks, metas) in enumerate( + zip(sub_questions, sub_chunks, sub_metadata) + ): + context_parts.append( + f'### Context for Sub-question {idx}: "{sq}"' + ) + for chunk, meta in zip(chunks, metas): + source = meta.get("filename", "unknown") + summary = meta.get("content_summary", "") + page_num = meta.get("page_number") + citation_label = ( + f"{source}, page {page_num}" if page_num else source + ) + context_parts.append( + f"[{citation_label}] Source: {source}\n" + f"Summary: {summary}\n" + f"Content: {chunk}\n" + ) + + context_sections = "\n".join(context_parts) + + if self._prompt_service is not None: + template = self._prompt_service.get_prompt_template( + "generate_per_subq" + ) + else: + template = ( + "You must answer each sub-question using ONLY the document " + "chunks provided for it.\n" + "Do not use any external knowledge.\n" + "Format your answer as markdown sections — one section per " + "sub-question.\n" + 'Each section should start with "## Sub-question N: ' + '"\n' + "Each section should contain 1-5 bullet points.\n" + "Cite your sources inline using bracket labels, " + "e.g. [filename, page N].\n" + "Place the citation at the end of each relevant bullet point." + "\n\n" + "{context_sections}\n\n" + "Answer:" + ) + + prompt = template.replace("{context_sections}", context_sections) + + answer = await self.llm_client.complete( + prompt=prompt, temperature=0.3, step_name="ResponseGeneration" + ) + + grouped_sources: List[List[Dict[str, Any]]] = [] + for metas in sub_metadata: + grouped_sources.append(list(metas)) + + return answer, prompt, grouped_sources + def list_documents(self) -> Tuple[List[Dict[str, Any]], int, int]: from collections import defaultdict diff --git a/backend/app/services/relevance_filter.py b/backend/app/services/relevance_filter.py index 077989c..07274d3 100644 --- a/backend/app/services/relevance_filter.py +++ b/backend/app/services/relevance_filter.py @@ -103,3 +103,114 @@ class RelevanceFilter: result.append((chunk, meta)) return result, prompt + + def _build_per_subq_prompt( + self, + sub_questions: List[str], + sub_chunks: List[List[Tuple[str, Dict]]], + ) -> str: + sections: List[str] = [ + "Evaluate each chunk for relevance to its associated sub-question only." + ] + for idx, (sq, chunks) in enumerate(zip(sub_questions, sub_chunks)): + sections.append(f'\nSub-question {idx}: "{sq}"') + for c_idx, (text, _meta) in enumerate(chunks): + sections.append(f"Chunk {c_idx}: {text}") + + sections.append( + "\nFor each chunk, rate its relevance 0-10 considering ONLY its associated sub-question." + ) + sections.append( + 'Return a JSON object mapping sub-question indices to arrays of scores.\n' + 'Example: {"0": [8.5, 3.2, 9.0], "1": [7.0, 9.1]}' + ) + return "\n".join(sections) + + async def filter_per_subquestion( + self, + sub_questions: List[str], + sub_chunks: List[List[Tuple[str, Dict]]], + threshold: float = 7.0, + ) -> Tuple[List[Tuple[str, List[Tuple[str, Dict]]]], str]: + """Filter chunks per sub-question in a single LLM call. + + Builds a prompt that groups chunks by their originating sub-question + and asks the LLM to score each chunk 0-10 against only its own + sub-question. Returns results organized by sub-question with + relevance scores embedded in metadata. + + Args: + sub_questions: List of decomposed sub-questions. + sub_chunks: List of chunk lists (one per sub-question). Each inner + list contains (chunk_text, metadata) tuples. + threshold: Minimum relevance score (exclusive) to keep a chunk. + + Returns: + Tuple of (filtered_results, prompt). + filtered_results: List of (sub_question, filtered_chunks) tuples. + Each filtered_chunks is a list of (chunk_text, metadata) tuples + where metadata includes 'relevance_score'. + Returns ([], "") on error or empty input. + """ + if not sub_questions: + return [], "" + + has_any_chunks = any(len(c) > 0 for c in sub_chunks) + if not has_any_chunks: + return [ + (sq, []) for sq in sub_questions + ], "" + + prompt = self._build_per_subq_prompt(sub_questions, sub_chunks) + try: + response = await self.llm_client.complete( + prompt, temperature=0.0, step_name="RelevanceFilter" + ) + except Exception as exc: + logger.error("RelevanceFilter per-subq LLM call failed: %s", exc) + return [], prompt + + try: + response = _extract_json_from_markdown(response) + parsed = json.loads(response) + if not isinstance(parsed, dict): + logger.error("RelevanceFilter per-subq: expected JSON object, got %s", type(parsed).__name__) + return [], prompt + + score_map: Dict[str, List[float]] = {} + for key, scores in parsed.items(): + if not isinstance(scores, list): + return [], prompt + score_map[key] = [] + for v in scores: + if not isinstance(v, (int, float)): + return [], prompt + score_map[key].append(float(v)) + except Exception as exc: + logger.error("RelevanceFilter per-subq JSON parse failed: %s", exc) + return [], prompt + + for idx in range(len(sub_questions)): + key = str(idx) + if len(sub_chunks[idx]) == 0: + continue + if key not in score_map or len(score_map[key]) != len(sub_chunks[idx]): + logger.error( + "RelevanceFilter per-subq score count mismatch for sub-q %d: " + "expected %d scores, got %d", + idx, len(sub_chunks[idx]), + len(score_map.get(key, [])), + ) + return [], prompt + + filtered_results: List[Tuple[str, List[Tuple[str, Dict]]]] = [] + for idx, (sq, chunks) in enumerate(zip(sub_questions, sub_chunks)): + scores = score_map.get(str(idx), []) + kept: List[Tuple[str, Dict]] = [] + for (chunk, meta), score in zip(chunks, scores): + if score > threshold: + meta = {**meta, "relevance_score": score} + kept.append((chunk, meta)) + filtered_results.append((sq, kept)) + + return filtered_results, prompt