"""WebSocket endpoint that proxies client audio to the DashScope realtime ASR API."""

import asyncio
import base64
import json
import logging

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from app.core.config import get_settings
from app.services.asr_client import (
    float32_to_s16le,
    build_display_text,
    _to_traditional,
)

logger = logging.getLogger(__name__)
router = APIRouter(tags=["asr"])

try:
    from dashscope.audio.qwen_omni.omni_realtime import (
        OmniRealtimeConversation,
        OmniRealtimeCallback,
        TranscriptionParams,
        MultiModality,
    )
except ImportError:
    # Keep the module importable without the SDK; `object` lets
    # DashScopeCallback still be defined as a plain class.
    OmniRealtimeConversation = None
    OmniRealtimeCallback = object
    TranscriptionParams = None
    MultiModality = None

# Realtime endpoint the conversation connects to.
_DASHSCOPE_REALTIME_URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime"

# Event types this module translates into client payloads.
_EVENT_TRANSCRIPTION_TEXT = "conversation.item.input_audio_transcription.text"
_EVENT_TRANSCRIPTION_COMPLETED = (
    "conversation.item.input_audio_transcription.completed"
)


class DashScopeCallback(OmniRealtimeCallback):
    """Forward DashScope realtime events onto an asyncio queue.

    Events are handed to the event loop with ``call_soon_threadsafe``
    (presumably because the SDK invokes callbacks off the loop thread --
    confirm against the SDK).
    """

    def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
        super().__init__()
        self._queue = event_queue
        self._loop = loop

    def on_open(self):
        """Log that the realtime connection was opened."""
        logger.info("DashScope realtime connection opened")

    def on_event(self, message):
        """Decode *message* (JSON string or already-parsed object) and enqueue it.

        Failures are logged rather than raised so they do not propagate into
        the caller of this callback.
        """
        try:
            event = json.loads(message) if isinstance(message, str) else message
            self._loop.call_soon_threadsafe(self._queue.put_nowait, event)
        except Exception as e:
            logger.exception("DashScope callback error: %s", e)

    def on_close(self, code, msg):
        """Log the close code and message of the realtime connection."""
        logger.info("DashScope realtime closed: code=%s msg=%s", code, msg)


def format_transcription_event(event: dict, accumulated: str) -> dict | None:
    """Translate a DashScope transcription event into a client payload.

    Args:
        event: Decoded event; its ``"type"`` key selects the handling.
        accumulated: Text finalized so far in this session.

    Returns:
        A dict with keys ``delta``, ``full_text``, ``language`` and
        ``is_final``, or ``None`` when the event type is not handled.
        ``is_final`` is ``True`` only for "completed" events.
    """
    event_type = event.get("type", "")
    language = event.get("language", "yue")

    if event_type == _EVENT_TRANSCRIPTION_TEXT:
        # Interim result: show the pending stash after the finalized text
        # without committing it.
        stash = event.get("stash", "")
        display = build_display_text(accumulated, stash) if stash else accumulated
        return {
            "delta": "",
            "full_text": _to_traditional(display),
            "language": language,
            "is_final": False,
        }

    if event_type == _EVENT_TRANSCRIPTION_COMPLETED:
        transcript = event.get("transcript", "")
        # Blank or whitespace-only transcripts leave the text unchanged.
        if transcript and transcript.strip():
            new_accumulated = build_display_text(accumulated, transcript)
        else:
            new_accumulated = accumulated
        return {
            "delta": "",
            "full_text": _to_traditional(new_accumulated),
            "language": language,
            "is_final": True,
        }

    return None


