def _require_http_url(url: str) -> str:
    """Return *url* unchanged when it is an absolute http(s) URL.

    The proxy endpoints fetch whatever URL the caller supplies, so anything
    that is not a plain web URL is rejected before a request is made.

    Raises:
        HTTPException: 400 when the value has no host or a non-HTTP scheme.
    """
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(url)
    except ValueError as exc:  # e.g. a malformed IPv6 literal
        raise HTTPException(status_code=400, detail="Invalid upstream URL") from exc
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise HTTPException(status_code=400, detail="Invalid upstream URL")
    # NOTE(security): this still allows requests to internal hosts; consider
    # an allow-list of upstream domains if this endpoint is publicly reachable.
    return url


@router.get("/youtube/proxy/manifest.m3u8")
async def proxy_manifest(url: str = Query(..., description="URL-encoded upstream HLS manifest URL")):
    """Fetch an upstream HLS manifest and stream it back with proxied URIs.

    Raises:
        HTTPException: 400 for a non-http(s) URL; 502 when the upstream cannot
            be reached or answers with a status other than 200.
    """
    from app.services.hls_proxy import HLSProxyService

    # FastAPI has already percent-decoded the query value. Decoding it a second
    # time would corrupt upstream URLs that legitimately contain %XX escapes.
    upstream_url = _require_http_url(url)

    client = httpx.AsyncClient(timeout=30.0)
    try:
        request = client.build_request("GET", upstream_url)
        upstream = await client.send(request, stream=True)
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        # Do not leak the connection pool when the upstream is unreachable.
        await client.aclose()
        raise HTTPException(status_code=502, detail="Upstream manifest unavailable") from exc

    if upstream.status_code != 200:
        await upstream.aclose()
        await client.aclose()
        raise HTTPException(status_code=502, detail="Upstream manifest unavailable")

    service = HLSProxyService()

    async def _stream():
        try:
            async for line in service.rewrite_manifest(upstream_url, upstream):
                yield line.encode("utf-8")
        finally:
            # Runs on normal completion, upstream read errors and client
            # disconnects alike, so the connection is always released.
            await upstream.aclose()
            await client.aclose()

    return StreamingResponse(
        _stream(),
        media_type="application/vnd.apple.mpegurl",
        headers={"access-control-allow-origin": "*"},
    )


@router.get("/youtube/proxy/segment.ts")
async def proxy_segment(url: str = Query(..., description="URL-encoded upstream .ts segment URL")):
    """Stream a single upstream media segment through this origin.

    Raises:
        HTTPException: 400 for a non-http(s) URL.
    """
    from app.services.hls_proxy import HLSProxyService

    # The query value arrives already decoded; see proxy_manifest.
    upstream_url = _require_http_url(url)
    return await HLSProxyService().proxy_segment(upstream_url)
