129 lines
4.6 KiB
Python
129 lines
4.6 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from typing import Any
|
|
from urllib.parse import quote
|
|
|
|
import yt_dlp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class YouTubeService:
|
|
def __init__(self, timeout: int, cache_ttl: int):
|
|
self.timeout = timeout
|
|
self.cache_ttl = cache_ttl
|
|
self._cache: dict[str, tuple[float, dict]] = {}
|
|
|
|
async def extract_streams(self, url: str) -> dict:
|
|
now = time.monotonic()
|
|
if url in self._cache:
|
|
cached_at, cached_data = self._cache[url]
|
|
is_live = cached_data.get("is_live", False)
|
|
ttl = self.cache_ttl if is_live else self.cache_ttl * 6
|
|
if now - cached_at < ttl:
|
|
logger.debug("Cache hit for URL=%s age=%.1fs", url, now - cached_at)
|
|
return cached_data
|
|
logger.debug("Cache expired for URL=%s", url)
|
|
|
|
try:
|
|
loop = asyncio.get_running_loop()
|
|
info = await loop.run_in_executor(None, lambda: self._extract_sync(url))
|
|
except yt_dlp.utils.DownloadError as e:
|
|
logger.warning("yt-dlp extraction failed for URL=%s: %s", url, e)
|
|
return {"error": str(e)[:500], "video_id": "", "title": "", "formats": []}
|
|
|
|
live_status = info.get("live_status", "not_live")
|
|
is_live = live_status == "is_live"
|
|
is_upcoming = live_status == "is_upcoming"
|
|
|
|
result = {
|
|
"video_id": info.get("id", ""),
|
|
"title": info.get("title", ""),
|
|
"is_live": is_live,
|
|
"is_upcoming": is_upcoming,
|
|
"thumbnail_url": info.get("thumbnail"),
|
|
"formats": info.get("formats", []),
|
|
"error": None,
|
|
}
|
|
|
|
if not is_upcoming and info.get("formats"):
|
|
try:
|
|
video_fmt, audio_fmt = self._select_best_formats(info["formats"])
|
|
result["video_proxy_url"] = self._build_proxy_url(video_fmt["url"])
|
|
result["audio_proxy_url"] = self._build_proxy_url(audio_fmt["url"])
|
|
except ValueError as e:
|
|
result["error"] = str(e)
|
|
|
|
ttl = self.cache_ttl if is_live else self.cache_ttl * 6
|
|
self._cache[url] = (now, result)
|
|
return result
|
|
|
|
def _extract_sync(self, url: str) -> dict:
|
|
opts = self._get_ydl_opts(url)
|
|
with yt_dlp.YoutubeDL(opts) as ydl:
|
|
return ydl.extract_info(url, download=False)
|
|
|
|
def _get_ydl_opts(self, url: str) -> dict:
|
|
opts: dict[str, Any] = {
|
|
"quiet": True,
|
|
"no_warnings": True,
|
|
"extract_flat": False,
|
|
}
|
|
return opts
|
|
|
|
def _select_best_formats(self, formats: list[dict]) -> tuple[dict, dict]:
|
|
video_only = [
|
|
f
|
|
for f in formats
|
|
if f.get("vcodec", "none") != "none" and f.get("acodec", "none") == "none"
|
|
]
|
|
audio_only = [
|
|
f
|
|
for f in formats
|
|
if f.get("acodec", "none") != "none" and f.get("vcodec", "none") == "none"
|
|
]
|
|
combined = [
|
|
f
|
|
for f in formats
|
|
if f.get("vcodec", "none") != "none"
|
|
and f.get("acodec", "none") != "none"
|
|
]
|
|
|
|
has_content = bool(combined or video_only or audio_only)
|
|
if not has_content:
|
|
raise ValueError("No streamable formats found")
|
|
|
|
if video_only and audio_only:
|
|
video_fmt = self._pick_best_video(video_only)
|
|
audio_fmt = max(audio_only, key=lambda f: f.get("abr") or 0)
|
|
return video_fmt, audio_fmt
|
|
|
|
if combined and audio_only:
|
|
combined_sorted = sorted(combined, key=lambda f: f.get("height") or 9999)
|
|
return combined_sorted[0], audio_only[0]
|
|
|
|
if combined:
|
|
best_combined = self._pick_best_video(combined)
|
|
return best_combined, best_combined
|
|
|
|
if video_only:
|
|
raise ValueError("No streamable audio format found")
|
|
raise ValueError("No streamable video format found")
|
|
|
|
def _pick_best_video(self, candidates: list[dict]) -> dict:
|
|
def _sort_key(f: dict) -> tuple[int, int, int, int]:
|
|
height = f.get("height") or 9999
|
|
tbr = f.get("tbr") or 0
|
|
is_m3u8 = 0 if f.get("protocol") in ("m3u8_native", "m3u8") else 1
|
|
at_or_under_480 = 0 if height <= 480 else 1
|
|
if at_or_under_480 == 0:
|
|
return (0, is_m3u8, -height, -tbr)
|
|
return (1, is_m3u8, height, -tbr)
|
|
|
|
return sorted(candidates, key=_sort_key)[0]
|
|
|
|
def _build_proxy_url(self, upstream_url: str) -> str:
|
|
encoded = quote(upstream_url, safe="")
|
|
return f"/api/v1/youtube/proxy/manifest.m3u8?url={encoded}"
|