feat: add LLMClientDP for Deepseek decompose (Phase 6)

Uses Deepseek's json_object response_format (not json_schema, which Deepseek does not support). Always disables thinking mode. Includes unit tests (12) and acceptance tests (5).

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
Woody 2026-05-04 14:58:53 +08:00
parent 73ae621f3b
commit 849beb4d4e
3 changed files with 535 additions and 0 deletions

View File

@ -0,0 +1,188 @@
"""Deepseek API client for the query decomposition step only.
Provides `LLMClientDP`, a lightweight async client targeting the Deepseek
API (deepseek-v4-pro) with thinking mode always disabled. Uses the same
OpenAI-compatible SDK as `LLMClient` but with Deepseek-specific extra_body
and a separate set of Settings fields (dp_*).
Only implements `complete()` and `complete_structured()` — the two methods
consumed by `QueryDecomposer`. ``complete_structured()`` requests Deepseek's
``json_object`` response format and validates the returned JSON client-side,
because Deepseek does not support OpenAI's ``json_schema`` mode yet.
"""
import json
import logging
import re
import time
from typing import Any
import httpx
from openai import AsyncOpenAI, APIError, APITimeoutError
from app.core.config import Settings
logger = logging.getLogger(__name__)
class LLMClientDPError(Exception):
    """Error type raised by ``LLMClientDP`` when a Deepseek call fails."""