class HLSProxyService:
    """Streams and rewrites HLS manifests; proxies .ts segments with zero re-encoding.

    Every http(s) URI found in a playlist (plain URI lines and quoted ``URI``
    attributes inside tags) is replaced by a same-origin proxy URL that carries
    the absolute upstream URL in its ``url`` query parameter.
    """

    # Same-origin routes that rewritten URIs point at.
    MANIFEST_PROXY_PATH = "/api/v1/youtube/proxy/manifest.m3u8"
    SEGMENT_PROXY_PATH = "/api/v1/youtube/proxy/segment.ts"

    # A quoted URI attribute inside a tag line, e.g. ``URI="key.bin"``. The
    # look-behind keeps attribute names that merely end in "URI" from matching.
    _URI_ATTRIBUTE = re.compile(r'(?<![A-Z0-9-])URI="([^"]+)"')

    async def rewrite_manifest(self, upstream_url: str, upstream: "httpx.Response") -> AsyncGenerator[str, None]:
        """Yield the upstream manifest line by line with URIs pointed at the proxy.

        Args:
            upstream_url: URL the manifest was fetched from; the base for
                resolving relative URIs.
            upstream: Open streaming response whose lines are read. It is not
                closed here; the caller owns its lifetime.

        Yields:
            Each rewritten line terminated by ``"\\n"``.
        """
        async for line in upstream.aiter_lines():
            yield self._rewrite_line(line, upstream_url) + "\n"

    def _rewrite_line(self, line: str, base_url: str) -> str:
        """Return *line* without its line ending and with any URI proxied."""
        stripped = line.rstrip("\r\n")

        if not stripped:
            return stripped

        if stripped.startswith("#"):
            # Tags such as EXT-X-KEY, EXT-X-MAP and EXT-X-MEDIA carry a quoted
            # URI attribute that the player would otherwise fetch cross-origin.
            # Plain comments and EXTINF (free-text title) are left untouched.
            if stripped.startswith("#EXT") and not stripped.startswith("#EXTINF"):
                return self._rewrite_tag_uris(stripped, base_url)
            return stripped

        return self._proxy_reference(stripped, base_url)

    def _rewrite_key_uri(self, line: str, base_url: str) -> str:
        """Rewrite the URI attribute of an ``#EXT-X-KEY`` line; pass others through."""
        if not line.startswith("#EXT-X-KEY:"):
            return line
        return self._rewrite_tag_uris(line, base_url)

    def _rewrite_tag_uris(self, line: str, base_url: str) -> str:
        """Replace every quoted ``URI="..."`` attribute in a tag line."""

        def _swap(match: "re.Match[str]") -> str:
            return f'URI="{self._proxy_reference(match.group(1), base_url)}"'

        return self._URI_ATTRIBUTE.sub(_swap, line)

    def _proxy_reference(self, uri: str, base_url: str) -> str:
        """Return the proxy URL for *uri*, or *uri* itself when it is not http(s)."""
        if uri.lower().startswith(("http://", "https://")):
            # Already absolute: keep it byte-for-byte (signed URLs are fragile).
            absolute_uri = uri
        else:
            absolute_uri = self._resolve_url(uri, base_url)

        # Schemes such as skd:// or data: cannot be fetched by the proxy
        # endpoints, so they are passed to the player unchanged.
        if not absolute_uri.lower().startswith(("http://", "https://")):
            return uri
        return self._build_proxy_url_for_uri(absolute_uri)

    def _resolve_url(self, uri: str, base_url: str) -> str:
        """Resolve *uri* against *base_url* using standard URL joining rules."""
        return urljoin(base_url, uri)

    def _build_proxy_url_for_uri(self, absolute_uri: str) -> str:
        """Return the same-origin proxy URL for an absolute upstream URL.

        URLs whose path ends in ``.m3u8`` go to the manifest route so nested
        playlists are rewritten too; everything else goes to the segment route.
        """
        encoded = quote(absolute_uri, safe="")
        # Look at the path only, so "playlist.m3u8?token=..." is still
        # recognised as a manifest.
        path = absolute_uri.partition("#")[0].partition("?")[0]
        if path.lower().endswith(".m3u8"):
            return f"{self.MANIFEST_PROXY_PATH}?url={encoded}"
        return f"{self.SEGMENT_PROXY_PATH}?url={encoded}"

    async def proxy_segment(self, upstream_url: str) -> "StreamingResponse":
        """Stream one upstream segment back to the caller without buffering it.

        The HTTP client and upstream response stay open until the response body
        has been fully sent (or sending is aborted), then both are closed.

        Args:
            upstream_url: Absolute URL of the segment to fetch.

        Returns:
            A ``StreamingResponse`` mirroring the upstream status code, served
            as ``video/mp2t`` with a permissive CORS header.
        """
        client = httpx.AsyncClient(timeout=30.0)
        try:
            request = client.build_request("GET", upstream_url)
            upstream = await client.send(request, stream=True)
        except BaseException:
            # Includes cancellation: never leave the connection pool open.
            await client.aclose()
            raise

        async def _body():
            try:
                async for chunk in upstream.aiter_bytes():
                    yield chunk
            finally:
                await upstream.aclose()
                await client.aclose()

        return StreamingResponse(
            _body(),
            status_code=upstream.status_code,
            media_type="video/mp2t",
            headers={"access-control-allow-origin": "*"},
        )
