"""Deepseek API client for the query decomposition step only.

Provides `LLMClientDP`, a lightweight async client targeting the Deepseek API
(deepseek-v4-pro) with thinking mode always disabled. It uses the same
OpenAI-compatible SDK as `LLMClient`, but with Deepseek-specific ``extra_body``
and a separate set of Settings fields (``dp_*``).

Only ``complete()`` and ``complete_structured()`` are implemented — the two
methods consumed by `QueryDecomposer`. ``complete_structured()`` uses a manual
JSON-extraction approach because Deepseek does not support OpenAI's
``response_format`` json_schema mode yet.
"""

import json
import logging
import re
import time
from typing import Any

import httpx
from openai import AsyncOpenAI, APIError, APITimeoutError

from app.core.config import Settings

logger = logging.getLogger(__name__)

# Matches a Markdown code fence (optionally tagged ``json``) and captures its body.
_CODE_FENCE_RE = re.compile(r"```(?:json)?\s*\n?(.*?)\n?```", re.DOTALL)

# Key used to wrap a bare JSON array when the target model does not have exactly
# one array-typed field. This is the {"questions": [...]} shape SubQuestions expects.
_DEFAULT_ARRAY_KEY = "questions"


class LLMClientDPError(Exception):
    """Raised when a Deepseek API call fails or its structured output cannot be parsed."""