def _truncate_prompt_for_log(prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
if len(prompt) <= first_chars + last_chars:
return prompt
return (
f"{prompt[:first_chars]}..."
f"({len(prompt) - first_chars - last_chars} chars omitted)..."
f"{prompt[-last_chars:]}"
)
class LLMClientDP:
    """Async Deepseek API client for query decomposition.

    Always disables thinking mode via ``extra_body={"thinking": {"type": "disabled"}}``.
    Uses the OpenAI-compatible SDK with Deepseek's base URL.
    Falls back to ``settings.llm_api_key`` when ``settings.dp_api_key`` is empty.
    """

    # Matches a Markdown code fence (optionally tagged ``json``) and captures its body.
    _FENCED_JSON = re.compile(r"```(?:json)?\s*\n?(.*?)\n?```", re.DOTALL)

    def __init__(self, settings: Settings) -> None:
        """Build the SDK client from the Deepseek-specific settings.

        Args:
            settings: Application settings. Reads ``dp_api_key`` (falling back
                to ``llm_api_key`` when empty), ``dp_model_name``,
                ``dp_base_url`` and ``llm_timeout``.
        """
        api_key = settings.dp_api_key or settings.llm_api_key
        self.model = settings.dp_model_name
        self._client = AsyncOpenAI(
            base_url=settings.dp_base_url.rstrip("/"),
            api_key=api_key,
            timeout=settings.llm_timeout,
            http_client=httpx.AsyncClient(
                headers={"Content-Type": "application/json"},
            ),
        )

    async def complete(
        self,
        prompt: str,
        temperature: float = 0.7,
        step_name: str = "QueryDecomposer",
        response_format: dict[str, Any] | None = None,
    ) -> str:
        """Send a chat completion request with thinking disabled.

        Used as the fallback path by ``QueryDecomposer.decompose()`` when
        ``complete_structured()`` fails.

        Args:
            prompt: The prompt to send as a single user message.
            temperature: Sampling temperature.
            step_name: Identifier used as a prefix in log lines and errors.
            response_format: Optional OpenAI ``response_format`` dict
                (e.g. ``{"type": "json_object"}`` for Deepseek JSON mode).

        Returns:
            The content of the first choice, or ``""`` when it is empty/None.

        Raises:
            LLMClientDPError: If the request fails for any reason; the original
                exception is attached as ``__cause__``.
        """
        logger.info(
            "[%s] Deepseek request started. Prompt: %s",
            step_name,
            _truncate_prompt_for_log(prompt),
        )
        start_time = time.perf_counter()

        request_kwargs: dict[str, Any] = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": temperature,
            "extra_body": {"thinking": {"type": "disabled"}},
        }
        # Only forward response_format when the caller asked for one.
        if response_format is not None:
            request_kwargs["response_format"] = response_format

        try:
            response = await self._client.chat.completions.create(**request_kwargs)
            content = response.choices[0].message.content or ""
        except (APITimeoutError, APIError) as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error("[%s] Deepseek API error after %.2fms: %s", step_name, elapsed_ms, exc)
            # Carry a message so str(error) is informative for callers.
            raise LLMClientDPError(f"[{step_name}] Deepseek API error: {exc}") from exc
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error("[%s] Unexpected Deepseek error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientDPError(f"[{step_name}] Unexpected Deepseek error: {exc}") from exc
        else:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            usage = response.usage
            logger.info(
                "[%s] Deepseek request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
                step_name,
                elapsed_ms,
                usage.prompt_tokens if usage else "?",
                usage.completion_tokens if usage else "?",
            )
            return content

    @classmethod
    def _extract_json_payload(cls, raw: str) -> str:
        """Return the JSON text to validate, unwrapping fences and bare arrays."""
        payload = raw.strip()
        fenced = cls._FENCED_JSON.search(payload)
        if fenced:
            payload = fenced.group(1).strip()

        try:
            parsed = json.loads(payload)
        except json.JSONDecodeError:
            # Leave malformed text untouched; model validation reports the failure.
            return payload

        # The decompose prompt asks for a "JSON array of strings", so the LLM
        # may return a bare array. Wrap it into the {"questions": [...]} shape
        # that SubQuestions expects.
        if isinstance(parsed, list):
            return json.dumps({"questions": parsed})
        return payload

    async def complete_structured(
        self,
        prompt: str,
        pydantic_model: Any,
        step_name: str = "QueryDecomposer",
    ) -> Any:
        """Structured output via Deepseek's JSON mode + client-side validation.

        Deepseek supports ``response_format={"type": "json_object"}`` but not
        OpenAI's ``json_schema`` mode, so the response is requested in JSON
        mode at temperature 0.0 and then validated against *pydantic_model*
        with ``model_validate_json``.

        Args:
            prompt: The prompt to send.
            pydantic_model: Class exposing ``model_validate_json``.
            step_name: Identifier used as a prefix in log lines and errors.

        Returns:
            The validated *pydantic_model* instance.

        Raises:
            LLMClientDPError: If the request fails (raised by ``complete()``)
                or the response cannot be validated against *pydantic_model*.
        """
        logger.info(
            "[%s] Deepseek structured request started. Prompt: %s",
            step_name,
            _truncate_prompt_for_log(prompt, first_chars=300, last_chars=100),
        )
        start_time = time.perf_counter()

        # complete() already wraps every failure in LLMClientDPError, so its
        # exceptions are allowed to propagate unchanged.
        response = await self.complete(
            prompt=prompt,
            temperature=0.0,
            step_name=step_name,
            response_format={"type": "json_object"},
        )
        payload = self._extract_json_payload(response)

        try:
            result = pydantic_model.model_validate_json(payload)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.error(
                "[%s] Deepseek structured JSON parse failed after %.2fms. "
                "Raw response (first 500 chars): %s",
                step_name,
                elapsed_ms,
                response[:500],
                exc_info=True,
            )
            raise LLMClientDPError(
                f"[{step_name}] Deepseek structured response failed validation: {exc}"
            ) from exc

        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(
            "[%s] Deepseek structured request completed in %.2fms. Result: %s",
            step_name,
            elapsed_ms,
            # Fall back to the raw object for results without ``model_dump``.
            getattr(result, "model_dump", lambda: result)(),
        )
        return result

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.close()

View File

@ -0,0 +1,111 @@
"""Acceptance test: Phase 6 Deepseek API decompose with real LLM.
Prerequisites:
- backend/.env configured with valid DP_API_KEY (Deepseek API key)
- Network access to Deepseek API (https://api.deepseek.com)
These tests verify that LLMClientDP can call the real Deepseek API
(deepseek-v4-pro, thinking disabled) and return valid sub-questions
via both structured output and legacy fallback paths.
"""
import pytest
from app.core.config import get_settings
from app.models.decompose import SubQuestions
from app.services.llm_client_dp import LLMClientDP
@pytest.fixture
def client():
    """Provide an ``LLMClientDP`` built from settings, skipping when no API key is set."""
    cfg = get_settings()
    has_api_key = bool(cfg.dp_api_key or cfg.llm_api_key)
    if not has_api_key:
        pytest.skip("DP_API_KEY not configured in .env")
    return LLMClientDP(cfg)
@pytest.mark.acceptance
@pytest.mark.slow
class TestLLMClientDPAcceptance:
    """Acceptance checks for ``LLMClientDP``: structured and plain-text paths."""

    # Instruction text shared by every decomposition prompt in this class.
    _DECOMPOSE_INSTRUCTIONS = (
        "Break it down into 2-5 simplified sub-questions that would help "
        "search for relevant information. Each sub-question should be short "
        "and focused on one aspect. Return as a JSON array of strings."
    )

    @classmethod
    def _decompose_prompt(cls, question: str) -> str:
        """Build the decomposition prompt for *question*."""
        return f"Given this question: '{question}'\n\n{cls._DECOMPOSE_INSTRUCTIONS}"

    @classmethod
    async def _decompose(cls, client, question: str):
        """Run ``complete_structured`` for *question* against ``SubQuestions``."""
        return await client.complete_structured(
            prompt=cls._decompose_prompt(question),
            pydantic_model=SubQuestions,
            step_name="QueryDecomposer",
        )

    async def test_structured_decompose_cantonese(self, client):
        """Cantonese question → structured output → valid SubQuestions."""
        result = await self._decompose(client, "立法會今日討論咗咩議題?")

        assert isinstance(result, SubQuestions)
        assert 1 <= len(result.questions)
        assert len(result.questions) <= 5
        for question in result.questions:
            assert isinstance(question, str) and len(question) > 0

    async def test_structured_decompose_english(self, client):
        """English question → structured output → valid SubQuestions."""
        result = await self._decompose(
            client, "What are the key provisions of the NEC4 contract?"
        )

        assert isinstance(result, SubQuestions)
        assert 1 <= len(result.questions)
        assert len(result.questions) <= 5
        for question in result.questions:
            assert isinstance(question, str) and len(question) > 0

    async def test_structured_decompose_simple_question(self, client):
        """Simple question → structured output → at least 1 sub-question."""
        result = await self._decompose(client, "Who created Python?")

        assert isinstance(result, SubQuestions)
        assert len(result.questions) >= 1

    async def test_complete_fallback_returns_text(self, client):
        """Legacy complete() path returns non-empty string from Deepseek."""
        text = await client.complete(
            prompt="Say hello in one word.",
            temperature=0.7,
            step_name="QueryDecomposer",
        )

        assert text is not None
        assert len(text) > 0
        assert isinstance(text, str)

    async def test_complete_extra_body_has_thinking_disabled(self, client):
        """Smoke-check the configured model and a temperature-0 completion.

        The outgoing request body is not inspected here, so the ``thinking``
        flag is not observed directly; this test only asserts the model name
        and that a simple arithmetic prompt is answered with "4".
        """
        assert client.model == "deepseek-v4-pro"

        answer = await client.complete(
            prompt="What is 2+2? Answer with just the number.",
            temperature=0.0,
            step_name="QueryDecomposer",
        )

        assert "4" in answer

View File

@ -0,0 +1,236 @@
"""Tests for Phase 6 LLMClientDP — Deepseek API client for decomposition.
Coverage:
- Instantiation with Settings (dp_* fields, API key fallback)
- complete() sends extra_body with thinking disabled
- complete() success / API error / timeout error paths
- complete_structured() via a mocked complete() call (Deepseek JSON mode)
- complete_structured() error wrapping in LLMClientDPError
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from openai import APIError, APITimeoutError
from app.core.config import Settings
from app.models.decompose import SubQuestions
from app.services.llm_client_dp import LLMClientDP, LLMClientDPError
class TestLLMClientDPInstantiation:
    """Tests for LLMClientDP construction and config resolution."""

    def test_uses_dp_config_fields(self):
        """Should use dp_* fields when they are set."""
        settings = Settings(
            dp_base_url="https://api.deepseek.com",
            dp_api_key="dp-key-123",
            dp_model_name="deepseek-v4-pro",
        )

        client = LLMClientDP(settings)

        assert client.model == "deepseek-v4-pro"
        # The SDK client keeps the key it was constructed with.
        assert client._client.api_key == "dp-key-123"

    def test_falls_back_to_llm_api_key_when_dp_empty(self):
        """When dp_api_key is empty, fall back to llm_api_key."""
        settings = Settings(
            dp_base_url="https://api.deepseek.com",
            dp_api_key="",
            llm_api_key="fallback-key",
            dp_model_name="deepseek-v4-pro",
        )

        client = LLMClientDP(settings)

        assert client.model == "deepseek-v4-pro"
        # Assert the fallback itself; checking the model alone would pass
        # even if the wrong key were selected.
        assert client._client.api_key == "fallback-key"
class TestLLMClientDPComplete:
    """Tests for LLMClientDP.complete() — the fallback text path."""

    @staticmethod
    def _patch_create(client):
        """Return a patcher that swaps the SDK ``create`` call for an ``AsyncMock``."""
        return patch.object(
            client._client.chat.completions, "create", new_callable=AsyncMock
        )

    @staticmethod
    def _build_response(content, prompt_tokens, completion_tokens):
        """Build a mock chat completion with one choice and usage counters."""
        fake = MagicMock()
        fake.choices = [MagicMock()]
        fake.choices[0].message.content = content
        fake.usage.prompt_tokens = prompt_tokens
        fake.usage.completion_tokens = completion_tokens
        return fake

    @pytest.fixture
    def settings(self):
        return Settings(
            dp_base_url="https://api.deepseek.com",
            dp_api_key="test-key",
            dp_model_name="deepseek-v4-pro",
        )

    @pytest.fixture
    def client(self, settings):
        return LLMClientDP(settings)

    @pytest.fixture
    def mock_response(self):
        """Build a mock OpenAI chat completion response."""
        return self._build_response('["q1", "q2"]', 50, 10)

    @pytest.mark.asyncio
    async def test_complete_returns_text(self, client, mock_response):
        """complete() should return the LLM response text."""
        with self._patch_create(client) as create_mock:
            create_mock.return_value = mock_response
            text = await client.complete(
                prompt="Decompose this question",
                temperature=0.7,
                step_name="QueryDecomposer",
            )

        assert text == '["q1", "q2"]'
        create_mock.assert_called_once()
        sent = create_mock.call_args.kwargs
        assert sent["model"] == "deepseek-v4-pro"
        assert sent["temperature"] == 0.7
        assert sent["extra_body"]["thinking"]["type"] == "disabled"

    @pytest.mark.asyncio
    async def test_complete_sends_thinking_disabled(self, client, mock_response):
        """Every complete() call must include thinking=disabled in extra_body."""
        with self._patch_create(client) as create_mock:
            create_mock.return_value = mock_response
            await client.complete(prompt="Test")

        sent_extra_body = create_mock.call_args.kwargs["extra_body"]
        assert sent_extra_body == {"thinking": {"type": "disabled"}}

    @pytest.mark.asyncio
    async def test_complete_wraps_api_error(self, client):
        """APIError should be wrapped in LLMClientDPError."""
        api_error = APIError(
            message="Rate limited",
            request=MagicMock(),
            body=None,
        )
        with self._patch_create(client) as create_mock:
            create_mock.side_effect = api_error
            with pytest.raises(LLMClientDPError):
                await client.complete(prompt="Test")

    @pytest.mark.asyncio
    async def test_complete_wraps_timeout_error(self, client):
        """APITimeoutError should be wrapped in LLMClientDPError."""
        timeout_error = APITimeoutError(request=MagicMock())
        with self._patch_create(client) as create_mock:
            create_mock.side_effect = timeout_error
            with pytest.raises(LLMClientDPError):
                await client.complete(prompt="Test")

    @pytest.mark.asyncio
    async def test_complete_wraps_unexpected_error(self, client):
        """Unexpected exceptions should also be wrapped in LLMClientDPError."""
        with self._patch_create(client) as create_mock:
            create_mock.side_effect = RuntimeError("unexpected")
            with pytest.raises(LLMClientDPError):
                await client.complete(prompt="Test")

    @pytest.mark.asyncio
    async def test_complete_handles_empty_content(self, client):
        """None or empty response content should return empty string."""
        empty_response = self._build_response(None, 10, 0)
        with self._patch_create(client) as create_mock:
            create_mock.return_value = empty_response
            text = await client.complete(prompt="Test")

        assert text == ""
class TestLLMClientDPCompleteStructured:
    """Tests for LLMClientDP.complete_structured() — JSON extraction path.

    ``complete()`` is replaced with an ``AsyncMock`` in every test, so no
    request is sent; only response handling and validation are exercised.
    """

    @pytest.fixture
    def settings(self):
        return Settings(
            dp_base_url="https://api.deepseek.com",
            dp_api_key="test-key",
            dp_model_name="deepseek-v4-pro",
        )

    @pytest.fixture
    def client(self, settings):
        return LLMClientDP(settings)

    @pytest.mark.asyncio
    async def test_complete_structured_returns_validated_model(self, client):
        """Should call complete() with json_object format and parse response."""
        expected = SubQuestions(questions=["Q1", "Q2", "Q3"])
        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
            mock_complete.return_value = '{"questions": ["Q1", "Q2", "Q3"]}'
            result = await client.complete_structured(
                prompt="Decompose: test question",
                pydantic_model=SubQuestions,
                step_name="QueryDecomposer",
            )

        assert result == expected
        assert result.questions == ["Q1", "Q2", "Q3"]
        mock_complete.assert_called_once()
        # Verify Deepseek JSON mode is used
        call_kwargs = mock_complete.call_args.kwargs
        assert call_kwargs["response_format"] == {"type": "json_object"}
        assert call_kwargs["temperature"] == 0.0

    @pytest.mark.asyncio
    async def test_complete_structured_with_markdown_fence(self, client):
        """Should strip markdown code fences before JSON parsing."""
        expected = SubQuestions(questions=["Only one"])
        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
            mock_complete.return_value = '```json\n{"questions": ["Only one"]}\n```'
            result = await client.complete_structured(
                prompt="Test",
                pydantic_model=SubQuestions,
            )

        assert result == expected

    @pytest.mark.asyncio
    async def test_complete_structured_wraps_bare_array(self, client):
        """A bare JSON array should be wrapped into the ``questions`` field."""
        expected = SubQuestions(questions=["Q1", "Q2"])
        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
            mock_complete.return_value = '["Q1", "Q2"]'
            result = await client.complete_structured(
                prompt="Test",
                pydantic_model=SubQuestions,
            )

        assert result == expected

    @pytest.mark.asyncio
    async def test_complete_structured_propagates_complete_error(self, client):
        """A failure raised by complete() should surface as LLMClientDPError."""
        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
            mock_complete.side_effect = LLMClientDPError("request failed")
            with pytest.raises(LLMClientDPError):
                await client.complete_structured(
                    prompt="Test",
                    pydantic_model=SubQuestions,
                )

    @pytest.mark.asyncio
    async def test_complete_structured_invalid_json_raises(self, client):
        """Unparseable JSON should raise LLMClientDPError."""
        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
            mock_complete.return_value = "not json at all"
            with pytest.raises(LLMClientDPError):
                await client.complete_structured(
                    prompt="Test",
                    pydantic_model=SubQuestions,
                )

    @pytest.mark.asyncio
    async def test_complete_structured_wrong_schema_raises(self, client):
        """Valid JSON but wrong Pydantic schema should raise LLMClientDPError."""
        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
            mock_complete.return_value = '{"wrong_field": [1, 2, 3]}'
            with pytest.raises(LLMClientDPError):
                await client.complete_structured(
                    prompt="Test",
                    pydantic_model=SubQuestions,
                )