# legco_ai_assistant/backend/app/services/llm_client_dp.py

"""Deepseek API client for the query decomposition step only.
Provides `LLMClientDP`, a lightweight async client targeting the Deepseek
API (deepseek-v4-pro) with thinking mode always disabled. Uses the same
OpenAI-compatible SDK as `LLMClient` but with Deepseek-specific extra_body
and a separate set of Settings fields (dp_*).
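Implements only `complete()` and `complete_structured()` — the two methods
consumed by `QueryDecomposer` — plus `close()` to release the underlying
HTTP client. `complete_structured()` requests Deepseek's JSON mode
(``response_format={"type": "json_object"}``) and then extracts and
validates the JSON client-side, because Deepseek does not yet support
OpenAI's ``json_schema`` response format.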
"""
import json
import logging
import re
import time
from typing import Any
import httpx
from openai import AsyncOpenAI, APIError, APITimeoutError
from app.core.config import Settings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class LLMClientDPError(Exception):
    """Error raised by ``LLMClientDP`` when a Deepseek call fails.

    Covers both failed API requests and replies that cannot be parsed or
    validated; the triggering exception is chained as ``__cause__``.
    """
def _truncate_prompt_for_log(prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
if len(prompt) <= first_chars + last_chars:
return prompt
return (
f"{prompt[:first_chars]}..."
f"({len(prompt) - first_chars - last_chars} chars omitted)..."
f"{prompt[-last_chars:]}"
)
class LLMClientDP:
    """Async Deepseek API client for query decomposition.

    Always disables thinking mode via ``extra_body={"thinking": {"type": "disabled"}}``.
    Uses the OpenAI-compatible SDK pointed at ``settings.dp_base_url``.
    Falls back to ``settings.llm_api_key`` when ``settings.dp_api_key`` is empty.

    Call :meth:`close` when done, or use the client as an async context
    manager (``async with LLMClientDP(settings) as client: ...``).
    """

    # First Markdown code fence (optionally tagged ``json``). Its body is
    # unwrapped before validation in case the model fences its JSON output.
    _FENCE_RE = re.compile(r"```(?:json)?\s*\n?(.*?)\n?```", re.DOTALL)

    def __init__(self, settings: Settings) -> None:
        """Build the underlying ``AsyncOpenAI`` client from the ``dp_*`` settings.

        Args:
            settings: Application settings. Reads ``dp_api_key`` (falling
                back to ``llm_api_key``), ``dp_model_name``, ``dp_base_url``
                and ``llm_timeout``.
        """
        api_key = settings.dp_api_key or settings.llm_api_key
        self.model = settings.dp_model_name
        self._client = AsyncOpenAI(
            base_url=settings.dp_base_url.rstrip("/"),
            api_key=api_key,
            timeout=settings.llm_timeout,
            # Caller-supplied httpx client; close() releases it through
            # AsyncOpenAI.close().
            http_client=httpx.AsyncClient(
                headers={"Content-Type": "application/json"},
            ),
        )

    async def __aenter__(self) -> "LLMClientDP":
        """Return this client for use in an ``async with`` block."""
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        """Close the underlying HTTP client when the ``async with`` block exits."""
        await self.close()

    async def complete(
        self,
        prompt: str,
        temperature: float = 0.7,
        step_name: str = "QueryDecomposer",
        response_format: dict[str, Any] | None = None,
    ) -> str:
        """Send a single-turn chat completion request with thinking disabled.

        Used as the fallback path by ``QueryDecomposer.decompose()`` when
        ``complete_structured()`` fails, and internally by
        ``complete_structured()``.

        Args:
            prompt: The prompt, sent as a single ``user`` message.
            temperature: Sampling temperature.
            step_name: Identifier prefixed to log lines and error messages.
            response_format: Optional OpenAI ``response_format`` dict
                (e.g. ``{"type": "json_object"}`` for Deepseek JSON mode).

        Returns:
            The content of the first choice's message, or ``""`` when it has
            no content.

        Raises:
            LLMClientDPError: On any API, timeout or unexpected error. The
                original exception is chained as ``__cause__``.
        """
        prompt_preview = _truncate_prompt_for_log(prompt)
        logger.info("[%s] Deepseek request started. Prompt: %s", step_name, prompt_preview)
        start_time = time.perf_counter()
        kwargs: dict[str, Any] = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": temperature,
            # Deepseek-specific request field: thinking mode is always off.
            "extra_body": {"thinking": {"type": "disabled"}},
        }
        if response_format is not None:
            kwargs["response_format"] = response_format
        try:
            response = await self._client.chat.completions.create(**kwargs)
            # An empty `choices` list raises IndexError, reported below as unexpected.
            content = response.choices[0].message.content or ""
        except (APITimeoutError, APIError) as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error("[%s] Deepseek API error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientDPError(f"[{step_name}] Deepseek API error: {exc}") from exc
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error("[%s] Unexpected Deepseek error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientDPError(f"[{step_name}] Unexpected Deepseek error: {exc}") from exc

        elapsed_ms = (time.perf_counter() - start_time) * 1000
        usage = response.usage
        logger.info(
            "[%s] Deepseek request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
            step_name,
            elapsed_ms,
            usage.prompt_tokens if usage else "?",
            usage.completion_tokens if usage else "?",
        )
        return content

    async def complete_structured(
        self,
        prompt: str,
        pydantic_model: Any,
        step_name: str = "QueryDecomposer",
    ) -> Any:
        """Request a JSON reply from Deepseek and validate it against *pydantic_model*.

        Deepseek supports ``response_format={"type": "json_object"}`` but not
        OpenAI's ``json_schema`` mode, which would validate against a
        specific schema. So this method requests JSON mode and validates
        the reply client-side.

        Args:
            prompt: The prompt to send. It is sampled at temperature 0.0.
            pydantic_model: A model class exposing ``model_validate_json``
                (e.g. a pydantic ``BaseModel`` subclass).
            step_name: Identifier prefixed to log lines and error messages.

        Returns:
            The instance produced by ``pydantic_model.model_validate_json``.

        Raises:
            LLMClientDPError: If the request fails, or if the reply does not
                validate against *pydantic_model*.
        """
        prompt_preview = _truncate_prompt_for_log(prompt, first_chars=300, last_chars=100)
        logger.info("[%s] Deepseek structured request started. Prompt: %s", step_name, prompt_preview)
        start_time = time.perf_counter()
        # complete() already wraps request failures in LLMClientDPError.
        response = await self.complete(
            prompt=prompt,
            temperature=0.0,
            step_name=step_name,
            response_format={"type": "json_object"},
        )
        extracted = self._extract_json_payload(response)
        try:
            result = pydantic_model.model_validate_json(extracted)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error(
                "[%s] Deepseek structured JSON parse failed after %.2fms. "
                "Raw response (first 500 chars): %s",
                step_name,
                elapsed_ms,
                response[:500],
                exc_info=True,
            )
            model_name = getattr(pydantic_model, "__name__", repr(pydantic_model))
            raise LLMClientDPError(
                f"[{step_name}] Deepseek structured response failed validation against {model_name}"
            ) from exc

        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(
            "[%s] Deepseek structured request completed in %.2fms. Result: %s",
            step_name,
            elapsed_ms,
            getattr(result, "model_dump", lambda: result)(),
        )
        return result

    @classmethod
    def _extract_json_payload(cls, raw: str) -> str:
        """Return the JSON text to validate from a raw model reply.

        Strips surrounding whitespace and unwraps the first Markdown code
        fence if one is present. A bare JSON array is wrapped as
        ``{"questions": [...]}``. Text that is not valid JSON is returned
        as-is so that validation reports the error.
        """
        extracted = raw.strip()
        match = cls._FENCE_RE.search(extracted)
        if match:
            extracted = match.group(1).strip()
        try:
            parsed = json.loads(extracted)
        except json.JSONDecodeError:
            return extracted
        # The decompose prompt asks for a "JSON array of strings", so the LLM
        # may return a bare array. Wrap it into the {"questions": [...]} shape
        # that SubQuestions expects.
        if isinstance(parsed, list):
            return json.dumps({"questions": parsed})
        return extracted

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.close()