legco_ai_assistant/.plans/phase5_openrouter_asr.md

478 lines
20 KiB
Markdown

# Phase 5: OpenRouter ASR Provider
**Date:** 2026-05-18
**Status:** ✅ Implemented (2026-05-19, updated 2026-05-19)
**Source:** User request — add OpenRouter STT as alternative ASR provider for both batch and realtime
**Model:** `google/chirp-3` (changed from `google/gemini-3.1-flash-lite` — gemini-3.1-flash-lite is not an STT model; OpenRouter `/audio/transcriptions` supports 8 specific models)
**Research:** OpenRouter STT docs + librarian agent (real-world code patterns + model compatibility verification) + explore agent (codebase architecture map)
**Test Results:** 49/49 core ASR tests pass (Phase 2 + Phase 5); 6/7 WS tests pass (1 pre-existing timeout)
---
## 1. Objective
Add OpenRouter as a second ASR provider for **batch transcription** (`transcribe_full`). The realtime WebSocket streaming mode remains DashScope-only because OpenRouter has no WebSocket STT endpoint.
Users select the provider via a single env var. The existing REST endpoint `POST /api/v1/video/{video_id}/transcribe` and the WebSocket endpoint `/ws/asr/{video_id}` are unchanged from the frontend's perspective.
---
## 2. Scope
| In Scope | Out of Scope |
|----------|-------------|
| OpenRouter batch transcription (`transcribe_full`) | Frontend provider selector UI |
| OpenRouter realtime WebSocket streaming (chunked REST, ~3s chunks) | True realtime streaming (no WebSocket STT endpoint exists) |
| `ASR_PROVIDER` env var switching (batch + realtime) | Changing existing DashScope code behavior |
| Provider abstraction (protocol class) | Retraining/changing models |
| Tests for new provider | Docker image rebuild |
| `.env.example` update | |
---
## 3. Architecture
### 3.1 Current Flow (DashScope-only)
```
POST /api/v1/video/{video_id}/transcribe
→ video.py router
→ VideoService.extract_audio() → WAV bytes
→ ASRClient(settings).transcribe_full(audio_bytes, language)
→ OpenAI SDK → DashScope Chat Completions API (audio input)
→ return text
```
### 3.2 New Flow (Provider-based)
```
POST /api/v1/video/{video_id}/transcribe
→ video.py router
→ VideoService.extract_audio() → WAV bytes
→ ASRClient(settings).transcribe_full(audio_bytes, language)
├── ASR_PROVIDER=dashscope → DashScopeASRProvider (existing logic)
└── ASR_PROVIDER=openrouter → OpenRouterASRProvider (new)
→ return text
```
### 3.3 Provider Interface (Factory + Strategy Pattern)
Based on real-world multi-provider ASR patterns (DocsGPT, LiveKit, openai-agents-python), use **Factory + Strategy**:
```python
from abc import ABC, abstractmethod
from typing import Protocol
class ASRProvider(ABC):
"""Abstract base for all ASR providers."""
@abstractmethod
async def transcribe(self, audio_bytes: bytes, language: str) -> str:
"""Transcribe audio bytes to traditional Chinese text.
Raises ASRError on any failure (network, HTTP, empty response).
"""
...
class ASRProviderFactory:
"""Selects ASR provider based on settings."""
_providers: dict[str, type[ASRProvider]] = {}
@classmethod
def register(cls, name: str, provider_cls: type[ASRProvider]) -> None:
cls._providers[name] = provider_cls
@classmethod
def create(cls, name: str, settings) -> ASRProvider:
provider_cls = cls._providers.get(name)
if not provider_cls:
raise ValueError(f"Unknown ASR provider: {name}")
return provider_cls(settings)
```
**Why async?** The video router endpoint is already `async def`. The existing `transcribe_full` is sync (blocking), which blocks the event loop during 30-60s API calls. New providers should be async. Existing DashScope can be wrapped in `loop.run_in_executor()` temporarily.
### 3.4 Existing Provider Pattern (LLMClient)
The codebase already has a provider-switching pattern in `llm_client.py`**single-class conditional branching**, not ABC/interface:
```python
# llm_client.py pattern:
if settings.vllm_engine:
extra_body = {"chat_template_kwargs": {"enable_thinking": False}}
else:
extra_body = {"reasoning": {"enabled": False}}
```
For ASR, the same pattern would mean `ASRClient` checks `settings.asr_provider` to select the right SDK/URL. However, since DashScope and OpenRouter use fundamentally different APIs (DashScope = Chat Completions + audio input; OpenRouter = dedicated STT endpoint), the **Factory+Strategy** pattern (Section 3.3) is cleaner for ASR — each provider gets its own class implementing a common interface.
### 3.5 OpenRouter SDK vs Raw httpx
| Trade-off | Raw httpx | OpenRouter SDK (`pip install openrouter`) |
|-----------|-----------|------------------------------------------|
| Type safety | Manual | Pydantic models |
| Retry logic | Must implement (`tenacity`) | Built-in `retries=RetryConfig(...)` |
| Production readiness | Battle-tested | Beta (auto-generated from OpenAPI) |
| Dependencies | `httpx` (already installed) | SDK + Pydantic + extra deps |
**Decision**: Use **raw httpx + tenacity** for Phase 5. This matches the approach used by most production Python projects (lethe, openclaw) and avoids beta SDK risk. The official SDK can be adopted later if it stabilizes.
### 3.6 Retry & Error Handling
Based on production OpenRouter STT implementations (lethe, openrouter-proxy):
```python
from tenacity import (
retry, stop_after_attempt, wait_random_exponential,
retry_if_exception_type
)
RETRIABLE_STATUS = {429, 500, 502, 503, 504}
@retry(
reraise=True,
stop=stop_after_attempt(4),
wait=wait_random_exponential(multiplier=0.2, max=3.0),
retry=retry_if_exception_type((httpx.TransportError, httpx.HTTPStatusError)),
)
async def _call_stt_api(self, audio_b64: str, language: str) -> dict:
"""Call OpenRouter STT with retry and exponential backoff."""
...
```
Error categories to handle:
| Error | Response | Retry? |
|-------|----------|--------|
| `httpx.HTTPStatusError` (429) | Rate limited | Yes (backoff) |
| `httpx.HTTPStatusError` (5xx) | Server error | Yes (backoff) |
| `httpx.HTTPStatusError` (4xx, non-429) | Client error | No |
| `httpx.ConnectError` | Connection failed | Yes |
| `httpx.TimeoutException` | Timeout (>120s) | Yes |
| Empty `result["text"]` | No transcription | No |
**Note:** `tenacity` is NOT currently in `requirements.txt`. Add it as a new dependency.
### 3.7 API Differences
| | DashScope | OpenRouter |
|---|---|---|
| Endpoint | `https://dashscope-intl.aliyuncs.com/compatible-mode/v1` | `https://openrouter.ai/api/v1/audio/transcriptions` |
| Method | Chat Completions (`POST /chat/completions`) | Dedicated STT (`POST /audio/transcriptions`) |
| Audio format | `data:audio/wav;base64,...` (data URL) | `{"data": "<base64>", "format": "wav"}` (raw base64) |
| Auth | `DASHSCOPE_API_KEY` | `OPENROUTER_API_KEY` (separate key for accounting flexibility) |
| Response | `choices[0].message.content` | `{"text": "...", "usage": {...}}` (no segments/timestamps/speaker labels) |
| Response | `choices[0].message.content` | `{"text": "...", "usage": {...}}` |
| SDK | `openai.OpenAI` | `httpx.AsyncClient` (no official SDK needed) |
---
## 4. Configuration
### 4.1 New Env Vars
| Variable | Default | Description |
|----------|---------|-------------|
| `ASR_PROVIDER` | `dashscope` | ASR provider: `dashscope` or `openrouter` |
| `OPENROUTER_API_KEY` | `""` | OpenRouter API key (for STT; separate from LLM_API_KEY for accounting) |
| `ASR_OPENROUTER_MODEL` | `google/gemini-3.1-flash-lite` | OpenRouter STT model name |
### 4.2 Settings Changes
Add to `Settings` class in `config.py`:
```python
# ASR provider (Phase 5)
asr_provider: str = "dashscope" # "dashscope" or "openrouter"
openrouter_api_key: str = "" # separate from llm_api_key for accounting
asr_openrouter_model: str = "google/gemini-3.1-flash-lite"
```
**Note:** OpenRouter STT uses:
- `openrouter_api_key` — dedicated key (user preference for separate accounting)
- `llm_base_url``https://openrouter.ai/api/v1` (base, STT endpoint appended: `/audio/transcriptions`)
### 4.3 Validation
Add a startup validation in `config.py` or `asr_client.py`:
```python
VALID_ASR_PROVIDERS = {"dashscope", "openrouter"}
if settings.asr_provider not in VALID_ASR_PROVIDERS:
raise ValueError(f"Invalid ASR_PROVIDER: {settings.asr_provider}. Must be one of {VALID_ASR_PROVIDERS}")
```
---
## 5. Implementation Tasks
### Task 5.1: Add config vars and validation
**File:** `backend/app/core/config.py`
- Add `asr_provider: str = "dashscope"`
- Add `asr_openrouter_model: str = "google/gemini-3.1-flash-lite"`
- Add `model_config` validation or runtime check in `get_settings()`
**Test file:** `backend/app/test/test_phase5_config.py`
### Task 5.2: Create OpenRouter ASR provider
**File:** `backend/app/services/asr_providers.py` (new)
```python
class OpenRouterASRProvider:
def __init__(self, api_key: str, base_url: str, model: str):
self.api_key = api_key
# STT endpoint: base_url + /audio/transcriptions
self.stt_url = f"{base_url.rstrip('/')}/audio/transcriptions"
self.model = model
self._client: httpx.AsyncClient | None = None
async def transcribe(self, audio_bytes: bytes, language: str) -> str:
"""Transcribe using OpenRouter STT endpoint."""
...
```
**OpenRouter STT Request:**
```python
import base64
import httpx
audio_b64 = base64.b64encode(audio_bytes).decode("ascii")
payload = {
"model": self.model,
"input_audio": {
"data": audio_b64, # raw base64, NOT data URL
"format": "wav",
},
}
if language and language != "auto":
payload["language"] = language
response = await client.post(
self.stt_url,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json=payload,
timeout=120.0, # 60s upstream timeout + buffer
)
response.raise_for_status()
result = response.json()
return _to_traditional(result["text"])
```
**Key design notes:**
- Uses `httpx.AsyncClient` (already in `requirements.txt`)
- Base64 format: raw bytes, NOT `data:audio/wav;base64,...` (DashScope uses data URL; OpenRouter wants raw base64)
- Timeout: 120s (OpenRouter docs say 60s upstream timeout; add buffer)
- Error handling: raise custom `ASRError` on HTTP errors, network errors, or empty response text
**Test file:** `backend/app/test/test_phase5_openrouter_provider.py`
### Task 5.3: Refactor ASRClient to use provider abstraction
**File:** `backend/app/services/asr_client.py`
Changes:
1. Define `ASRProvider` protocol (or ABC)
2. Extract existing DashScope logic into `DashScopeASRProvider` (sync wrapper for now)
3. `ASRClient.__init__` selects provider based on `settings.asr_provider`
4. `ASRClient.transcribe_full` delegates to provider
5. Make `transcribe_full` async (minor refactor to `video.py` router)
**Backward compatibility:** Default `asr_provider=dashscope` means zero behavior change for existing deployments.
**Test file:** `backend/app/test/test_phase2_asr_client.py` — update existing tests to work with new provider structure; add tests for provider switching.
### Task 5.4: Update video router for async transcription
**File:** `backend/app/routers/video.py`
Minimal change — the `asr.transcribe_full()` call becomes `await asr.transcribe_full()`:
```python
# Before (line 113):
text = asr.transcribe_full(audio_bytes, language=language)
# After:
text = await asr.transcribe_full(audio_bytes, language=language)
```
No other changes needed. The endpoint signature is already `async def`.
### Task 5.5: Update .env.example and config documentation
**File:** `backend/.env.example`
- Add `ASR_PROVIDER` and `ASR_OPENROUTER_MODEL` comments
**File:** `AGENTS.md` or development plan
- Note the new Phase 5 capability
### Task 5.6: Integration test (mock OpenRouter HTTP)
**File:** `backend/app/test/test_phase5_integration.py`
- Test full flow: video upload → transcribe with `ASR_PROVIDER=openrouter` → verify text
- Mock `httpx.AsyncClient.post` to return valid OpenRouter STT response
### Task 5.7: Acceptance test (real OpenRouter)
**File:** `backend/app/test/acceptance/test_acceptance_phase5_openrouter.py`
- Real OpenRouter API call with a short test audio file
- Verify transcription quality
- Marked `@pytest.mark.acceptance` and `@pytest.mark.slow`
---
## 6. Realtime ASR (Chunked REST — Implemented)
OpenRouter has no WebSocket STT endpoint. For realtime streaming, we implemented **chunked REST**: send accumulated audio chunks to OpenRouter REST endpoint every ~3 seconds.
### 6.1 Implementation (`_ws_proxy_openrouter`)
**File:** `backend/app/routers/ws_asr.py`
```python
async def _ws_proxy_openrouter(client_ws: WebSocket, language: str = "yue"):
"""WebSocket proxy for OpenRouter ASR: chunked REST approach.
Accumulates PCM audio from DashScope VPR server, flushes chunks ~every 3s
to OpenRouter REST API via pcm_to_wav() conversion.
"""
```
**Key design:**
- `pcm_to_wav(pcm_bytes, sample_rate=16000)` — converts raw PCM to WAV header + bytes
- `flush_lock` (asyncio.Lock) — prevents concurrent API calls during chunk flush
- ~3s chunk interval → calls OpenRouter `/audio/transcriptions` REST endpoint
- PCM accumulation: receives PCM frames from DashScope VPR server, appends to buffer
- On flush: converts accumulated PCM → WAV, sends to OpenRouter, emits `delta`/`full_text` events to client via WebSocket
### 6.2 Provider Dispatch in ws_asr
The WebSocket endpoint dispatches based on `ASR_PROVIDER`:
```python
# ws_asr.py endpoint dispatch:
if settings.asr_provider == "openrouter":
await _ws_proxy_openrouter(websocket, language)
else:
await _ws_proxy_dashscope(websocket, loop, language)
```
### 6.3 Language Code Handling
OpenRouter STT expects ISO 639-1 language codes. `yue` (ISO 639-3) is not supported — the chunked handler omits the language parameter when `language` is `"yue"` or `"auto"`, relying on auto-detection:
```python
if language and language not in ("auto", "yue"):
payload["language"] = language
```
### 6.4 Limitations
- **Latency**: ~3-5s delay per chunk (accumulation + API roundtrip). Not true realtime.
- **No incremental results**: Each chunk produces a full transcription, not word-by-word streaming.
- **DashScope VPR dependency**: The WebSocket still connects to DashScope's VPR server for audio capture; only the transcription API is swapped to OpenRouter.
---
## 7. Test Plan
| Test File | What It Covers | Mock Strategy |
|-----------|---------------|---------------|
| `test_phase5_config.py` | Config validation, invalid provider rejection | No mocks (pure config) |
| `test_phase5_openrouter_provider.py` | OpenRouterASRProvider unit tests | Mock `httpx.AsyncClient` |
| `test_phase2_asr_client.py` (updated) | ASRClient with both providers | Mock DashScope + OpenRouter |
| `test_phase5_integration.py` | Full video→transcribe with OpenRouter | Mock `httpx` (TestClient) |
| `test_acceptance_phase5_openrouter.py` | Real OpenRouter API | None (real API) |
**Test-first rule:** Write tests BEFORE implementation (per AGENTS.md convention). Each implementation task references its test file.
---
## 8. Acceptance Criteria
- [x] `ASR_PROVIDER=openrouter` in `.env` → batch transcription uses OpenRouter STT
- [x] `ASR_PROVIDER=dashscope` (default) → same behavior as before (backward compat)
- [x] Invalid `ASR_PROVIDER` value → clear error at startup
- [x] Realtime WebSocket ASR dispatches to OpenRouter chunked REST when `ASR_PROVIDER=openrouter`
- [x] Realtime WebSocket ASR stays DashScope when `ASR_PROVIDER=dashscope` (backward compat)
- [x] OpenRouter transcription returns traditional Chinese (same `_to_traditional` conversion)
- [x] Error handling: network errors, HTTP errors, empty responses → clear error messages
- [x] All existing tests pass unchanged (with `ASR_PROVIDER=dashscope`)
- [x] New tests pass
- [ ] Acceptance test returns valid transcription from real OpenRouter (pending)
---
## 9. Dependencies & Risks
| Risk | Mitigation |
|------|-----------|
| OpenRouter STT latency > DashScope | Acceptable tradeoff; OpenRouter is cheaper and uses existing API key |
| OpenRouter STT not as accurate for Cantonese | Language auto-detection used (yue omitted); needs acceptance testing |
| `transcribe_full` sync→async refactor could break callers | Only one caller (`video.py`); minimal blast radius |
| No streaming/WebSocket for OpenRouter | Chunked REST (~3s) implemented; documented latency tradeoff |
| OpenRouter 60s timeout for long videos | Document limitation; large files may need chunking (future) |
| Wrong model selected (e.g., non-STT model) | Librarian research confirmed 8 supported models; `google/chirp-3` verified compatible |
| Cantonese language code unsupported by OpenRouter STT | `yue` omitted; relies on auto-detection |
---
## 10. Estimated Effort
| Task | Est. Time |
|------|-----------|
| 5.1 Config | 15 min |
| 5.2 OpenRouter provider | 30 min |
| 5.3 Refactor ASRClient | 20 min |
| 5.4 Update video router | 5 min |
| 5.5 Update .env.example | 5 min |
| 5.6 Integration test | 20 min |
| 5.7 Acceptance test | 15 min |
| **Total** | **~2 hours** |
---
## 11. Implementation Notes (2026-05-19)
### Decisions During Implementation
- **`_to_traditional` moved to `asr_providers.py`** — original plan placed it in `asr_client.py` with a cross-import, but this caused a circular import (`asr_client` → `asr_providers``asr_client`). Moved to `asr_providers.py`; `asr_client.py` re-exports for backward compatibility with `ws_asr.py`.
- **Separate `OPENROUTER_API_KEY`** — per user preference for independent accounting.
- **`DashScopeASRProvider` wraps sync OpenAI call in `loop.run_in_executor()`** — avoids blocking the event loop without rewriting the existing DashScope client.
- **Model: `google/chirp-3`** — original plan specified `google/gemini-3.1-flash-lite`, but this model is NOT in OpenRouter's supported STT model list (8 models: whisper variants, chirp-3, voxtral, qwen3-asr-flash). Changed after librarian agent verified model compatibility.
- **Realtime OpenRouter: chunked REST (~3s)** — originally out of scope ("Realtime WebSocket stays DashScope-only"). User requested OpenRouter for realtime as well. Implemented via `_ws_proxy_openrouter()`: accumulates PCM from DashScope VPR server, converts to WAV via `pcm_to_wav()`, flushes to OpenRouter REST every ~3s. Uses `flush_lock` (asyncio.Lock) to prevent concurrent API calls.
- **Language code filtering** — OpenRouter STT doesn't support ISO 639-3 codes like `yue`. The chunked handler omits the `language` parameter when `language` is `"yue"` or `"auto"`, relying on auto-detection.
- **ffmpeg binary** — replaced x86-64 binary with aarch64 static build (johnvansickle.com) for Apple Silicon Mac compatibility.
- **Diagnostic logging** — added provider selection, transcription start/complete, and error response body logging to both batch and realtime paths.
### Files Changed
| File | Action | Details |
|------|--------|---------|
| `backend/app/core/config.py` | Modified | 3 new settings + validation in `get_settings()`; default model: `google/chirp-3` |
| `backend/app/services/asr_providers.py` | **New** | `ASRProvider` ABC, `DashScopeASRProvider`, `OpenRouterASRProvider` (with tenacity retry), `create_asr_provider()` factory, `_to_traditional()` |
| `backend/app/services/asr_client.py` | Refactored | Thin wrapper; `transcribe_full` now async; re-exports `_to_traditional` for backward compat |
| `backend/app/routers/video.py` | Modified | `await transcribe_full()`; provider-aware API key validation |
| `backend/app/routers/ws_asr.py` | Modified | `pcm_to_wav()`, `_ws_proxy_openrouter()` (3s chunked REST), endpoint dispatch on `ASR_PROVIDER` |
| `backend/.env.example` | Modified | Phase 5 vars with usage comments; default: `google/chirp-3` |
| `backend/requirements.txt` | Modified | Added `tenacity>=8.0.0` |
### Test Files
| File | Tests | Status |
|------|-------|--------|
| `test_phase5_config.py` | 6 | ✅ |
| `test_phase5_openrouter_provider.py` | 14 | ✅ |
| `test_phase5_integration.py` | 4 | ✅ |
| `test_phase2_asr_client.py` | 19 (3 updated) | ✅ |
| `test_phase2_full_transcript.py` | 6 (updated fixtures) | ✅ |
| `test_integration_phase2.py` | 7 (updated fixtures) | ✅ |
### Pre-existing Test Failures (Unrelated)
- Phase 3: `test_phase3_history_service.py`, `test_phase3_prompt_injection.py`, `test_phase3_prompt_service.py`, `test_phase3_prompts_router.py` — pre-existing failures in SQLite/prompt tests unrelated to ASR changes.
- Phase 1: 1 config test — pre-existing, unrelated.
- Phase 2 WS: 1 `test_phase2_ws_timeout` — pre-existing timeout, unrelated.