"""Phase 3.3 tests: HLS proxy service — manifest rewriting and segment proxying.

Covers:
- Manifest line rewriting: segments, sub-manifests, EXT-X-KEY URIs, pass-through tags
- URL resolution: relative paths, absolute paths, absolute URLs
- Segment proxying: StreamingResponse with correct Content-Type and CORS headers
- Route integration: GET /youtube/proxy/manifest.m3u8 and /segment.ts
- Error handling: upstream failures → 502, with upstream resources released
- CORS headers on every response
"""
from unittest.mock import AsyncMock, MagicMock, patch
from urllib.parse import quote, unquote

import pytest
from httpx import Response, Request


MANIFEST_PROXY_PREFIX = "/api/v1/youtube/proxy/manifest.m3u8?url="
SEGMENT_PROXY_PREFIX = "/api/v1/youtube/proxy/segment.ts?url="


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

async def _async_iter_lines(text: str):
    """Yield *text* split on newlines, standing in for ``Response.aiter_lines``."""
    for line in text.split("\n"):
        yield line


async def _async_iter_bytes(chunks: list[bytes]):
    """Yield *chunks* one by one, standing in for ``Response.aiter_bytes``."""
    for chunk in chunks:
        yield chunk


def _make_mock_stream_response(status_code: int = 200, **kwargs) -> MagicMock:
    """Build a stand-in for a streaming ``httpx.Response``.

    Extra keyword arguments are set as attributes, e.g. ``aiter_lines=...``.
    """
    mock = MagicMock()
    mock.status_code = status_code
    mock.aclose = AsyncMock()
    mock.__aenter__ = AsyncMock(return_value=mock)
    mock.__aexit__ = AsyncMock(return_value=None)
    for key, value in kwargs.items():
        setattr(mock, key, value)
    return mock


def _make_mock_client(resp_mock: MagicMock) -> MagicMock:
    """Build a stand-in for ``httpx.AsyncClient`` that always returns *resp_mock*."""
    client = MagicMock()
    client.stream = MagicMock(return_value=resp_mock)
    client.send = AsyncMock(return_value=resp_mock)
    client.build_request = MagicMock(return_value=MagicMock())
    client.aclose = AsyncMock()
    client.__aenter__ = AsyncMock(return_value=client)
    client.__aexit__ = AsyncMock(return_value=None)
    return client


@pytest.fixture
def svc():
    """A fresh service instance, shared by every unit-test class below."""
    from app.services.hls_proxy import HLSProxyService

    return HLSProxyService()


# ---------------------------------------------------------------------------
# Unit: Line rewriting
# ---------------------------------------------------------------------------

class TestLineRewriting:
    def test_passes_through_comment_tags(self, svc):
        base = "https://example.com/manifest.m3u8"
        untouched = (
            "#EXTM3U",
            "#EXT-X-VERSION:3",
            "#EXT-X-TARGETDURATION:6",
            "#EXT-X-MEDIA-SEQUENCE:0",
            "#EXT-X-ENDLIST",
            "# This is a comment",
            "#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360",
        )
        for tag in untouched:
            assert svc._rewrite_line(tag, base) == tag

    def test_passes_through_empty_lines(self, svc):
        assert svc._rewrite_line("", "https://example.com/base.m3u8") == ""

    def test_rewrites_ts_segment(self, svc):
        base = "https://example.com/path/manifest.m3u8"
        result = svc._rewrite_line("segment_0.ts", base)
        assert result.startswith(SEGMENT_PROXY_PREFIX)

    def test_rewrites_m3u8_submanifest(self, svc):
        base = "https://example.com/path/manifest.m3u8"
        result = svc._rewrite_line("variant_360p.m3u8", base)
        assert result.startswith(MANIFEST_PROXY_PREFIX)

    def test_rewrites_ext_x_key_uri(self, svc):
        base = "https://example.com/manifest.m3u8"
        result = svc._rewrite_line('#EXT-X-KEY:METHOD=AES-128,URI="key.bin",IV=0x1234', base)
        assert result.startswith('#EXT-X-KEY:METHOD=AES-128,URI="')
        assert SEGMENT_PROXY_PREFIX in result

    def test_rewrites_m3u8_key_uri(self, svc):
        base = "https://example.com/manifest.m3u8"
        result = svc._rewrite_line('#EXT-X-KEY:METHOD=AES-128,URI="keys/variant.m3u8"', base)
        assert result.startswith('#EXT-X-KEY:METHOD=AES-128,URI="')
        assert MANIFEST_PROXY_PREFIX in result

    def test_rewrites_absolute_url_segment(self, svc):
        base = "https://example.com/manifest.m3u8"
        result = svc._rewrite_line("https://cdn.example.com/segments/0.ts", base)
        assert result.startswith(SEGMENT_PROXY_PREFIX)

    def test_passes_through_inf_tag_with_commas(self, svc):
        base = "https://example.com/manifest.m3u8"
        tag = "#EXTINF:6.000,Some description, with commas"
        assert svc._rewrite_line(tag, base) == tag


