import asyncio import logging import time from typing import Any from urllib.parse import quote import yt_dlp logger = logging.getLogger(__name__) class YouTubeService: def __init__(self, timeout: int, cache_ttl: int): self.timeout = timeout self.cache_ttl = cache_ttl self._cache: dict[str, tuple[float, dict]] = {} async def extract_streams(self, url: str) -> dict: now = time.monotonic() if url in self._cache: cached_at, cached_data = self._cache[url] is_live = cached_data.get("is_live", False) ttl = self.cache_ttl if is_live else self.cache_ttl * 6 if now - cached_at < ttl: logger.debug("Cache hit for URL=%s age=%.1fs", url, now - cached_at) return cached_data logger.debug("Cache expired for URL=%s", url) try: loop = asyncio.get_running_loop() info = await loop.run_in_executor(None, lambda: self._extract_sync(url)) except yt_dlp.utils.DownloadError as e: logger.warning("yt-dlp extraction failed for URL=%s: %s", url, e) return {"error": str(e)[:500], "video_id": "", "title": "", "formats": []} live_status = info.get("live_status", "not_live") is_live = live_status == "is_live" is_upcoming = live_status == "is_upcoming" result = { "video_id": info.get("id", ""), "title": info.get("title", ""), "is_live": is_live, "is_upcoming": is_upcoming, "thumbnail_url": info.get("thumbnail"), "formats": info.get("formats", []), "error": None, } if not is_upcoming and info.get("formats"): try: video_fmt, audio_fmt = self._select_best_formats(info["formats"]) result["video_proxy_url"] = self._build_proxy_url(video_fmt["url"]) result["audio_proxy_url"] = self._build_proxy_url(audio_fmt["url"]) except ValueError as e: result["error"] = str(e) ttl = self.cache_ttl if is_live else self.cache_ttl * 6 self._cache[url] = (now, result) return result def _extract_sync(self, url: str) -> dict: opts = self._get_ydl_opts(url) with yt_dlp.YoutubeDL(opts) as ydl: return ydl.extract_info(url, download=False) def _get_ydl_opts(self, url: str) -> dict: opts: dict[str, Any] = { "quiet": True, "no_warnings": True, "extract_flat": False, } return opts def _select_best_formats(self, formats: list[dict]) -> tuple[dict, dict]: video_only = [ f for f in formats if f.get("vcodec", "none") != "none" and f.get("acodec", "none") == "none" ] audio_only = [ f for f in formats if f.get("acodec", "none") != "none" and f.get("vcodec", "none") == "none" ] combined = [ f for f in formats if f.get("vcodec", "none") != "none" and f.get("acodec", "none") != "none" ] has_content = bool(combined or video_only or audio_only) if not has_content: raise ValueError("No streamable formats found") if video_only and audio_only: video_fmt = self._pick_best_video(video_only) audio_fmt = max(audio_only, key=lambda f: f.get("abr") or 0) return video_fmt, audio_fmt if combined and audio_only: combined_sorted = sorted(combined, key=lambda f: f.get("height") or 9999) return combined_sorted[0], audio_only[0] if combined: best_combined = self._pick_best_video(combined) return best_combined, best_combined if video_only: raise ValueError("No streamable audio format found") raise ValueError("No streamable video format found") def _pick_best_video(self, candidates: list[dict]) -> dict: def _sort_key(f: dict) -> tuple[int, int, int, int]: height = f.get("height") or 9999 tbr = f.get("tbr") or 0 is_m3u8 = 0 if f.get("protocol") in ("m3u8_native", "m3u8") else 1 at_or_under_480 = 0 if height <= 480 else 1 if at_or_under_480 == 0: return (0, is_m3u8, -height, -tbr) return (1, is_m3u8, height, -tbr) return sorted(candidates, key=_sort_key)[0] def _build_proxy_url(self, upstream_url: str) -> str: encoded = quote(upstream_url, safe="") return f"/api/v1/youtube/proxy/manifest.m3u8?url={encoded}"