def _truncate_prompt_for_log(prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
    """Shorten *prompt* for logging, keeping its head and tail.

    Prompts of at most ``first_chars + last_chars`` characters are returned
    unchanged. Longer prompts have their middle replaced by a marker stating
    how many characters were omitted.
    """
    if len(prompt) <= first_chars + last_chars:
        return prompt
    omitted = len(prompt) - first_chars - last_chars
    # Slice the tail by absolute index: ``prompt[-0:]`` would return the whole
    # prompt when ``last_chars == 0``.
    tail = prompt[len(prompt) - last_chars:]
    return f"{prompt[:first_chars]}...({omitted} chars omitted)...{tail}"


def _resolve_property_schema(info: dict[str, Any]) -> dict[str, Any]:
    """Return the effective JSON schema of a model property, unwrapping ``Optional``.

    Pydantic v2 renders ``X | None`` as ``{"anyOf": [<X schema>, {"type": "null"}]}``
    with no top-level ``type``. In that case the first non-null member is merged
    over the outer schema, so keys such as ``description`` are kept. Any other
    schema is returned unchanged.
    """
    if "type" in info or "anyOf" not in info:
        return info
    for option in info["anyOf"]:
        if option.get("type") not in (None, "null"):
            merged = {**info, **option}
            merged.pop("anyOf", None)
            return merged
    return info


def _pydantic_to_json_instruction(model: Any) -> str:
    """Build a JSON-format instruction from a Pydantic model's schema.

    Follows the Deepseek JSON Output guide: the prompt must contain the word
    "json" and an example of the expected shape. The model schema is converted
    into a human-readable per-field description plus a filled-in example.

    Args:
        model: A Pydantic model class (must provide ``model_json_schema()``).

    Returns:
        The multi-line instruction text to append to the prompt.
    """
    schema = model.model_json_schema()
    props = schema.get("properties", {})
    title = schema.get("title", model.__name__)

    lines: list[str] = [f"Output the result in JSON format as a {title} object."]
    # Example object with a representative placeholder value of the right JSON type per field.
    example: dict[str, Any] = {}
    for name, raw_info in props.items():
        info = _resolve_property_schema(raw_info)
        field_type = info.get("type", "any")
        desc = info.get("description", "")
        suffix = f" — {desc}" if desc else ""

        if field_type == "array":
            item_type = info.get("items", {}).get("type", "string")
            # minItems defaults to 1 when the schema has none (presumably to steer
            # the model towards a non-empty list).
            bounds = f"min {info.get('minItems', 1)}"
            if info.get("maxItems") is not None:  # keep an explicit maxItems of 0
                bounds += f", max {info['maxItems']}"
            lines.append(f'- "{name}": array of {item_type} ({bounds}){suffix}')
            example[name] = [f"<{item_type}_1>", f"<{item_type}_2>"]
            continue

        lines.append(f'- "{name}": {field_type}{suffix}')
        if field_type == "string":
            example[name] = f"<{desc[:40] or name}>"
        elif field_type in ("integer", "number"):
            example[name] = 0
        elif field_type == "boolean":
            example[name] = False
        else:
            example[name] = f"<{field_type}>"

    lines.append("")
    lines.append("EXAMPLE JSON OUTPUT:")
    lines.append(json.dumps(example, indent=2, ensure_ascii=False))
    return "\n".join(lines)


def _extract_json_text(raw: str) -> str:
    """Return the JSON payload of an LLM response, unwrapping a Markdown fence if needed.

    Text that already parses as JSON is returned stripped and untouched. This
    ensures a ``` sequence inside a JSON string value is never mistaken for a
    fence. Otherwise the body of the first code fence is returned; if there is
    no fence, the stripped text is returned as-is.
    """
    text = raw.strip()
    try:
        json.loads(text)
    except json.JSONDecodeError:
        match = _CODE_FENCE_RE.search(text)
        if match:
            return match.group(1).strip()
    return text


def _array_field_name(model: Any) -> str:
    """Name of the field a bare JSON array should be wrapped into for *model*.

    Uses the model's only array-typed property when there is exactly one.
    Otherwise falls back to ``_DEFAULT_ARRAY_KEY``.
    """
    props = model.model_json_schema().get("properties", {})
    array_fields = [
        name
        for name, info in props.items()
        if _resolve_property_schema(info).get("type") == "array"
    ]
    return array_fields[0] if len(array_fields) == 1 else _DEFAULT_ARRAY_KEY


def _wrap_bare_array(text: str, model: Any) -> str:
    """Wrap a top-level JSON array into a single-key object matching *model*.

    The decompose prompt asks for a "JSON array of strings", so the LLM may
    return a bare array. It is wrapped as ``{"<array field>": [...]}``.
    Unparsable or non-array text is returned unchanged.
    """
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return text
    if not isinstance(parsed, list):
        return text
    return json.dumps({_array_field_name(model): parsed})


class LLMClientDP:
    """Async Deepseek API client for query decomposition.

    Always disables thinking mode via ``extra_body={"thinking": {"type": "disabled"}}``.
    Uses the OpenAI-compatible SDK with Deepseek's base URL. Falls back to
    ``settings.llm_api_key`` when ``settings.dp_api_key`` is empty.
    """

    def __init__(self, settings: Settings) -> None:
        """Create the underlying ``AsyncOpenAI`` client from the ``dp_*`` settings."""
        # Use the generic LLM key when no Deepseek-specific key is configured.
        api_key = settings.dp_api_key or settings.llm_api_key
        self.model = settings.dp_model_name
        self._client = AsyncOpenAI(
            base_url=settings.dp_base_url.rstrip("/"),
            api_key=api_key,
            timeout=settings.llm_timeout,
            http_client=httpx.AsyncClient(
                headers={"Content-Type": "application/json"},
            ),
        )

    async def complete(
        self,
        prompt: str,
        temperature: float = 0.7,
        step_name: str = "QueryDecomposer",
        response_format: dict[str, Any] | None = None,
    ) -> str:
        """Send a single-turn chat completion request with thinking disabled.

        Used as the fallback path by ``QueryDecomposer.decompose()`` when
        ``complete_structured()`` fails.

        Args:
            prompt: The prompt to send as the only user message.
            temperature: Sampling temperature.
            step_name: Identifier used as the log prefix.
            response_format: Optional OpenAI ``response_format`` dict
                (e.g. ``{"type": "json_object"}`` for Deepseek JSON mode).

        Returns:
            The first choice's message content, or ``""`` if it is empty.

        Raises:
            LLMClientDPError: Wraps any error raised while performing the
                request or reading the first choice.
        """
        logger.info(
            "[%s] Deepseek request started. Prompt: %s",
            step_name,
            _truncate_prompt_for_log(prompt),
        )

        kwargs: dict[str, Any] = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": temperature,
            # Deepseek-specific request field: thinking mode is always disabled here.
            "extra_body": {"thinking": {"type": "disabled"}},
        }
        if response_format is not None:
            kwargs["response_format"] = response_format

        start_time = time.perf_counter()
        try:
            response = await self._client.chat.completions.create(**kwargs)
            content = response.choices[0].message.content or ""
        except (APITimeoutError, APIError) as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error("[%s] Deepseek API error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientDPError(f"[{step_name}] Deepseek API error: {exc}") from exc
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            # Unexpected failures (e.g. malformed response) keep their traceback in the log.
            logger.error(
                "[%s] Unexpected Deepseek error after %.2fms: %s",
                step_name,
                elapsed_ms,
                exc,
                exc_info=True,
            )
            raise LLMClientDPError(f"[{step_name}] Unexpected Deepseek error: {exc}") from exc

        elapsed_ms = (time.perf_counter() - start_time) * 1000
        usage = response.usage
        logger.info(
            "[%s] Deepseek request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
            step_name,
            elapsed_ms,
            usage.prompt_tokens if usage else "?",
            usage.completion_tokens if usage else "?",
        )
        return content

    async def complete_structured(
        self,
        prompt: str,
        pydantic_model: Any,
        step_name: str = "QueryDecomposer",
    ) -> Any:
        """Structured output via Deepseek's JSON mode plus client-side validation.

        Deepseek supports ``response_format={"type": "json_object"}`` (which
        guarantees valid JSON) but not OpenAI's ``json_schema`` mode (which would
        validate against a specific schema). A JSON format instruction derived
        from *pydantic_model* is appended to the prompt (per the Deepseek JSON
        Output guide). The reply is then validated client-side.

        Args:
            prompt: The task prompt.
            pydantic_model: Pydantic model class to validate the response against.
            step_name: Identifier used as the log prefix.

        Returns:
            An instance of *pydantic_model*.

        Raises:
            LLMClientDPError: If the request fails or the response does not
                validate against *pydantic_model*.
        """
        prompt_preview = _truncate_prompt_for_log(prompt, first_chars=300, last_chars=100)
        logger.info("[%s] Deepseek structured request started. Prompt: %s", step_name, prompt_preview)
        start_time = time.perf_counter()

        full_prompt = f"{prompt}\n\n{_pydantic_to_json_instruction(pydantic_model)}"
        # Errors from complete() are already LLMClientDPError and propagate unchanged.
        response = await self.complete(
            prompt=full_prompt,
            temperature=0.0,
            step_name=step_name,
            response_format={"type": "json_object"},
        )

        payload = _wrap_bare_array(_extract_json_text(response), pydantic_model)
        try:
            result = pydantic_model.model_validate_json(payload)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error(
                "[%s] Deepseek structured JSON parse failed after %.2fms. "
                "Raw response (first 500 chars): %s",
                step_name,
                elapsed_ms,
                response[:500],
                exc_info=True,
            )
            model_name = getattr(pydantic_model, "__name__", repr(pydantic_model))
            raise LLMClientDPError(
                f"[{step_name}] Deepseek response did not validate as {model_name}: {exc}"
            ) from exc

        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(
            "[%s] Deepseek structured request completed in %.2fms. Result: %s",
            step_name,
            elapsed_ms,
            getattr(result, "model_dump", lambda: result)(),
        )
        return result

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.close()