# ---------------------------------------------------------------------------
# Unit: URL resolution
# ---------------------------------------------------------------------------

class TestURLResolution:
    def test_relative_path_resolved(self, svc):
        result = svc._resolve_url("segment_0.ts", "https://example.com/path/manifest.m3u8")
        assert result == "https://example.com/path/segment_0.ts"

    def test_absolute_path_resolved(self, svc):
        result = svc._resolve_url("/segments/0.ts", "https://example.com/path/manifest.m3u8")
        assert result == "https://example.com/segments/0.ts"

    def test_absolute_url_passthrough(self, svc):
        result = svc._resolve_url("https://cdn.example.com/0.ts", "https://example.com/manifest.m3u8")
        assert result == "https://cdn.example.com/0.ts"

    def test_parent_dir_resolved(self, svc):
        result = svc._resolve_url("../segments/0.ts", "https://example.com/path/to/manifest.m3u8")
        assert result == "https://example.com/path/segments/0.ts"


# ---------------------------------------------------------------------------
# Unit: Proxy URL construction
# ---------------------------------------------------------------------------

class TestProxyURLConstruction:
    def test_segment_extension_uses_segment_proxy(self, svc):
        upstream = "https://cdn.example.com/segments/0.ts"
        proxy = svc._build_proxy_url_for_uri(upstream)
        assert proxy.startswith(SEGMENT_PROXY_PREFIX)
        encoded = proxy.split("url=", 1)[1]
        assert unquote(encoded) == upstream

    def test_m3u8_extension_uses_manifest_proxy(self, svc):
        upstream = "https://cdn.example.com/variants/360p.m3u8"
        proxy = svc._build_proxy_url_for_uri(upstream)
        assert proxy.startswith(MANIFEST_PROXY_PREFIX)
        encoded = proxy.split("url=", 1)[1]
        assert unquote(encoded) == upstream

    def test_unknown_extension_uses_segment_proxy(self, svc):
        upstream = "https://cdn.example.com/init.mp4"
        proxy = svc._build_proxy_url_for_uri(upstream)
        assert proxy.startswith(SEGMENT_PROXY_PREFIX)


# ---------------------------------------------------------------------------
# Integration: Manifest rewriting with mocked httpx
# ---------------------------------------------------------------------------

SAMPLE_MANIFEST = """#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:6
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:6.000,
segment_0.ts
#EXTINF:6.000,
segment_1.ts
#EXT-X-ENDLIST
"""


class TestManifestRewriting:
    @pytest.mark.asyncio
    async def test_full_manifest_rewritten(self, svc):
        upstream = _make_mock_stream_response(
            status_code=200,
            aiter_lines=lambda: _async_iter_lines(SAMPLE_MANIFEST),
        )
        lines = []
        async for line in svc.rewrite_manifest("https://example.com/video.m3u8", upstream):
            lines.append(line)

        assert lines[0] == "#EXTM3U\n"
        assert lines[1] == "#EXT-X-VERSION:3\n"
        assert lines[2] == "#EXT-X-TARGETDURATION:6\n"
        assert lines[3] == "#EXT-X-MEDIA-SEQUENCE:0\n"
        assert lines[4] == "#EXTINF:6.000,\n"
        assert SEGMENT_PROXY_PREFIX in lines[5]
        assert lines[6] == "#EXTINF:6.000,\n"
        assert SEGMENT_PROXY_PREFIX in lines[7]
        assert lines[8] == "#EXT-X-ENDLIST\n"

    @pytest.mark.asyncio
    async def test_master_manifest_with_variants(self, svc):
        master = """#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
variant_360p.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1400000,RESOLUTION=842x480
variant_480p.m3u8
"""
        upstream = _make_mock_stream_response(
            status_code=200,
            aiter_lines=lambda: _async_iter_lines(master),
        )
        lines = [line async for line in svc.rewrite_manifest("https://example.com/master.m3u8", upstream)]

        assert "#EXT-X-STREAM-INF" in lines[1]
        assert MANIFEST_PROXY_PREFIX in lines[2]
        assert MANIFEST_PROXY_PREFIX in lines[4]


