legco_ai_assistant/backend/app/services/query_decomposer.py

169 lines
6.2 KiB
Python

"""Query decomposer service.
This module provides a lightweight QueryDecomposer that delegates the
decomposition of a natural language question into simplified sub-questions
to an LLM client. Prompt templates are fetched from PromptService when
available; otherwise, a built-in default is used.
Uses LangChain structured output via LLMClient.complete_structured()
for guaranteed valid JSON, with a legacy json.loads() fallback path.
"""
from __future__ import annotations
import json
import logging
import re
from typing import TYPE_CHECKING, List, Tuple
if TYPE_CHECKING:
from app.services.prompt_service import PromptService
logger = logging.getLogger(__name__)
# Fallback template used when prompt_service is not provided (tests, standalone).
_BUILTIN_DECOMPOSE_TEMPLATE = (
"Given this question: '{question}'\n\n"
"Break it down into 1-3 simplified sub-questions that would help "
"search for relevant information. Each sub-question should be short "
"and focused on one aspect. Return as a JSON array of strings."
)
# Default settings for the structured decomposition output.
#
# "description" is a Traditional Chinese instruction text. In English it asks
# to break the question/task into 1-3 simplified sub-questions, each written
# in the fixed form "tag-style topic:specific question/request"; the tag
# must be clear, concise and specific (3-8 characters suggested) and must
# include key details such as place, region, person, time or amount/finance
# when they are involved.
# "max_length" is 3, matching the "1-3" sub-questions named in the text.
#
# NOTE(review): this constant is not referenced anywhere in the visible part
# of this file — presumably it mirrors the default for the "decompose_format"
# prompt entry; confirm where it is consumed before changing or removing it.
_DEFAULT_DECOMPOSE_FORMAT = {
    "description": (
        "請將問題/任務拆解成 1-3 個簡化子問題,標籤式主題必須清楚、簡潔、具體,"
        "一看就明白(建議 3-8 個字),若涉及地點、地區、人物、時間、金額/財政 等關鍵資訊,"
        "必須包含在標籤中 。具體提問/要求要精準、完整 並全部轉換成以下固定格式:\n"
        "「標籤式主題:具體提問/要求」"
    ),
    "max_length": 3,
}
def _extract_json_from_markdown(response: str) -> str:
if not isinstance(response, str):
return str(response)
pattern = r"```(?:json)?\s*\n?(.*?)\n?```"
match = re.search(pattern, response, re.DOTALL)
if match:
return match.group(1).strip()
return response.strip()
def _parse_legacy_json(response: str) -> List[str]:
    """Turn a raw LLM reply into a list of sub-question strings.

    The reply is first passed through ``_extract_json_from_markdown`` and the
    result is decoded with ``json.loads``.

    Args:
        response: Raw text returned by the LLM.

    Returns:
        The decoded items as strings (non-string items are converted with
        ``str()``). An empty list when the text is not valid JSON, when the
        decoded value is not a list, or when the list is empty.
    """
    payload = _extract_json_from_markdown(response)
    preview = payload[:300] if payload else "(empty)"
    logger.info("Legacy JSON parse: extracted text (first 300 chars): %s", preview)

    try:
        decoded = json.loads(payload)
    except json.JSONDecodeError:
        logger.warning("Legacy JSON parse: json.loads failed on extracted text")
        return []

    # Only a non-empty JSON array is an acceptable answer.
    if not isinstance(decoded, list) or not decoded:
        return []
    return [entry if isinstance(entry, str) else str(entry) for entry in decoded]
