147 lines
5.8 KiB
Python
147 lines
5.8 KiB
Python
import logging
|
|
import os
|
|
import time
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from openai import AsyncOpenAI, APIError, APITimeoutError
|
|
|
|
from app.core.config import Settings
|
|
|
|
|
|
class LLMClientError(Exception):
    """Error raised by LLMClient when an LLM request cannot be completed."""
|
|
|
|
|
|
class LLMClient:
    """Asynchronous LLM client using OpenAI SDK with provider-agnostic config.

    Two request paths share one configuration (model, API key, base URL,
    timeout and thinking control):

    * ``complete`` sends a plain chat completion through ``AsyncOpenAI``.
    * ``complete_structured`` uses a lazily created LangChain chat model to
      parse the response into a pydantic model.
    """

    def __init__(self, settings: "Settings"):
        """Create the client from application settings.

        Args:
            settings: Application settings. This class reads ``llm_model_name``,
                ``llm_enable_thinking``, ``llm_base_url``, ``llm_api_key``,
                ``llm_timeout`` and ``vllm_engine`` from it.
        """
        self.settings = settings
        self.model = settings.llm_model_name
        self.enable_thinking = settings.llm_enable_thinking
        self.logger = logging.getLogger(__name__)
        self._client = AsyncOpenAI(
            base_url=settings.llm_base_url.rstrip("/"),
            api_key=settings.llm_api_key,
            timeout=settings.llm_timeout,
            http_client=httpx.AsyncClient(
                headers={"Content-Type": "application/json"},
            ),
        )
        # LangChain model for structured output; built on first use by
        # _get_langchain_model so LangChain is only needed when it is used.
        self._langchain_model = None

    @staticmethod
    def _elapsed_ms(start_time: float) -> float:
        """Return milliseconds elapsed since a ``time.perf_counter()`` reading."""
        return (time.perf_counter() - start_time) * 1000

    def _truncate_prompt_for_log(self, prompt: str, first_chars: int = 100, last_chars: int = 100) -> str:
        """Truncate prompt for logging: show first N and last N chars with ellipsis.

        Args:
            prompt: Text to shorten.
            first_chars: Number of leading characters to keep (negatives count as 0).
            last_chars: Number of trailing characters to keep (negatives count as 0).

        Returns:
            ``prompt`` unchanged when it fits in ``first_chars + last_chars``.
            Otherwise the head and tail joined by a marker stating how many
            characters were omitted.
        """
        first_chars = max(first_chars, 0)
        last_chars = max(last_chars, 0)
        if len(prompt) <= first_chars + last_chars:
            return prompt
        omitted = len(prompt) - first_chars - last_chars
        # Slice the tail from an absolute index: ``prompt[-0:]`` would be the
        # whole string, which broke ``last_chars=0``.
        tail = prompt[len(prompt) - last_chars:]
        return f"{prompt[:first_chars]}...({omitted} chars omitted)...{tail}"

    async def complete(self, prompt: str, temperature: float = 0.7, step_name: str = "LLM") -> str:
        """Send a chat completion request with optional thinking control.

        Args:
            prompt: The prompt to send to the LLM.
            temperature: Sampling temperature.
            step_name: Identifier for the processing step (e.g., "QueryDecomposer").

        Returns:
            The LLM response text ("" when the model returned no content).

        Raises:
            LLMClientError: If the API call fails or the response cannot be read.
        """
        messages = [{"role": "user", "content": prompt}]
        extra_body = self._build_extra_body()

        self.logger.info(
            "[%s] LLM request started. Prompt: %s",
            step_name,
            self._truncate_prompt_for_log(prompt),
        )
        # Provider-specific request params are diagnostic detail: log them at
        # debug level, formatted lazily.
        self.logger.debug("[%s] LLM extra body: %s", step_name, extra_body)
        start_time = time.perf_counter()

        try:
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                # Omit the field entirely when there is nothing provider-specific.
                extra_body=extra_body or None,
            )
            # The full response can be large and contains model output, so it
            # is only dumped at debug level (lazy formatting, no eager str()).
            self.logger.debug("[%s] LLM response: %s", step_name, response)
            content = response.choices[0].message.content or ""
            usage = response.usage
            self.logger.info(
                "[%s] LLM request completed in %.2fms (prompt_tokens=%s, completion_tokens=%s)",
                step_name,
                self._elapsed_ms(start_time),
                usage.prompt_tokens if usage else "?",
                usage.completion_tokens if usage else "?",
            )
            return content
        except (APITimeoutError, APIError) as exc:
            self.logger.error(
                "[%s] LLM API error after %.2fms: %s", step_name, self._elapsed_ms(start_time), exc
            )
            raise LLMClientError(f"[{step_name}] LLM API error: {exc}") from exc
        except Exception as exc:
            # Not an SDK-reported error (e.g. a malformed response), so keep the
            # traceback in the log.
            self.logger.exception(
                "[%s] Unexpected LLM error after %.2fms: %s", step_name, self._elapsed_ms(start_time), exc
            )
            raise LLMClientError(f"[{step_name}] Unexpected LLM error: {exc}") from exc

    def _build_extra_body(self) -> dict:
        """Build extra_body for provider-specific parameters.

        When thinking is enabled, no extra params are passed and the model uses
        its default thinking mode. When thinking is disabled:

        * vLLM (``settings.vllm_engine`` truthy):
          ``{"chat_template_kwargs": {"enable_thinking": False}}``
        * otherwise (OpenRouter-style): ``{"reasoning": {"enabled": False}}``

        Returns:
            A new dict each call; empty when thinking is enabled.
        """
        if self.enable_thinking:
            return {}
        if self.settings.vllm_engine:
            return {"chat_template_kwargs": {"enable_thinking": False}}
        return {"reasoning": {"enabled": False}}

    async def close(self):
        """Close the underlying AsyncOpenAI client and its HTTP connections."""
        await self._client.close()

    def _get_langchain_model(self):
        """Return the cached LangChain chat model, creating it on first call.

        The model is configured explicitly from ``self.settings`` with the same
        API key, normalised base URL, timeout and thinking control as the
        AsyncOpenAI client. No process environment variables are read or
        written, so both request paths always hit the same endpoint.
        """
        if self._langchain_model is None:
            # Imported lazily so LangChain is only required for structured output.
            from langchain.chat_models import init_chat_model

            # Thinking/reasoning control goes through ChatOpenAI's ``extra_body``,
            # the same mechanism ``complete`` uses with the OpenAI SDK.
            # ``model_kwargs`` would instead be merged into the top-level call
            # arguments, which is not equivalent for provider-specific keys
            # such as ``chat_template_kwargs``.
            self._langchain_model = init_chat_model(
                model=self.model,
                model_provider="openai",
                temperature=0.0,
                api_key=self.settings.llm_api_key,
                base_url=self.settings.llm_base_url.rstrip("/"),
                timeout=self.settings.llm_timeout,
                extra_body=self._build_extra_body() or None,
            )
        return self._langchain_model

    async def complete_structured(self, prompt: str, pydantic_model, step_name: str = "LLM"):
        """Run ``prompt`` and parse the answer into ``pydantic_model``.

        Args:
            prompt: The prompt to send to the LLM.
            pydantic_model: Schema class passed to LangChain's
                ``with_structured_output`` (JSON-schema method).
            step_name: Identifier for the processing step, used in log lines.

        Returns:
            Whatever the structured LangChain runnable returns for ``prompt``.

        Raises:
            LLMClientError: If building the model, the request, or parsing fails.
        """
        self.logger.info(
            "[%s] Structured LLM request started. Prompt: %s",
            step_name,
            self._truncate_prompt_for_log(prompt),
        )
        start_time = time.perf_counter()

        try:
            model = self._get_langchain_model()
            structured = model.with_structured_output(pydantic_model, method="json_schema")
            result = await structured.ainvoke(prompt)
            self.logger.info(
                "[%s] Structured LLM request completed in %.2fms",
                step_name,
                self._elapsed_ms(start_time),
            )
            return result
        except Exception as exc:
            self.logger.exception(
                "[%s] Structured LLM error after %.2fms: %s", step_name, self._elapsed_ms(start_time), exc
            )
            raise LLMClientError(f"[{step_name}] Structured LLM error: {exc}") from exc
|