legco_ai_assistant/backend/app/services/embedding_client.py

53 lines
1.9 KiB
Python

import logging
from typing import List
import httpx
from app.core.config import Settings
logger = logging.getLogger(__name__)
class EmbeddingClient:
    """Async embedding client for OpenRouter-compatible embeddings API.

    Wraps a pooled ``httpx.AsyncClient`` and posts to ``/embeddings`` under the
    configured base URL. Call ``close()`` (or use ``async with``) when done so
    the underlying connections are released.
    """

    def __init__(self, settings: "Settings"):
        """Build the HTTP client from application settings.

        Reads ``embedding_base_url``, ``embedding_api_key`` (falling back to
        ``llm_api_key``), ``embedding_model`` and ``llm_timeout`` from
        *settings*.
        """
        self.base_url = settings.embedding_base_url.rstrip("/")
        self.api_key = settings.embedding_api_key or settings.llm_api_key
        self.model = settings.embedding_model
        # Only send credentials when a key is configured; otherwise the header
        # would literally read "Bearer None".
        headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
        # Async HTTP client for connection pooling
        self._client: httpx.AsyncClient | None = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=settings.llm_timeout,
            headers=headers,
        )

    async def __aenter__(self) -> "EmbeddingClient":
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the underlying HTTP client when leaving ``async with``."""
        await self.close()

    async def embed(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding vector per entry of *texts*.

        An empty *texts* returns ``[]`` without contacting the API. If the
        response has none of the recognized shapes, a warning is logged and
        ``[]`` is returned.

        Raises:
            RuntimeError: if the client has already been closed.
            Exception: any error raised by the HTTP call, status check or
                response parsing is logged and re-raised unchanged.
        """
        if not texts:
            return []
        if self._client is None:
            raise RuntimeError("EmbeddingClient is closed")

        payload = {"model": self.model, "input": texts}
        try:
            resp = await self._client.post("/embeddings", json=payload)
            resp.raise_for_status()
            vectors = self._parse_embeddings(resp.json())
        except Exception as exc:  # pragma: no cover - network/runtime issues
            logger.error("Embedding API call failed: %s", exc)
            raise

        if vectors is None:
            # Keep the historical "empty result" contract, but leave a trace so
            # a malformed or error body does not pass silently.
            logger.warning("Embedding API returned an unrecognized response shape")
            return []
        if len(vectors) != len(texts):
            logger.warning(
                "Embedding API returned %d vectors for %d inputs",
                len(vectors),
                len(texts),
            )
        return vectors

    @staticmethod
    def _parse_embeddings(data: object) -> "List[List[float]] | None":
        """Extract embedding vectors from a decoded JSON response.

        Recognized shapes:
          * ``{"data": [{"embedding": [...]}, ...]}`` (OpenRouter/OpenAI style)
          * ``{"embeddings": [[...], [...]]}``
          * a bare top-level list, returned as-is

        Returns ``None`` when *data* matches none of these.
        """
        if isinstance(data, dict):
            items = data.get("data")
            if isinstance(items, list):
                # When every item carries an integer "index", order by it so
                # the vectors line up with the inputs; the sort is stable, so
                # already-ordered responses are unaffected.
                if all(
                    isinstance(item, dict) and isinstance(item.get("index"), int)
                    for item in items
                ):
                    items = sorted(items, key=lambda item: item["index"])
                return [item.get("embedding", []) for item in items]
            alternative = data.get("embeddings")
            if isinstance(alternative, list):
                return alternative
            return None
        # Fallback: the body itself is the list of vectors.
        if isinstance(data, list):
            return data  # type: ignore[return-value]
        return None

    async def close(self):
        """Close the HTTP client; calling this more than once is a no-op."""
        if self._client:
            await self._client.aclose()
            # Drop the reference so later calls see the client as closed.
            self._client = None