From 849beb4d4e8ce09ab122797d29d582804918a56e Mon Sep 17 00:00:00 2001
From: Woody
Date: Mon, 4 May 2026 14:58:53 +0800
Subject: [PATCH] feat: add LLMClientDP for Deepseek decompose (Phase 6)

Uses Deepseek's json_object response_format (not json_schema, which
Deepseek does not support). Always disables thinking mode.
Includes unit tests (13) and acceptance tests (5).

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)
Co-authored-by: Sisyphus
---
 backend/app/services/llm_client_dp.py         | 214 +++++++++++++++
 .../test_acceptance_phase6_dp_decompose.py    | 112 ++++++++
 backend/app/test/test_phase6_llm_client_dp.py | 254 ++++++++++++++++++
 3 files changed, 580 insertions(+)
 create mode 100644 backend/app/services/llm_client_dp.py
 create mode 100644 backend/app/test/acceptance/test_acceptance_phase6_dp_decompose.py
 create mode 100644 backend/app/test/test_phase6_llm_client_dp.py

diff --git a/backend/app/services/llm_client_dp.py b/backend/app/services/llm_client_dp.py
new file mode 100644
index 0000000..5baf05d
--- /dev/null
+++ b/backend/app/services/llm_client_dp.py
@@ -0,0 +1,214 @@
+"""Deepseek API client for the query decomposition step only.
+
+Provides `LLMClientDP`, a lightweight async client targeting the Deepseek
+API (deepseek-v4-pro) with thinking mode always disabled. Uses the same
+OpenAI-compatible SDK as `LLMClient` but with Deepseek-specific extra_body
+and a separate set of Settings fields (dp_*).
+
+Only implements `complete()` and `complete_structured()` — the two methods
+consumed by `QueryDecomposer` — plus `close()`. ``complete_structured()``
+requests Deepseek's ``json_object`` mode and validates the reply client-side
+because Deepseek does not support OpenAI's ``response_format`` json_schema
+mode yet.
+"""
+
+import json
+import logging
+import re
+import time
+from typing import Any
+
+import httpx
+from openai import AsyncOpenAI, APIError, APITimeoutError
+
+from app.core.config import Settings
+
+logger = logging.getLogger(__name__)
+
+
+class LLMClientDPError(Exception):
+    """Raised when a Deepseek API call fails."""
+
+
+def _truncate_prompt_for_log(prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
+    """Shorten *prompt* for logging, keeping only its head and tail.
+
+    Prompts no longer than ``first_chars + last_chars`` are returned as-is.
+    """
+    if len(prompt) <= first_chars + last_chars:
+        return prompt
+    return (
+        f"{prompt[:first_chars]}..."
+        f"({len(prompt) - first_chars - last_chars} chars omitted)..."
+        f"{prompt[-last_chars:]}"
+    )
+
+
+class LLMClientDP:
+    """Async Deepseek API client for query decomposition.
+
+    Always disables thinking mode via ``extra_body={"thinking": {"type": "disabled"}}``.
+    Uses the OpenAI-compatible SDK with Deepseek's base URL.
+
+    Falls back to ``settings.llm_api_key`` when ``settings.dp_api_key`` is empty.
+    """
+
+    def __init__(self, settings: Settings) -> None:
+        """Create the ``AsyncOpenAI`` client from *settings* (see class docstring)."""
+        api_key = settings.dp_api_key or settings.llm_api_key
+        self.model = settings.dp_model_name
+        self._client = AsyncOpenAI(
+            base_url=settings.dp_base_url.rstrip("/"),
+            api_key=api_key,
+            timeout=settings.llm_timeout,
+            http_client=httpx.AsyncClient(
+                headers={"Content-Type": "application/json"},
+            ),
+        )
+
+    async def complete(
+        self,
+        prompt: str,
+        temperature: float = 0.7,
+        step_name: str = "QueryDecomposer",
+        response_format: dict[str, Any] | None = None,
+    ) -> str:
+        """Send a chat completion request with thinking disabled.
+
+        Used as the fallback path by ``QueryDecomposer.decompose()`` when
+        ``complete_structured()`` fails.
+
+        Args:
+            prompt: The prompt to send.
+            temperature: Sampling temperature.
+            step_name: Identifier for logging.
+            response_format: Optional OpenAI ``response_format`` dict
+                (e.g. ``{"type": "json_object"}`` for Deepseek JSON mode).
+
+        Returns:
+            The content of the first choice, or ``""`` if it is empty.
+
+        Raises:
+            LLMClientDPError: On any failure; the message includes the
+                original error, which is also chained as ``__cause__``.
+        """
+        messages = [{"role": "user", "content": prompt}]
+        extra_body: dict[str, Any] = {"thinking": {"type": "disabled"}}
+
+        prompt_preview = _truncate_prompt_for_log(prompt)
+        logger.info("[%s] Deepseek request started. Prompt: %s", step_name, prompt_preview)
+        start_time = time.perf_counter()
+
+        kwargs: dict[str, Any] = {
+            "model": self.model,
+            "messages": messages,
+            "temperature": temperature,
+            "extra_body": extra_body,
+        }
+        if response_format is not None:
+            kwargs["response_format"] = response_format
+
+        try:
+            response = await self._client.chat.completions.create(**kwargs)
+            content = response.choices[0].message.content or ""
+            elapsed_ms = (time.perf_counter() - start_time) * 1000
+            logger.info(
+                "[%s] Deepseek request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
+                step_name,
+                elapsed_ms,
+                response.usage.prompt_tokens if response.usage else "?",
+                response.usage.completion_tokens if response.usage else "?",
+            )
+            return content
+        except (APITimeoutError, APIError) as exc:
+            elapsed_ms = (time.perf_counter() - start_time) * 1000
+            logger.error("[%s] Deepseek API error after %.2fms: %s", step_name, elapsed_ms, exc)
+            raise LLMClientDPError(f"Deepseek API error: {exc}") from exc
+        except Exception as exc:
+            elapsed_ms = (time.perf_counter() - start_time) * 1000
+            logger.error("[%s] Unexpected Deepseek error after %.2fms: %s", step_name, elapsed_ms, exc)
+            raise LLMClientDPError(f"Unexpected Deepseek error: {exc}") from exc
+
+    async def complete_structured(
+        self,
+        prompt: str,
+        pydantic_model: Any,
+        step_name: str = "QueryDecomposer",
+    ) -> Any:
+        """Structured output via Deepseek's JSON mode + client-side validation.
+
+        Deepseek supports ``response_format={"type": "json_object"}`` (which
+        guarantees valid JSON) but not OpenAI's ``json_schema`` mode (which
+        would validate against a specific schema). We use the JSON mode to
+        get a guaranteed-valid JSON response, then validate it client-side
+        against *pydantic_model*.
+
+        Args:
+            prompt: The prompt to send.
+            pydantic_model: Pydantic model class used to validate the reply.
+            step_name: Identifier for logging.
+
+        Returns:
+            A validated *pydantic_model* instance.
+
+        Raises:
+            LLMClientDPError: If the request fails or the reply does not
+                validate against *pydantic_model*.
+        """
+        prompt_preview = _truncate_prompt_for_log(prompt, first_chars=300, last_chars=100)
+        logger.info("[%s] Deepseek structured request started. Prompt: %s", step_name, prompt_preview)
+        start_time = time.perf_counter()
+
+        # complete() already wraps every failure in LLMClientDPError, so the
+        # call needs no extra try/except here.
+        response = await self.complete(
+            prompt=prompt,
+            temperature=0.0,
+            step_name=step_name,
+            response_format={"type": "json_object"},
+        )
+
+        extracted = response.strip()
+        match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", extracted, re.DOTALL)
+        if match:
+            extracted = match.group(1).strip()
+
+        # The decompose prompt asks for a "JSON array of strings", so the LLM
+        # may return a bare array. Wrap it into the {"questions": [...]} shape
+        # that SubQuestions expects.
+        try:
+            parsed = json.loads(extracted)
+        except json.JSONDecodeError:
+            pass  # not JSON; model_validate_json below reports the failure
+        else:
+            if isinstance(parsed, list):
+                extracted = json.dumps({"questions": parsed})
+
+        try:
+            result = pydantic_model.model_validate_json(extracted)
+        except Exception as exc:
+            elapsed_ms = (time.perf_counter() - start_time) * 1000
+            logger.error(
+                "[%s] Deepseek structured JSON parse failed after %.2fms. "
+                "Raw response (first 500 chars): %s",
+                step_name,
+                elapsed_ms,
+                response[:500],
+                exc_info=True,
+            )
+            raise LLMClientDPError(
+                f"Deepseek structured output failed validation: {exc}"
+            ) from exc
+
+        elapsed_ms = (time.perf_counter() - start_time) * 1000
+        logger.info(
+            "[%s] Deepseek structured request completed in %.2fms. Result: %s",
+            step_name,
+            elapsed_ms,
+            getattr(result, "model_dump", lambda: result)(),
+        )
+        return result
+
+    async def close(self) -> None:
+        """Close the underlying HTTP client."""
+        await self._client.close()
diff --git a/backend/app/test/acceptance/test_acceptance_phase6_dp_decompose.py b/backend/app/test/acceptance/test_acceptance_phase6_dp_decompose.py
new file mode 100644
index 0000000..4dde819
--- /dev/null
+++ b/backend/app/test/acceptance/test_acceptance_phase6_dp_decompose.py
@@ -0,0 +1,112 @@
+"""Acceptance test: Phase 6 Deepseek API decompose with real LLM.
+
+Prerequisites:
+- backend/.env configured with valid DP_API_KEY (Deepseek API key)
+- Network access to Deepseek API (https://api.deepseek.com)
+
+These tests verify that LLMClientDP can call the real Deepseek API
+(deepseek-v4-pro, thinking disabled) and return valid sub-questions
+via both structured output and legacy fallback paths.
+"""
+import pytest
+
+from app.core.config import get_settings
+from app.models.decompose import SubQuestions
+from app.services.llm_client_dp import LLMClientDP
+
+
+@pytest.fixture
+def client():
+    settings = get_settings()
+    if not settings.dp_api_key and not settings.llm_api_key:
+        pytest.skip("DP_API_KEY not configured in .env")
+    return LLMClientDP(settings)
+
+
+@pytest.mark.acceptance
+@pytest.mark.slow
+@pytest.mark.asyncio
+class TestLLMClientDPAcceptance:
+
+    async def test_structured_decompose_cantonese(self, client):
+        """Cantonese question → structured output → valid SubQuestions."""
+        result = await client.complete_structured(
+            prompt=(
+                "Given this question: '立法會今日討論咗咩議題?'\n\n"
+                "Break it down into 2-5 simplified sub-questions that would help "
+                "search for relevant information. Each sub-question should be short "
+                "and focused on one aspect. Return as a JSON array of strings."
+            ),
+            pydantic_model=SubQuestions,
+            step_name="QueryDecomposer",
+        )
+
+        assert isinstance(result, SubQuestions)
+        assert len(result.questions) >= 1
+        assert len(result.questions) <= 5
+        assert all(isinstance(q, str) and len(q) > 0 for q in result.questions)
+
+    async def test_structured_decompose_english(self, client):
+        """English question → structured output → valid SubQuestions."""
+        result = await client.complete_structured(
+            prompt=(
+                "Given this question: 'What are the key provisions of the NEC4 contract?'\n\n"
+                "Break it down into 2-5 simplified sub-questions that would help "
+                "search for relevant information. Each sub-question should be short "
+                "and focused on one aspect. Return as a JSON array of strings."
+            ),
+            pydantic_model=SubQuestions,
+            step_name="QueryDecomposer",
+        )
+
+        assert isinstance(result, SubQuestions)
+        assert len(result.questions) >= 1
+        assert len(result.questions) <= 5
+        assert all(isinstance(q, str) and len(q) > 0 for q in result.questions)
+
+    async def test_structured_decompose_simple_question(self, client):
+        """Simple question → structured output → at least 1 sub-question."""
+        result = await client.complete_structured(
+            prompt=(
+                "Given this question: 'Who created Python?'\n\n"
+                "Break it down into 2-5 simplified sub-questions that would help "
+                "search for relevant information. Each sub-question should be short "
+                "and focused on one aspect. Return as a JSON array of strings."
+            ),
+            pydantic_model=SubQuestions,
+            step_name="QueryDecomposer",
+        )
+
+        assert isinstance(result, SubQuestions)
+        assert len(result.questions) >= 1
+
+    async def test_complete_fallback_returns_text(self, client):
+        """Legacy complete() path returns non-empty string from Deepseek."""
+        response = await client.complete(
+            prompt="Say hello in one word.",
+            temperature=0.7,
+            step_name="QueryDecomposer",
+        )
+
+        assert response is not None
+        assert isinstance(response, str)
+        assert len(response) > 0
+
+    async def test_complete_extra_body_has_thinking_disabled(self, client):
+        """Smoke-test a deterministic completion with the default model.
+
+        The outgoing request cannot be introspected in an acceptance test, so
+        this does not inspect ``extra_body`` itself (the unit tests assert
+        it). It only checks that the configured model is the expected one and
+        that a ``temperature=0.0`` call succeeds end to end with thinking
+        disabled.
+        """
+        assert client.model == "deepseek-v4-pro"
+
+        response = await client.complete(
+            prompt="What is 2+2? Answer with just the number.",
+            temperature=0.0,
+            step_name="QueryDecomposer",
+        )
+
+        assert "4" in response
diff --git a/backend/app/test/test_phase6_llm_client_dp.py b/backend/app/test/test_phase6_llm_client_dp.py
new file mode 100644
index 0000000..d821b6c
--- /dev/null
+++ b/backend/app/test/test_phase6_llm_client_dp.py
@@ -0,0 +1,254 @@
+"""Tests for Phase 6 LLMClientDP — Deepseek API client for decomposition.
+
+Coverage:
+- Instantiation with Settings (dp_* fields, API key fallback)
+- complete() sends extra_body with thinking disabled
+- complete() success / API error / timeout error paths
+- complete_structured() via mocked complete() (Deepseek JSON mode)
+- complete_structured() wrapping of a bare JSON array
+- complete_structured() error wrapping in LLMClientDPError
+"""
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from openai import APIError, APITimeoutError
+
+from app.core.config import Settings
+from app.models.decompose import SubQuestions
+from app.services.llm_client_dp import LLMClientDP, LLMClientDPError
+
+
+class TestLLMClientDPInstantiation:
+    """Tests for LLMClientDP construction and config resolution."""
+
+    def test_uses_dp_config_fields(self):
+        """Should use dp_* fields when they are set."""
+        settings = Settings(
+            dp_base_url="https://api.deepseek.com",
+            dp_api_key="dp-key-123",
+            dp_model_name="deepseek-v4-pro",
+        )
+        client = LLMClientDP(settings)
+
+        assert client.model == "deepseek-v4-pro"
+        assert client._client.api_key == "dp-key-123"
+
+    def test_falls_back_to_llm_api_key_when_dp_empty(self):
+        """When dp_api_key is empty, fall back to llm_api_key."""
+        settings = Settings(
+            dp_base_url="https://api.deepseek.com",
+            dp_api_key="",
+            llm_api_key="fallback-key",
+            dp_model_name="deepseek-v4-pro",
+        )
+        client = LLMClientDP(settings)
+
+        assert client.model == "deepseek-v4-pro"
+        assert client._client.api_key == "fallback-key"
+
+
+class TestLLMClientDPComplete:
+    """Tests for LLMClientDP.complete() — the fallback text path."""
+
+    @pytest.fixture
+    def settings(self):
+        return Settings(
+            dp_base_url="https://api.deepseek.com",
+            dp_api_key="test-key",
+            dp_model_name="deepseek-v4-pro",
+        )
+
+    @pytest.fixture
+    def client(self, settings):
+        return LLMClientDP(settings)
+
+    @pytest.fixture
+    def mock_response(self):
+        """Build a mock OpenAI chat completion response."""
+        response = MagicMock()
+        response.choices = [MagicMock()]
+        response.choices[0].message.content = '["q1", "q2"]'
+        response.usage.prompt_tokens = 50
+        response.usage.completion_tokens = 10
+        return response
+
+    @pytest.mark.asyncio
+    async def test_complete_returns_text(self, client, mock_response):
+        """complete() should return the LLM response text."""
+        with patch.object(
+            client._client.chat.completions, "create", new_callable=AsyncMock
+        ) as mock_create:
+            mock_create.return_value = mock_response
+
+            result = await client.complete(
+                prompt="Decompose this question",
+                temperature=0.7,
+                step_name="QueryDecomposer",
+            )
+
+        assert result == '["q1", "q2"]'
+        mock_create.assert_called_once()
+        call_kwargs = mock_create.call_args.kwargs
+        assert call_kwargs["model"] == "deepseek-v4-pro"
+        assert call_kwargs["temperature"] == 0.7
+        assert call_kwargs["extra_body"]["thinking"]["type"] == "disabled"
+
+    @pytest.mark.asyncio
+    async def test_complete_sends_thinking_disabled(self, client, mock_response):
+        """Every complete() call must include thinking=disabled in extra_body."""
+        with patch.object(
+            client._client.chat.completions, "create", new_callable=AsyncMock
+        ) as mock_create:
+            mock_create.return_value = mock_response
+
+            await client.complete(prompt="Test")
+
+        extra_body = mock_create.call_args.kwargs["extra_body"]
+        assert extra_body == {"thinking": {"type": "disabled"}}
+
+    @pytest.mark.asyncio
+    async def test_complete_wraps_api_error(self, client):
+        """APIError should be wrapped in LLMClientDPError."""
+        with patch.object(
+            client._client.chat.completions, "create", new_callable=AsyncMock
+        ) as mock_create:
+            mock_create.side_effect = APIError(
+                message="Rate limited",
+                request=MagicMock(),
+                body=None,
+            )
+
+            with pytest.raises(LLMClientDPError) as exc_info:
+                await client.complete(prompt="Test")
+
+        assert "Rate limited" in str(exc_info.value)
+
+    @pytest.mark.asyncio
+    async def test_complete_wraps_timeout_error(self, client):
+        """APITimeoutError should be wrapped in LLMClientDPError."""
+        with patch.object(
+            client._client.chat.completions, "create", new_callable=AsyncMock
+        ) as mock_create:
+            mock_create.side_effect = APITimeoutError(request=MagicMock())
+
+            with pytest.raises(LLMClientDPError):
+                await client.complete(prompt="Test")
+
+    @pytest.mark.asyncio
+    async def test_complete_wraps_unexpected_error(self, client):
+        """Unexpected exceptions should also be wrapped in LLMClientDPError."""
+        with patch.object(
+            client._client.chat.completions, "create", new_callable=AsyncMock
+        ) as mock_create:
+            mock_create.side_effect = RuntimeError("unexpected")
+
+            with pytest.raises(LLMClientDPError):
+                await client.complete(prompt="Test")
+
+    @pytest.mark.asyncio
+    async def test_complete_handles_empty_content(self, client):
+        """None or empty response content should return empty string."""
+        response = MagicMock()
+        response.choices = [MagicMock()]
+        response.choices[0].message.content = None
+        response.usage.prompt_tokens = 10
+        response.usage.completion_tokens = 0
+
+        with patch.object(
+            client._client.chat.completions, "create", new_callable=AsyncMock
+        ) as mock_create:
+            mock_create.return_value = response
+
+            result = await client.complete(prompt="Test")
+
+        assert result == ""
+
+
+class TestLLMClientDPCompleteStructured:
+    """Tests for LLMClientDP.complete_structured() — JSON extraction path."""
+
+    @pytest.fixture
+    def settings(self):
+        return Settings(
+            dp_base_url="https://api.deepseek.com",
+            dp_api_key="test-key",
+            dp_model_name="deepseek-v4-pro",
+        )
+
+    @pytest.fixture
+    def client(self, settings):
+        return LLMClientDP(settings)
+
+    @pytest.mark.asyncio
+    async def test_complete_structured_returns_validated_model(self, client):
+        """Should call complete() with json_object format and parse response."""
+        expected = SubQuestions(questions=["Q1", "Q2", "Q3"])
+
+        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
+            mock_complete.return_value = '{"questions": ["Q1", "Q2", "Q3"]}'
+
+            result = await client.complete_structured(
+                prompt="Decompose: test question",
+                pydantic_model=SubQuestions,
+                step_name="QueryDecomposer",
+            )
+
+        assert result == expected
+        assert result.questions == ["Q1", "Q2", "Q3"]
+        mock_complete.assert_called_once()
+        # Verify Deepseek JSON mode is used
+        call_kwargs = mock_complete.call_args.kwargs
+        assert call_kwargs["response_format"] == {"type": "json_object"}
+        assert call_kwargs["temperature"] == 0.0
+
+    @pytest.mark.asyncio
+    async def test_complete_structured_with_markdown_fence(self, client):
+        """Should strip markdown code fences before JSON parsing."""
+        expected = SubQuestions(questions=["Only one"])
+
+        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
+            mock_complete.return_value = '```json\n{"questions": ["Only one"]}\n```'
+
+            result = await client.complete_structured(
+                prompt="Test",
+                pydantic_model=SubQuestions,
+            )
+
+        assert result == expected
+
+    @pytest.mark.asyncio
+    async def test_complete_structured_wraps_bare_array(self, client):
+        """A bare JSON array should be wrapped into {"questions": [...]}."""
+        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
+            mock_complete.return_value = '["Q1", "Q2"]'
+
+            result = await client.complete_structured(
+                prompt="Test",
+                pydantic_model=SubQuestions,
+            )
+
+        assert result == SubQuestions(questions=["Q1", "Q2"])
+
+    @pytest.mark.asyncio
+    async def test_complete_structured_invalid_json_raises(self, client):
+        """Unparseable JSON should raise LLMClientDPError."""
+        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
+            mock_complete.return_value = "not json at all"
+
+            with pytest.raises(LLMClientDPError):
+                await client.complete_structured(
+                    prompt="Test",
+                    pydantic_model=SubQuestions,
+                )
+
+    @pytest.mark.asyncio
+    async def test_complete_structured_wrong_schema_raises(self, client):
+        """Valid JSON but wrong Pydantic schema should raise LLMClientDPError."""
+        with patch.object(client, "complete", new_callable=AsyncMock) as mock_complete:
+            mock_complete.return_value = '{"wrong_field": [1, 2, 3]}'
+
+            with pytest.raises(LLMClientDPError):
+                await client.complete_structured(
+                    prompt="Test",
+                    pydantic_model=SubQuestions,
+                )