53 lines
1.9 KiB
Python
53 lines
1.9 KiB
Python
import logging
|
|
from typing import List
|
|
|
|
import httpx
|
|
|
|
from app.core.config import Settings
|
|
|
|
|
|
# Module-level logger named after this module, so embedding failures can be
# filtered and routed independently of other loggers.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EmbeddingClient:
    """Async embedding client for OpenRouter-compatible embeddings API.

    Wraps a single pooled ``httpx.AsyncClient``. Call :meth:`close` when the
    client is no longer needed, or use the instance in an ``async with``
    block so the connection pool is released automatically.
    """

    def __init__(self, settings: Settings):
        """Build the underlying HTTP client from ``settings``.

        Reads ``embedding_base_url``, ``embedding_api_key`` (falling back to
        ``llm_api_key`` when empty), ``embedding_model`` and ``llm_timeout``.
        """
        self.base_url = settings.embedding_base_url.rstrip("/")
        self.api_key = settings.embedding_api_key or settings.llm_api_key
        self.model = settings.embedding_model
        # Send credentials only when a key is configured; formatting a missing
        # key would otherwise produce the literal header "Bearer None".
        headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
        # Async HTTP client for connection pooling
        self._client: httpx.AsyncClient | None = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=settings.llm_timeout,
            headers=headers,
        )

    async def __aenter__(self) -> "EmbeddingClient":
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving an ``async with`` block."""
        await self.close()

    @staticmethod
    def _parse_embeddings(data: object) -> List[List[float]]:
        """Extract the embedding vectors from a decoded JSON response.

        Recognized shapes, checked in this order:

        * ``{"data": [{"embedding": [...]}, ...]}`` (OpenRouter/OpenAI style);
          an item without an ``"embedding"`` key yields an empty vector.
        * ``{"embeddings": [[...], [...]]}``
        * a bare top-level list, returned as-is.

        Any other shape is logged as a warning and yields ``[]``.
        """
        if isinstance(data, dict):
            items = data.get("data")
            if isinstance(items, list):
                return [item.get("embedding", []) for item in items]
            vectors = data.get("embeddings")
            if isinstance(vectors, list):
                return vectors
        elif isinstance(data, list):
            return data  # type: ignore[return-value]

        # Keep returning [] for callers that rely on it, but leave a trace:
        # a silent empty result is indistinguishable from "nothing embedded".
        logger.warning(
            "Embedding API returned an unrecognized response shape: %s",
            sorted(data) if isinstance(data, dict) else type(data).__name__,
        )
        return []

    async def embed(self, texts: List[str]) -> List[List[float]]:
        """Request embeddings for ``texts`` from the ``/embeddings`` endpoint.

        Args:
            texts: Strings to embed. An empty list short-circuits to ``[]``
                without making a request.

        Returns:
            The vectors extracted from the response, or ``[]`` when the
            response shape is not recognized.

        Raises:
            RuntimeError: If the client has already been closed.
            Exception: Any error raised while sending the request, checking
                the HTTP status, or decoding/parsing the body is logged with
                its traceback and re-raised unchanged.
        """
        if not texts:
            return []
        if self._client is None:
            raise RuntimeError("EmbeddingClient is closed")

        payload = {"model": self.model, "input": texts}
        try:
            resp = await self._client.post("/embeddings", json=payload)
            resp.raise_for_status()
            return self._parse_embeddings(resp.json())
        except Exception as exc:  # pragma: no cover - network/runtime issues
            # logger.exception keeps the original message and adds the traceback.
            logger.exception("Embedding API call failed: %s", exc)
            raise

    async def close(self):
        """Close the underlying HTTP client; safe to call more than once."""
        # Detach the handle first so a repeated or concurrent close() is a no-op.
        client, self._client = self._client, None
        if client is not None:
            await client.aclose()
|