# ---------------------------------------------------------------------------
# Integration: Segment proxying with mocked httpx
# ---------------------------------------------------------------------------

class TestSegmentProxying:
    @pytest.mark.asyncio
    async def test_proxy_segment_returns_streaming_response(self, svc):
        resp_mock = _make_mock_stream_response(
            status_code=200,
            headers={"content-type": "video/mp2t"},
            aiter_bytes=lambda: _async_iter_bytes([b"\x47"] * 100),
        )
        client_mock = _make_mock_client(resp_mock)

        with patch("app.services.hls_proxy.httpx.AsyncClient", return_value=client_mock):
            from fastapi.responses import StreamingResponse

            result = await svc.proxy_segment("https://cdn.example.com/0.ts")
            assert isinstance(result, StreamingResponse)
            assert result.media_type == "video/mp2t"
            assert result.headers.get("access-control-allow-origin") == "*"


# ---------------------------------------------------------------------------
# Integration: Route tests
# ---------------------------------------------------------------------------

class TestProxyRoutes:
    @pytest.fixture
    def proxy_client(self):
        from app.routers.youtube import router
        from app.core.config import get_settings

        get_settings.cache_clear()
        from fastapi import FastAPI
        from fastapi.testclient import TestClient

        app = FastAPI()
        app.include_router(router, prefix="/api/v1")
        return TestClient(app)

    def test_manifest_proxy_returns_cors_header(self, proxy_client):
        upstream = _make_mock_stream_response(
            status_code=200,
            aiter_lines=lambda: _async_iter_lines("#EXTM3U\n#EXT-X-ENDLIST\n"),
        )

        with patch("app.routers.youtube.httpx.AsyncClient") as mock_client_cls:
            mock_client_cls.return_value = _make_mock_client(upstream)

            encoded_url = quote("https://example.com/video.m3u8", safe="")
            resp = proxy_client.get(f"/api/v1/youtube/proxy/manifest.m3u8?url={encoded_url}")

            assert resp.status_code == 200
            assert resp.headers.get("access-control-allow-origin") == "*"

    def test_segment_proxy_returns_correct_content_type(self, proxy_client):
        resp_mock = _make_mock_stream_response(
            status_code=200,
            headers={"content-type": "video/mp2t"},
            aiter_bytes=lambda: _async_iter_bytes([b"\x47"] * 50),
        )
        client_mock = _make_mock_client(resp_mock)

        with patch("app.services.hls_proxy.httpx.AsyncClient", return_value=client_mock):
            encoded_url = quote("https://cdn.example.com/0.ts", safe="")
            resp = proxy_client.get(f"/api/v1/youtube/proxy/segment.ts?url={encoded_url}")

            assert resp.status_code == 200
            assert resp.headers.get("access-control-allow-origin") == "*"
            assert resp.headers.get("content-type") == "video/mp2t"

    def test_proxy_missing_url_parameter_returns_422(self, proxy_client):
        resp = proxy_client.get("/api/v1/youtube/proxy/manifest.m3u8")
        assert resp.status_code == 422

    def test_proxy_upstream_404_returns_502(self, proxy_client):
        upstream = _make_mock_stream_response(status_code=404)

        with patch("app.routers.youtube.httpx.AsyncClient") as mock_client_cls:
            mock_client = _make_mock_client(upstream)
            mock_client_cls.return_value = mock_client

            encoded_url = quote("https://cdn.example.com/missing.ts", safe="")
            resp = proxy_client.get(f"/api/v1/youtube/proxy/manifest.m3u8?url={encoded_url}")

            # Route checks upstream status before streaming → raises 502
            assert resp.status_code == 502
            # The failed upstream response and its client must be released.
            upstream.aclose.assert_awaited()
            mock_client.aclose.assert_awaited()