import json
import logging
import os
import re
import time
from typing import Any, Optional

import httpx
from openai import AsyncOpenAI, APIError, APITimeoutError

from app.core.config import Settings

# Matches a markdown code fence (optionally tagged ``json``) and captures its body.
_JSON_FENCE_RE = re.compile(r"```(?:json)?\s*\n?(.*?)\n?```", re.DOTALL)


class LLMClientError(Exception):
    """Raised when an LLM request fails or its structured output cannot be produced."""


class LLMClient:
    """Asynchronous LLM client using OpenAI SDK with provider-agnostic config.

    Plain completions go through the OpenAI SDK directly. Structured (pydantic)
    completions go through vLLM-native structured output when
    ``settings.vllm_engine`` is set, otherwise through LangChain's
    ``with_structured_output(method="json_schema")``.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        self.model = settings.llm_model_name
        self.enable_thinking = settings.llm_enable_thinking
        self.logger = logging.getLogger(__name__)
        self._client = AsyncOpenAI(
            base_url=settings.llm_base_url.rstrip("/"),
            api_key=settings.llm_api_key,
            timeout=settings.llm_timeout,
            http_client=httpx.AsyncClient(
                headers={"Content-Type": "application/json"},
            ),
        )
        # Built lazily by _get_langchain_model() so LangChain is only imported when needed.
        self._langchain_model = None

    def _truncate_prompt_for_log(self, prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
        """Truncate prompt for logging: show first N and last N chars with ellipsis.

        Args:
            prompt: Text to shorten.
            first_chars: Number of leading characters to keep.
            last_chars: Number of trailing characters to keep (0 keeps none).

        Returns:
            The prompt unchanged if it is short enough, otherwise
            ``head...(K chars omitted)...tail``.
        """
        if len(prompt) <= first_chars + last_chars:
            return prompt
        omitted = len(prompt) - first_chars - last_chars
        head = prompt[:first_chars]
        # Slice from an explicit start index: prompt[-0:] would return the whole string.
        tail = prompt[len(prompt) - last_chars:]
        return f"{head}...({omitted} chars omitted)...{tail}"

    @staticmethod
    def _usage_counts(response) -> tuple[Any, Any]:
        """Return ``(prompt_tokens, completion_tokens)``, or ``("?", "?")`` when usage is missing."""
        usage = response.usage
        if not usage:
            return "?", "?"
        return usage.prompt_tokens, usage.completion_tokens

    async def complete(self, prompt: str, temperature: float = 0.7, step_name: str = "LLM") -> str:
        """Send a chat completion request with optional thinking control.

        Args:
            prompt: The prompt to send to the LLM.
            temperature: Sampling temperature.
            step_name: Identifier for the processing step (e.g., "QueryDecomposer").

        Returns:
            The LLM response text ("" when the model returns no content).

        Raises:
            LLMClientError: On any API or unexpected error (original exception chained).
        """
        messages = [{"role": "user", "content": prompt}]
        extra_body = self._build_extra_body()
        prompt_preview = self._truncate_prompt_for_log(prompt)
        self.logger.info("[%s] LLM request started. Prompt: %s", step_name, prompt_preview)
        start_time = time.perf_counter()
        # Lazy %-args at DEBUG: avoids building large reprs when the level is disabled.
        self.logger.debug("LLM Extra Body %s", extra_body)
        try:
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                extra_body=extra_body if extra_body else None,
            )
            # The full response can be large and contains model output; keep it out of INFO logs.
            self.logger.debug("LLM Response: %s", response)
            content = response.choices[0].message.content or ""
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            prompt_tokens, completion_tokens = self._usage_counts(response)
            self.logger.info(
                "[%s] LLM request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
                step_name,
                elapsed_ms,
                prompt_tokens,
                completion_tokens,
            )
            return content
        except (APITimeoutError, APIError) as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.error("[%s] LLM API error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientError(f"[{step_name}] LLM API error: {exc}") from exc
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.error("[%s] Unexpected LLM error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientError(f"[{step_name}] Unexpected LLM error: {exc}") from exc

    def _build_extra_body(self) -> dict:
        """Build extra_body for provider-specific parameters.

        When thinking is enabled, no extra params are passed and the model uses
        its default thinking mode. Otherwise thinking is disabled with:

            vLLM:       {"chat_template_kwargs": {"enable_thinking": False}}
            OpenRouter: {"reasoning": {"enabled": False}}
        """
        if self.enable_thinking:
            return {}
        if self.settings.vllm_engine:
            return {
                "chat_template_kwargs": {"enable_thinking": False},
            }
        return {"reasoning": {"enabled": False}}

    async def close(self):
        """Close the underlying OpenAI SDK client."""
        await self._client.close()

    def _get_langchain_model(self):
        """Return the cached LangChain chat model, creating it on first use."""
        if self._langchain_model is None:
            from langchain.chat_models import init_chat_model

            # Kept for backward compatibility with any code relying on these env vars.
            # They are only defaults; the explicit api_key/base_url below are what this
            # client actually uses, so a pre-existing OPENAI_* env var cannot silently
            # redirect requests to a different provider or key.
            os.environ.setdefault("OPENAI_API_KEY", self.settings.llm_api_key)
            os.environ.setdefault("OPENAI_BASE_URL", self.settings.llm_base_url)

            # vLLM's chat_template_kwargs is incompatible with LangChain's
            # with_structured_output() — it leaks into AsyncCompletions.parse()
            # which rejects the unknown kwarg. Only provider-agnostic params
            # (e.g. OpenAI's reasoning) are safe to pass via model_kwargs.
            model_kwargs: dict[str, Any] | None = None
            if not self.settings.vllm_engine and not self.enable_thinking:
                model_kwargs = {"reasoning": {"enabled": False}}

            self._langchain_model = init_chat_model(
                model=self.model,
                model_provider="openai",
                temperature=0.0,
                model_kwargs=model_kwargs,
                api_key=self.settings.llm_api_key,
                base_url=self.settings.llm_base_url.rstrip("/"),
                # Same request timeout as the direct SDK client.
                timeout=self.settings.llm_timeout,
            )
        return self._langchain_model

    async def complete_structured(self, prompt: str, pydantic_model, step_name: str = "LLM"):
        """Request a response parsed into ``pydantic_model``.

        Args:
            prompt: The prompt to send to the LLM.
            pydantic_model: Pydantic model class describing the expected output.
            step_name: Identifier for the processing step, used in log lines.

        Returns:
            An instance of ``pydantic_model``.

        Raises:
            LLMClientError: If structured generation or validation fails.
        """
        prompt_preview = self._truncate_prompt_for_log(prompt)
        self.logger.info("[%s] Structured LLM request started. Prompt: %s", step_name, prompt_preview)
        start_time = time.perf_counter()
        if self.settings.vllm_engine:
            return await self._complete_structured_vllm(prompt, pydantic_model, step_name, start_time)
        return await self._complete_structured_openai(prompt, pydantic_model, step_name, start_time)

    def _strip_markdown_fence(self, content: str) -> str:
        """Strip markdown code fences from LLM output if present.

        Some models (especially vLLM) may wrap JSON output in ```json ... ```
        fences even when structured output is requested. Content that already
        starts as a JSON object/array is returned untouched, so fenced code
        inside JSON string values is never mistaken for a wrapper.
        """
        if content.lstrip().startswith(("{", "[")):
            return content
        match = _JSON_FENCE_RE.search(content)
        if match:
            return match.group(1).strip()
        return content

    async def _complete_structured_vllm(self, prompt: str, pydantic_model, step_name: str, start_time: float):
        """Use vLLM-native structured output via response_format or structured_outputs extra_body.

        Tier 1 (recommended): OpenAI-native response_format (vLLM v0.6.4+)
          - Portable across OpenRouter, OpenAI, and vLLM
          - vLLM RFC #19097 confirms this is the future direction

        Tier 2 (fallback): extra_body structured_outputs (vLLM v0.8+)
          - Battle-tested fallback for v0.8+ deployments

        Any exception in Tier 1 (including a pydantic validation failure) falls
        through to Tier 2. guided_json is deliberately not used — it was
        removed in vLLM v0.12.0.

        Raises:
            LLMClientError: If both tiers fail.
        """
        schema = pydantic_model.model_json_schema()
        model_name = pydantic_model.__name__
        prompt_preview = self._truncate_prompt_for_log(prompt, first_chars=300, last_chars=100)
        self.logger.info(
            "[%s] vLLM structured: prompt=%s schema=%s",
            step_name,
            prompt_preview,
            json.dumps(schema)[:300],
        )

        # ------------------------------------------------------------------
        # Tier 1: OpenAI-native response_format (vLLM v0.6.4+)
        # ------------------------------------------------------------------
        # vLLM's protocol.py to_sampling_params() converts this to
        # StructuredOutputsParams since PR #7654 (Aug 2024). RFC #19097
        # confirms this will become the primary structured output API.
        try:
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
                response_format={
                    "type": "json_schema",
                    "json_schema": {"name": model_name, "schema": schema},
                },
            )
            content = response.choices[0].message.content or ""
            content = self._strip_markdown_fence(content)
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            prompt_tokens, completion_tokens = self._usage_counts(response)
            self.logger.info(
                "[%s] vLLM structured succeeded via response_format in %.2fms "
                "tokens=(%s/%s)",
                step_name,
                elapsed_ms,
                prompt_tokens,
                completion_tokens,
            )
            return pydantic_model.model_validate_json(content)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.warning(
                "[%s] response_format failed after %.2fms: %s. "
                "Trying structured_outputs fallback.",
                step_name,
                elapsed_ms,
                exc,
            )

        # ------------------------------------------------------------------
        # Tier 2: extra_body structured_outputs (vLLM v0.8+)
        # ------------------------------------------------------------------
        # This is the battle-tested format with extensive test coverage in
        # vLLM's test_chat.py. Used by multiple real-world projects.
        try:
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
                extra_body={"structured_outputs": {"json": schema}},
            )
            content = response.choices[0].message.content or ""
            content = self._strip_markdown_fence(content)
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            prompt_tokens, completion_tokens = self._usage_counts(response)
            self.logger.info(
                "[%s] vLLM structured succeeded via structured_outputs in %.2fms "
                "tokens=(%s/%s)",
                step_name,
                elapsed_ms,
                prompt_tokens,
                completion_tokens,
            )
            return pydantic_model.model_validate_json(content)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.error(
                "[%s] Both response_format and structured_outputs failed after %.2fms",
                step_name,
                elapsed_ms,
                exc_info=True,
            )
            raise LLMClientError("vLLM structured output failed with all supported formats") from exc

    async def _complete_structured_openai(self, prompt: str, pydantic_model, step_name: str, start_time: float):
        """Use OpenAI-native json_schema via LangChain's with_structured_output().

        Raises:
            LLMClientError: On any failure (original exception chained).
        """
        try:
            model = self._get_langchain_model()
            self.logger.info("[%s] Structured output method: json_schema (OpenAI-native)", step_name)
            structured = model.with_structured_output(pydantic_model, method="json_schema")
            result = await structured.ainvoke(prompt)
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            # Only serialize the (possibly large) result when DEBUG logging is on.
            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug(
                    "[%s] Structured LLM Response: %s",
                    step_name,
                    getattr(result, "model_dump", lambda: result)(),
                )
            self.logger.info(
                "[%s] Structured LLM request completed in %.2fms",
                step_name,
                elapsed_ms,
            )
            return result
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.error(
                "[%s] Structured LLM error after %.2fms: %s",
                step_name,
                elapsed_ms,
                exc,
                exc_info=True,
            )
            raise LLMClientError from exc