240 lines
8.9 KiB
Python
240 lines
8.9 KiB
Python
"""Deepseek API client for the query decomposition step only.
|
|
|
|
Provides `LLMClientDP`, a lightweight async client targeting the Deepseek
|
|
API (deepseek-v4-pro) with thinking mode always disabled. Uses the same
|
|
OpenAI-compatible SDK as `LLMClient` but with Deepseek-specific extra_body
|
|
and a separate set of Settings fields (dp_*).
|
|
|
|
Only implements `complete()` and `complete_structured()` — the two methods
|
|
consumed by `QueryDecomposer`. ``complete_structured()`` uses a manual
|
|
JSON-extraction approach because Deepseek does not support OpenAI's
|
|
``response_format`` json_schema mode yet.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from openai import AsyncOpenAI, APIError, APITimeoutError
|
|
|
|
from app.core.config import Settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LLMClientDPError(Exception):
    """Error signalling that a Deepseek API request or its response handling failed."""
|
|
|
|
|
|
def _truncate_prompt_for_log(prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
|
|
if len(prompt) <= first_chars + last_chars:
|
|
return prompt
|
|
return (
|
|
f"{prompt[:first_chars]}..."
|
|
f"({len(prompt) - first_chars - last_chars} chars omitted)..."
|
|
f"{prompt[-last_chars:]}"
|
|
)
|
|
|
|
|
|
def _pydantic_to_json_instruction(model: Any) -> str:
|
|
"""Build a JSON-format instruction from a Pydantic model's schema.
|
|
|
|
Follows the Deepseek JSON Output guide: the prompt must contain the word
|
|
"json" and an example of the expected shape. The model schema is
|
|
converted into a human-readable text description with a filled-in example.
|
|
"""
|
|
schema = model.model_json_schema()
|
|
props = schema.get("properties", {})
|
|
title = schema.get("title", model.__name__)
|
|
|
|
parts: list[str] = []
|
|
parts.append(f"Output the result in JSON format as a {title} object.")
|
|
|
|
# Build an example by filling each field with a representative value.
|
|
example: dict[str, Any] = {}
|
|
for name, info in props.items():
|
|
t = info.get("type", "any")
|
|
desc = info.get("description", "")
|
|
if t == "array":
|
|
items = info.get("items", {})
|
|
item_type = items.get("type", "string")
|
|
min_items = info.get("minItems", 1)
|
|
parts.append(
|
|
f'- "{name}": array of {item_type} '
|
|
f"(min {min_items}"
|
|
+ (f", max {info['maxItems']}" if info.get("maxItems") else "")
|
|
+ f") — {desc}"
|
|
)
|
|
example[name] = [f"<{item_type}_1>", f"<{item_type}_2>"]
|
|
elif t == "string":
|
|
parts.append(f'- "{name}": {t} — {desc}')
|
|
example[name] = f"<{desc[:40]}>"
|
|
elif t == "integer" or t == "number":
|
|
parts.append(f'- "{name}": {t} — {desc}')
|
|
example[name] = 0
|
|
else:
|
|
parts.append(f'- "{name}": {t} — {desc}')
|
|
example[name] = f"<{t}>"
|
|
|
|
parts.append("")
|
|
parts.append("EXAMPLE JSON OUTPUT:")
|
|
parts.append(json.dumps(example, indent=2, ensure_ascii=False))
|
|
|
|
return "\n".join(parts)
|
|
|
|
|
|
class LLMClientDP:
    """Async Deepseek API client for query decomposition.

    Always disables thinking mode via ``extra_body={"thinking": {"type": "disabled"}}``.
    Uses the OpenAI-compatible SDK with Deepseek's base URL.

    Falls back to ``settings.llm_api_key`` when ``settings.dp_api_key`` is empty.

    Every failure of a request or of structured-response validation is
    reported as ``LLMClientDPError`` carrying a descriptive message and the
    original exception as ``__cause__``.

    The client owns an HTTP connection pool: call ``close()`` when done, or
    use the instance with ``async with``.
    """

    # Markdown code fence (optionally tagged ``json``); group 1 is its body.
    _FENCE_RE = re.compile(r"```(?:json)?\s*\n?(.*?)\n?```", re.DOTALL)

    def __init__(self, settings: Settings) -> None:
        """Create the underlying ``AsyncOpenAI`` client from *settings*.

        Reads ``dp_api_key`` (falling back to ``llm_api_key``),
        ``dp_model_name``, ``dp_base_url`` and ``llm_timeout``.
        """
        api_key = settings.dp_api_key or settings.llm_api_key
        self.model = settings.dp_model_name
        self._client = AsyncOpenAI(
            base_url=settings.dp_base_url.rstrip("/"),
            api_key=api_key,
            timeout=settings.llm_timeout,
            http_client=httpx.AsyncClient(
                headers={"Content-Type": "application/json"},
            ),
        )

    async def __aenter__(self) -> "LLMClientDP":
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: Any,
    ) -> None:
        """Close the underlying HTTP client when leaving the ``async with`` block."""
        await self.close()

    async def complete(
        self,
        prompt: str,
        temperature: float = 0.7,
        step_name: str = "QueryDecomposer",
        response_format: dict[str, Any] | None = None,
    ) -> str:
        """Send a chat completion request with thinking disabled.

        Used as the fallback path by ``QueryDecomposer.decompose()`` when
        ``complete_structured()`` fails.

        Args:
            prompt: The prompt to send (as a single user message).
            temperature: Sampling temperature.
            step_name: Identifier for logging.
            response_format: Optional OpenAI ``response_format`` dict
                (e.g. ``{"type": "json_object"}`` for Deepseek JSON mode).

        Returns:
            The text content of the first choice, or ``""`` when the API
            returned no content.

        Raises:
            LLMClientDPError: On any API, timeout or unexpected error; the
                original exception is attached as ``__cause__``.
        """
        kwargs: dict[str, Any] = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": temperature,
            "extra_body": {"thinking": {"type": "disabled"}},
        }
        if response_format is not None:
            kwargs["response_format"] = response_format

        logger.info(
            "[%s] Deepseek request started. Prompt: %s",
            step_name,
            _truncate_prompt_for_log(prompt),
        )
        start_time = time.perf_counter()

        try:
            response = await self._client.chat.completions.create(**kwargs)
            content = response.choices[0].message.content or ""
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            usage = response.usage
            logger.info(
                "[%s] Deepseek request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
                step_name,
                elapsed_ms,
                usage.prompt_tokens if usage else "?",
                usage.completion_tokens if usage else "?",
            )
            return content
        except (APITimeoutError, APIError) as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error("[%s] Deepseek API error after %.2fms: %s", step_name, elapsed_ms, exc)
            # Carry a message: a bare ``raise LLMClientDPError`` stringifies to "".
            raise LLMClientDPError(f"[{step_name}] Deepseek API error: {exc}") from exc
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error("[%s] Unexpected Deepseek error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientDPError(f"[{step_name}] Unexpected Deepseek error: {exc}") from exc

    @classmethod
    def _normalize_json_payload(cls, raw: str) -> str:
        """Return the JSON text to validate, extracted from a raw model reply.

        The reply is first parsed as-is. Only when that fails is a Markdown
        code fence looked for, so a valid JSON document whose string values
        happen to contain triple backticks is never cut apart. Text that
        cannot be parsed either way is returned unchanged so that validation
        reports the error.
        """
        text = raw.strip()
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            fenced = cls._FENCE_RE.search(text)
            if fenced is None:
                return text
            text = fenced.group(1).strip()
            try:
                parsed = json.loads(text)
            except json.JSONDecodeError:
                return text

        # The decompose prompt asks for a "JSON array of strings", so the LLM
        # may return a bare array. Wrap it into the {"questions": [...]} shape
        # that SubQuestions expects.
        if isinstance(parsed, list):
            return json.dumps({"questions": parsed})
        return text

    async def complete_structured(
        self,
        prompt: str,
        pydantic_model: Any,
        step_name: str = "QueryDecomposer",
    ) -> Any:
        """Structured output via Deepseek's JSON mode + client-side validation.

        Deepseek supports ``response_format={"type": "json_object"}`` (which
        guarantees valid JSON) but not OpenAI's ``json_schema`` mode (which
        would validate against a specific schema). We inject a JSON format
        instruction derived from *pydantic_model* into the prompt (per the
        Deepseek JSON Output guide), then validate client-side.

        Args:
            prompt: The task prompt; the JSON instruction is appended to it.
            pydantic_model: Pydantic model class used to build the
                instruction and to validate the reply.
            step_name: Identifier for logging.

        Returns:
            The instance produced by ``pydantic_model.model_validate_json``.

        Raises:
            LLMClientDPError: When the request fails (raised by
                ``complete()``) or the reply does not validate against
                *pydantic_model*.
        """
        logger.info(
            "[%s] Deepseek structured request started. Prompt: %s",
            step_name,
            _truncate_prompt_for_log(prompt, first_chars=300, last_chars=100),
        )
        start_time = time.perf_counter()

        # Inject JSON format instruction from the Pydantic model.
        json_instruction = _pydantic_to_json_instruction(pydantic_model)
        full_prompt = f"{prompt}\n\n{json_instruction}"

        # complete() already logs and wraps every failure in LLMClientDPError,
        # so its exceptions simply propagate from here.
        response = await self.complete(
            prompt=full_prompt,
            temperature=0.0,
            step_name=step_name,
            response_format={"type": "json_object"},
        )

        payload = self._normalize_json_payload(response)

        try:
            result = pydantic_model.model_validate_json(payload)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error(
                "[%s] Deepseek structured JSON parse failed after %.2fms. "
                "Raw response (first 500 chars): %s",
                step_name,
                elapsed_ms,
                response[:500],
                exc_info=True,
            )
            model_name = getattr(pydantic_model, "__name__", "the expected schema")
            raise LLMClientDPError(
                f"[{step_name}] Deepseek response failed validation against {model_name}: {exc}"
            ) from exc

        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(
            "[%s] Deepseek structured request completed in %.2fms. Result: %s",
            step_name,
            elapsed_ms,
            getattr(result, "model_dump", lambda: result)(),
        )
        return result

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.close()
|