async def _ws_proxy_dashscope(
    client_ws: WebSocket,
    loop: asyncio.AbstractEventLoop,
    language: str = "yue",
):
    """Relay audio from *client_ws* to DashScope and transcripts back.

    Binary frames received from the client are converted with
    ``float32_to_s16le``, base64-encoded and appended to the conversation.
    Transcription events are formatted and sent to the client as JSON.

    Args:
        client_ws: An already-accepted client WebSocket.
        loop: Event loop used for the blocking connect and for handing
            callback events to the queue.
        language: Transcription language; ``"auto"`` omits the language
            parameter from the session configuration.

    Raises:
        RuntimeError: If the DashScope realtime SDK could not be imported.
    """
    if OmniRealtimeConversation is None:
        # Fail with a clear message instead of "'NoneType' is not callable".
        raise RuntimeError("dashscope realtime SDK is not installed")

    event_queue: asyncio.Queue = asyncio.Queue()
    callback = DashScopeCallback(event_queue, loop)
    conversation = OmniRealtimeConversation(
        model=get_settings().asr_realtime_model_name,
        url=_DASHSCOPE_REALTIME_URL,
        callback=callback,
    )
    # connect() is run in the default executor so it does not block the loop.
    await loop.run_in_executor(None, conversation.connect)

    accumulated_text = ""
    read_task = None

    async def read_events():
        """Forward formatted transcription events to the client forever."""
        nonlocal accumulated_text
        while True:
            event = await event_queue.get()
            payload = format_transcription_event(event, accumulated_text)
            if payload is None:
                continue
            if payload["is_final"]:
                # Only "completed" events are final; commit their transcript
                # so later interim results build on it.
                transcript = event.get("transcript", "")
                if transcript and transcript.strip():
                    accumulated_text = build_display_text(
                        accumulated_text, transcript
                    )
                    payload["full_text"] = _to_traditional(accumulated_text)
            await client_ws.send_json(payload)

    # Everything after a successful connect sits inside try/finally so the
    # conversation is closed even if session setup fails.
    try:
        transcription_kwargs: dict = {
            "sample_rate": 16000,
            "input_audio_format": "pcm",
        }
        if language != "auto":
            transcription_kwargs["language"] = language
        transcription_params = TranscriptionParams(**transcription_kwargs)
        conversation.update_session(
            output_modalities=[MultiModality.TEXT],
            enable_input_audio_transcription=True,
            transcription_params=transcription_params,
        )
        logger.info("dashscope-session-updated lang=%s", language)

        read_task = asyncio.create_task(read_events())

        while True:
            float32_bytes = await client_ws.receive_bytes()
            s16_bytes = float32_to_s16le(float32_bytes)
            audio_b64 = base64.b64encode(s16_bytes).decode("ascii")
            conversation.append_audio(audio_b64)
    except WebSocketDisconnect:
        # Client went away: normal end of session.
        pass
    finally:
        if read_task is not None:
            read_task.cancel()
            # Await the task so its outcome is retrieved; an error raised
            # inside it (e.g. a failed send) is logged instead of lost.
            try:
                await read_task
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.exception("dashscope-event-forwarding failed")
        try:
            conversation.close()
        except Exception:
            # Best-effort close: keep going, but leave a trace for debugging.
            logger.debug("dashscope-close failed", exc_info=True)
        logger.info("dashscope-session-closed text_len=%d", len(accumulated_text))


@router.websocket("/ws/asr/{video_id}")
async def ws_asr_endpoint(websocket: WebSocket, video_id: str, language: str = "yue"):
    """Accept an ASR WebSocket and proxy it to DashScope.

    If no DashScope API key is configured, an error payload is sent and the
    socket is closed with code 1011. Errors raised while proxying are logged
    and not re-raised.

    Args:
        websocket: Incoming client connection.
        video_id: Path parameter; used only in log messages here.
        language: Transcription language passed through to the proxy.
    """
    settings = get_settings()
    await websocket.accept()

    if not settings.dashscope_api_key:
        await websocket.send_json({"error": "DASHSCOPE_API_KEY is not configured"})
        await websocket.close(code=1011, reason="DASHSCOPE_API_KEY not set")
        return

    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    logger.info("ws-connect video_id=%s lang=%s", video_id, language)

    try:
        await _ws_proxy_dashscope(websocket, loop, language)
    except Exception as e:
        logger.exception("ws-asr error: %s", e)
    finally:
        logger.info("ws-disconnect video_id=%s", video_id)