class QueryDecomposer:
    """Decompose a natural language question into simplified sub-questions.

    The LLM client must expose two coroutines:

    * ``complete_structured(prompt=..., pydantic_model=..., step_name=...)``,
      whose result carries the sub-questions in a ``questions`` attribute;
    * ``complete(prompt, step_name=...)``, returning the raw reply text.

    ``prompt_service`` is optional. When given, its
    ``get_prompt_template(name)`` is queried for the ``"decompose"`` prompt
    and the ``"decompose_format"`` settings. When it is ``None``, or when it
    returns an empty ``"decompose"`` template, the built-in default template
    is used.
    """

    # Step label passed to the LLM client on every call made by this class.
    _STEP_NAME = "QueryDecomposer"

    def __init__(self, llm_client, prompt_service: "PromptService | None" = None) -> None:
        """Store the LLM client and the optional prompt service."""
        self.llm_client = llm_client
        self._prompt_service = prompt_service

    async def decompose(self, question: str) -> Tuple[List[str], str]:
        """Return the sub-questions for *question* and the prompt that was used.

        Structured output is tried first. If that call raises (or its result
        has no ``questions`` attribute), the plain completion is requested
        and its reply is parsed as a JSON array.

        Args:
            question: The natural language question to decompose.

        Returns:
            A tuple ``(sub_questions, prompt)``. ``prompt`` is the rendered
            prompt string. ``sub_questions`` is empty when both the
            structured and the legacy path fail. A missing or blank question
            yields ``([], "")`` without contacting the LLM.
        """
        if not question or not question.strip():
            return [], ""

        prompt = self._build_prompt(question)
        output_model = self._load_output_model()

        try:
            result = await self.llm_client.complete_structured(
                prompt=prompt,
                pydantic_model=output_model,
                step_name=self._STEP_NAME,
            )
            # Kept inside the try block so that a malformed result object
            # also triggers the legacy fallback.
            return result.questions, prompt
        except Exception as exc:
            # Deliberately broad: any failure of the structured path is
            # treated as "try the legacy path instead".
            logger.warning(
                "Structured decomposition failed: %s. Falling back to legacy parse.",
                exc,
            )

        return await self._decompose_legacy(prompt), prompt

    def _build_prompt(self, question: str) -> str:
        """Render the decomposition prompt for *question*."""
        template = None
        if self._prompt_service is not None:
            template = self._prompt_service.get_prompt_template("decompose")
            if template:
                logger.info("Decompose prompt template (first 200 chars): %s", template[:200])
            else:
                # A None template would crash on .replace() and an empty one
                # would send an empty prompt to the LLM, so use the default.
                logger.warning(
                    "Decompose prompt template is empty; using the built-in default template."
                )
        if not template:
            template = _BUILTIN_DECOMPOSE_TEMPLATE
        # str.replace (not str.format) so other braces in the template or
        # in the question are left untouched.
        return template.replace("{question}", question)

    def _load_output_model(self):
        """Return the Pydantic model class used for the structured call."""
        # Imported at call time rather than at module import time.
        # NOTE(review): presumably to avoid an import cycle — confirm before hoisting.
        from app.models.decompose import SubQuestions, create_subquestions_model, parse_decompose_format

        if self._prompt_service is None:
            return SubQuestions

        try:
            format_raw = self._prompt_service.get_prompt_template("decompose_format")
            format_config = parse_decompose_format(format_raw)
            model = create_subquestions_model(
                description=format_config["description"],
                max_length=format_config["max_length"],
            )
        except Exception as exc:
            # Best effort: a missing or invalid format entry must not block
            # decomposition, so the static model is used instead.
            logger.warning(
                "Failed to load decompose_format from DB: %s. Falling back to static SubQuestions.",
                exc,
            )
            return SubQuestions

        logger.info(
            "Decompose format loaded from DB: max_length=%d",
            format_config["max_length"],
        )
        return model

    async def _decompose_legacy(self, prompt: str) -> List[str]:
        """Request a plain completion and parse its reply as a JSON array."""
        try:
            response = await self.llm_client.complete(prompt, step_name=self._STEP_NAME)
        except Exception as exc:
            logger.warning("Legacy LLM decomposition also failed: %s", exc)
            return []

        if not isinstance(response, str):
            response = str(response)

        questions = _parse_legacy_json(response)
        if questions:
            logger.info(
                "Legacy decompose succeeded after structured output failure. "
                "Consider investigating why structured output failed."
            )
        else:
            logger.warning(
                "Legacy decompose JSON parse failed. Raw response (first 500 chars): %s",
                response[:500],
            )
        return questions