# legco_ai_assistant/backend/app/services/llm_client.py
#
# 158 lines
# 6.4 KiB
# Python

import logging
import os
import time
from typing import Optional
import httpx
from openai import AsyncOpenAI, APIError, APITimeoutError
from app.core.config import Settings
class LLMClientError(Exception):
    """Raised by LLMClient when a completion or structured request fails.

    The underlying exception is attached as ``__cause__`` by the raising
    code (``raise LLMClientError(...) from exc``).
    """
class LLMClient:
    """Asynchronous LLM client using OpenAI SDK with provider-agnostic config.

    Two request paths are offered:

    * ``complete`` sends a plain chat completion through ``AsyncOpenAI``.
    * ``complete_structured`` goes through a lazily created LangChain chat
      model and ``with_structured_output`` to obtain a parsed object.

    Every failure on either path is re-raised as ``LLMClientError`` with the
    original exception chained as its cause.
    """

    def __init__(self, settings: Settings):
        """Create the underlying ``AsyncOpenAI`` client from ``settings``.

        Reads ``llm_model_name``, ``llm_enable_thinking``, ``llm_base_url``,
        ``llm_api_key`` and ``llm_timeout`` from ``settings``.
        """
        self.settings = settings
        self.model = settings.llm_model_name
        self.enable_thinking = settings.llm_enable_thinking
        self.logger = logging.getLogger(__name__)
        self._client = AsyncOpenAI(
            base_url=settings.llm_base_url.rstrip("/"),
            api_key=settings.llm_api_key,
            timeout=settings.llm_timeout,
            http_client=httpx.AsyncClient(
                headers={"Content-Type": "application/json"},
            ),
        )
        # Created on first use by _get_langchain_model().
        self._langchain_model = None

    @staticmethod
    def _elapsed_ms(start_time: float) -> float:
        """Return milliseconds elapsed since a ``time.perf_counter()`` reading."""
        return (time.perf_counter() - start_time) * 1000

    def _truncate_prompt_for_log(self, prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
        """Truncate prompt for logging: show first N and last N chars with ellipsis.

        Args:
            prompt: Full prompt text.
            first_chars: Number of leading characters to keep.
            last_chars: Number of trailing characters to keep.

        Returns:
            ``prompt`` unchanged when it is no longer than
            ``first_chars + last_chars``; otherwise the head and tail joined
            by a marker stating how many characters were omitted.
        """
        # Negative counts make no sense for a preview; treat them as zero.
        first_chars = max(first_chars, 0)
        last_chars = max(last_chars, 0)
        total = len(prompt)
        if total <= first_chars + last_chars:
            return prompt
        omitted = total - first_chars - last_chars
        head = prompt[:first_chars]
        # Slice from an explicit start index: prompt[-0:] would return the
        # whole prompt when last_chars is 0.
        tail = prompt[total - last_chars:]
        return f"{head}...({omitted} chars omitted)...{tail}"

    async def complete(self, prompt: str, temperature: float = 0.7, step_name: str = "LLM") -> str:
        """Send a chat completion request with optional thinking control.

        Args:
            prompt: The prompt to send to the LLM.
            temperature: Sampling temperature.
            step_name: Identifier for the processing step (e.g., "QueryDecomposer").

        Returns:
            The LLM response text (empty string when the message has no content).

        Raises:
            LLMClientError: On any failure; the original exception is chained.
        """
        messages = [{"role": "user", "content": prompt}]
        extra_body = self._build_extra_body()
        prompt_preview = self._truncate_prompt_for_log(prompt)
        self.logger.info("[%s] LLM request started. Prompt: %s", step_name, prompt_preview)
        start_time = time.perf_counter()
        self.logger.info("LLM Extra Body %s", extra_body)
        try:
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                # An empty dict means "no provider-specific params".
                extra_body=extra_body or None,
            )
            self.logger.info("LLM Response: %s", response)
            content = response.choices[0].message.content or ""
            usage = response.usage
            self.logger.info(
                "[%s] LLM request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
                step_name,
                self._elapsed_ms(start_time),
                usage.prompt_tokens if usage else "?",
                usage.completion_tokens if usage else "?",
            )
            return content
        except (APITimeoutError, APIError) as exc:
            self.logger.error(
                "[%s] LLM API error after %.2fms: %s", step_name, self._elapsed_ms(start_time), exc
            )
            # Carry the cause in the message so str(err) is not empty.
            raise LLMClientError(f"[{step_name}] LLM API error: {exc}") from exc
        except Exception as exc:
            # Unexpected failure (e.g. malformed response): keep the traceback.
            self.logger.exception(
                "[%s] Unexpected LLM error after %.2fms: %s", step_name, self._elapsed_ms(start_time), exc
            )
            raise LLMClientError(f"[{step_name}] Unexpected LLM error: {exc}") from exc

    def _build_extra_body(self) -> dict:
        """Build extra_body for provider-specific parameters.

        When thinking is enabled, no extra params are passed
        and the model uses its default thinking mode.

        vLLM: {"chat_template_kwargs": {"enable_thinking": False}}
        OpenRouter: {"reasoning": {"enabled": False}}
        """
        if self.enable_thinking:
            return {}
        if self.settings.vllm_engine:
            return {
                "chat_template_kwargs": {"enable_thinking": False},
            }
        return {"reasoning": {"enabled": False}}

    async def close(self):
        """Close the underlying ``AsyncOpenAI`` client."""
        await self._client.close()

    def _get_langchain_model(self):
        """Return the cached LangChain chat model, creating it on first call."""
        if self._langchain_model is None:
            # Imported lazily so LangChain is only needed for structured calls.
            from langchain.chat_models import init_chat_model

            # Credentials are passed explicitly rather than through
            # os.environ.setdefault(): a pre-existing OPENAI_API_KEY /
            # OPENAI_BASE_URL in the environment would otherwise win over the
            # configured settings, and the secret would be written into
            # process-global state.
            init_kwargs: dict[str, object] = {
                "model": self.model,
                "model_provider": "openai",
                "temperature": 0.0,
                "api_key": self.settings.llm_api_key,
                "base_url": self.settings.llm_base_url.rstrip("/"),
            }
            # NOTE(review): settings.llm_timeout is not forwarded here, so
            # structured calls use LangChain's default timeout — confirm
            # whether that is intended.

            # vLLM's chat_template_kwargs is incompatible with LangChain's
            # with_structured_output() — it leaks into AsyncCompletions.parse()
            # which rejects the unknown kwarg. Only provider-agnostic params
            # (e.g. OpenAI's reasoning) are safe to pass via model_kwargs.
            # model_kwargs is only supplied when there is something to pass;
            # the key is omitted otherwise instead of sending None.
            if not self.settings.vllm_engine and not self.enable_thinking:
                init_kwargs["model_kwargs"] = {"reasoning": {"enabled": False}}

            self._langchain_model = init_chat_model(**init_kwargs)
        return self._langchain_model

    async def complete_structured(self, prompt: str, pydantic_model, step_name: str = "LLM"):
        """Send a prompt and return the response parsed into ``pydantic_model``.

        Args:
            prompt: The prompt to send to the LLM.
            pydantic_model: Schema handed to ``with_structured_output``
                (used with ``method="json_schema"``).
            step_name: Identifier for the processing step, used in log lines.

        Returns:
            Whatever the structured runnable's ``ainvoke`` returns.

        Raises:
            LLMClientError: On any failure; the original exception is chained.
        """
        prompt_preview = self._truncate_prompt_for_log(prompt)
        self.logger.info("[%s] Structured LLM request started. Prompt: %s", step_name, prompt_preview)
        start_time = time.perf_counter()
        # The extra_body from _build_extra_body() is not sent on this path
        # (see _get_langchain_model), so it is neither built nor logged here.
        try:
            model = self._get_langchain_model()
            structured = model.with_structured_output(pydantic_model, method="json_schema")
            result = await structured.ainvoke(prompt)
            elapsed_ms = self._elapsed_ms(start_time)
            # Log the dumped form when the result offers model_dump(),
            # otherwise log the result itself.
            dump = getattr(result, "model_dump", None)
            self.logger.info(
                "[%s] Structured LLM Response: %s",
                step_name,
                dump() if dump is not None else result,
            )
            self.logger.info(
                "[%s] Structured LLM request completed in %.2fms",
                step_name,
                elapsed_ms,
            )
            return result
        except Exception as exc:
            self.logger.error(
                "[%s] Structured LLM error after %.2fms: %s", step_name, self._elapsed_ms(start_time), exc
            )
            raise LLMClientError(f"[{step_name}] Structured LLM error: {exc}") from exc