# legco_ai_assistant/backend/app/services/llm_client.py

import json
import logging
import os
import time
from typing import Any, Optional
import httpx
from openai import AsyncOpenAI, APIError, APITimeoutError
from app.core.config import Settings
class LLMClientError(Exception):
    """Raised by LLMClient when an LLM request or structured-output call fails."""
class LLMClient:
    """Asynchronous LLM client using OpenAI SDK with provider-agnostic config.

    Plain chat completions go through an ``AsyncOpenAI`` client configured from
    ``settings`` (base URL, API key, timeout). Structured output uses vLLM's
    ``extra_body`` guided-decoding parameters when ``settings.vllm_engine`` is
    set, and LangChain's ``with_structured_output(method="json_schema")``
    otherwise.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        self.model = settings.llm_model_name
        self.enable_thinking = settings.llm_enable_thinking
        self.logger = logging.getLogger(__name__)
        self._client = AsyncOpenAI(
            base_url=settings.llm_base_url.rstrip("/"),
            api_key=settings.llm_api_key,
            timeout=settings.llm_timeout,
            http_client=httpx.AsyncClient(
                headers={"Content-Type": "application/json"},
            ),
        )
        # LangChain chat model for the non-vLLM structured-output path;
        # created lazily by _get_langchain_model() on first use.
        self._langchain_model = None

    def _truncate_prompt_for_log(self, prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
        """Truncate prompt for logging: show first N and last N chars with ellipsis.

        Returns ``prompt`` unchanged when it is at most ``first_chars +
        last_chars`` characters long. Otherwise it returns the head and the tail
        joined by a marker that states how many characters were omitted.
        """
        if len(prompt) <= first_chars + last_chars:
            return prompt
        omitted = len(prompt) - first_chars - last_chars
        # Slice the tail by absolute index: ``prompt[-0:]`` would return the
        # whole string when last_chars == 0.
        tail = prompt[len(prompt) - last_chars:]
        return f"{prompt[:first_chars]}...({omitted} chars omitted)...{tail}"

    async def complete(self, prompt: str, temperature: float = 0.7, step_name: str = "LLM") -> str:
        """Send a chat completion request with optional thinking control.

        Args:
            prompt: The prompt to send to the LLM as a single user message.
            temperature: Sampling temperature.
            step_name: Identifier for the processing step (e.g., "QueryDecomposer"),
                used as a prefix in log lines and error messages.

        Returns:
            The LLM response text, or "" when the message has no content.

        Raises:
            LLMClientError: If the API call fails or the response cannot be read.
        """
        messages = [{"role": "user", "content": prompt}]
        extra_body = self._build_extra_body()
        prompt_preview = self._truncate_prompt_for_log(prompt)
        self.logger.info("[%s] LLM request started. Prompt: %s", step_name, prompt_preview)
        # Full payloads go to DEBUG: they can be large and contain user content.
        # Lazy %-args avoid building the string when DEBUG is disabled.
        self.logger.debug("LLM Extra Body %s", extra_body)
        start_time = time.perf_counter()
        try:
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                extra_body=extra_body or None,
            )
            self.logger.debug("LLM Response: %s", response)
            content = response.choices[0].message.content or ""
        except (APITimeoutError, APIError) as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.error("[%s] LLM API error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientError(f"[{step_name}] LLM API error: {exc}") from exc
        except Exception as exc:
            # Anything else, e.g. an empty ``choices`` list (IndexError).
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.error("[%s] Unexpected LLM error after %.2fms: %s", step_name, elapsed_ms, exc)
            raise LLMClientError(f"[{step_name}] Unexpected LLM error: {exc}") from exc

        elapsed_ms = (time.perf_counter() - start_time) * 1000
        usage = response.usage
        self.logger.info(
            "[%s] LLM request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
            step_name,
            elapsed_ms,
            usage.prompt_tokens if usage else "?",
            usage.completion_tokens if usage else "?",
        )
        return content

    def _build_extra_body(self) -> dict[str, Any]:
        """Build extra_body for provider-specific thinking-control parameters.

        When thinking is enabled, no extra params are passed and the model uses
        its default thinking mode. When it is disabled:

        - vLLM (``settings.vllm_engine``): {"chat_template_kwargs": {"enable_thinking": False}}
        - otherwise (e.g. OpenRouter): {"reasoning": {"enabled": False}}

        A fresh dict is returned on every call.
        """
        if self.enable_thinking:
            return {}
        if self.settings.vllm_engine:
            return {
                "chat_template_kwargs": {"enable_thinking": False},
            }
        return {"reasoning": {"enabled": False}}

    async def close(self):
        """Close the underlying AsyncOpenAI client.

        NOTE(review): the lazily created LangChain model (if any) is not closed here.
        """
        await self._client.close()

    def _get_langchain_model(self):
        """Return the cached LangChain chat model, creating it on first call.

        The model is built with ``init_chat_model(model_provider="openai")`` at
        temperature 0.0. It uses the same API key, base URL and timeout as the
        ``AsyncOpenAI`` client.
        """
        if self._langchain_model is None:
            from langchain.chat_models import init_chat_model

            # Credentials are passed explicitly rather than via os.environ.
            # setdefault() never overrode a pre-existing OPENAI_API_KEY /
            # OPENAI_BASE_URL, so this path could silently use a different
            # endpoint/key than settings, and it also mutated global env.
            init_kwargs: dict[str, Any] = {
                "api_key": self.settings.llm_api_key,
                "base_url": self.settings.llm_base_url.rstrip("/"),
                "timeout": self.settings.llm_timeout,
            }
            # vLLM's chat_template_kwargs is incompatible with LangChain's
            # with_structured_output() — it leaks into AsyncCompletions.parse()
            # which rejects the unknown kwarg. Only provider-agnostic params
            # (e.g. OpenAI's reasoning) are safe to pass via model_kwargs.
            # model_kwargs is only passed when non-empty, instead of passing None.
            if not self.settings.vllm_engine and not self.enable_thinking:
                init_kwargs["model_kwargs"] = {"reasoning": {"enabled": False}}
            self._langchain_model = init_chat_model(
                model=self.model,
                model_provider="openai",
                temperature=0.0,
                **init_kwargs,
            )
        return self._langchain_model

    async def complete_structured(self, prompt: str, pydantic_model, step_name: str = "LLM"):
        """Request output conforming to ``pydantic_model``'s JSON schema.

        Dispatches to the vLLM guided-decoding path when ``settings.vllm_engine``
        is set, otherwise to the LangChain json_schema path.

        Args:
            prompt: The prompt to send as a single user message.
            pydantic_model: Pydantic model class describing the expected output.
            step_name: Identifier for the processing step, used in log lines.

        Returns:
            The parsed result; expected to be an instance of ``pydantic_model``.

        Raises:
            LLMClientError: If the request or parsing fails.
        """
        prompt_preview = self._truncate_prompt_for_log(prompt)
        self.logger.info("[%s] Structured LLM request started. Prompt: %s", step_name, prompt_preview)
        start_time = time.perf_counter()
        if self.settings.vllm_engine:
            return await self._complete_structured_vllm(prompt, pydantic_model, step_name, start_time)
        return await self._complete_structured_openai(prompt, pydantic_model, step_name, start_time)

    async def _complete_structured_vllm(self, prompt: str, pydantic_model, step_name: str, start_time: float):
        """Use vLLM's native guided decoding via extra_body for structured output.

        Tries the unified ``structured_outputs`` field first, then the legacy
        ``guided_json`` field. An attempt fails if the request errors or the
        content does not validate against ``pydantic_model``.

        Raises:
            LLMClientError: If every format fails; chained to the last error.
        """
        schema = pydantic_model.model_json_schema()
        prompt_preview = self._truncate_prompt_for_log(prompt, first_chars=300, last_chars=100)
        self.logger.info(
            "[%s] vLLM structured: prompt=%s schema=%s",
            step_name, prompt_preview, json.dumps(schema)[:300],
        )
        # Thinking-control params are merged into every attempt so structured
        # calls also respect enable_thinking/vllm_engine config.
        thinking_body = self._build_extra_body()
        last_exc: Exception | None = None
        for fmt_name, format_body in (
            ("structured_outputs", {"structured_outputs": {"json": schema}}),
            ("guided_json", {"guided_json": schema}),
        ):
            extra = {**format_body, **thinking_body}
            try:
                # `extra` embeds the full schema, so it is logged at DEBUG only.
                self.logger.debug("[%s] vLLM structured: trying format=%s extra=%s", step_name, fmt_name, extra)
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0.0,
                    extra_body=extra,
                )
                self.logger.debug("[%s] vLLM structured full response: %s", step_name, response)
                content = response.choices[0].message.content or ""
                # Validate before reporting success, so a parse failure is not
                # logged as "succeeded" and then as "failed".
                result = pydantic_model.model_validate_json(content)
            except Exception as exc:
                last_exc = exc
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                self.logger.warning(
                    "[%s] vLLM structured format=%s failed after %.2fms: %s",
                    step_name, fmt_name, elapsed_ms, exc,
                    exc_info=True,
                )
                continue

            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.info(
                "[%s] vLLM structured succeeded format=%s in %.2fms tokens=(%s/%s). Parsed=%s",
                step_name, fmt_name, elapsed_ms,
                response.usage.prompt_tokens if response.usage else "?",
                response.usage.completion_tokens if response.usage else "?",
                content,
            )
            return result

        elapsed_ms = (time.perf_counter() - start_time) * 1000
        # Outside an except block exc_info=True would log "NoneType: None";
        # pass the captured exception explicitly instead.
        self.logger.error(
            "[%s] vLLM structured: all formats failed after %.2fms", step_name, elapsed_ms,
            exc_info=last_exc,
        )
        raise LLMClientError("vLLM structured output failed with all guided formats") from last_exc

    async def _complete_structured_openai(self, prompt: str, pydantic_model, step_name: str, start_time: float):
        """Use OpenAI-native json_schema via LangChain's with_structured_output().

        Raises:
            LLMClientError: Wrapping any error from model setup, the request, or parsing.
        """
        try:
            model = self._get_langchain_model()
            self.logger.info("[%s] Structured output method: json_schema (OpenAI-native)", step_name)
            structured = model.with_structured_output(pydantic_model, method="json_schema")
            result = await structured.ainvoke(prompt)
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.info(
                "[%s] Structured LLM Response: %s",
                step_name,
                # Pydantic results are logged as dicts; anything else as-is.
                getattr(result, "model_dump", lambda: result)(),
            )
            self.logger.info(
                "[%s] Structured LLM request completed in %.2fms",
                step_name,
                elapsed_ms,
            )
            return result
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.logger.error(
                "[%s] Structured LLM error after %.2fms: %s",
                step_name, elapsed_ms, exc,
                exc_info=True,
            )
            raise LLMClientError(f"[{step_name}] Structured LLM error: {exc}") from exc