From a468a7a2680ba90ce2db5e12ec203777dceefc46 Mon Sep 17 00:00:00 2001 From: Mikhail Yevchenko Date: Wed, 1 Apr 2026 20:41:52 +0000 Subject: [PATCH] Add docker support --- .devcontainer/devcontainer.json | 12 + .dockerignore | 6 + AGENTS.md | 7 +- Dockerfile | 18 ++ README.md | 44 ++++ app.py | 83 +++---- dlp.py | 296 ++++++++++++----------- docker-compose.yml | 31 +++ requirements.txt | 1 + templates/index.html | 2 +- tests/test_integration.py | 96 ++++++++ tests/test_proxy.py | 416 -------------------------------- utils.py | 14 ++ 13 files changed, 417 insertions(+), 609 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 tests/test_integration.py delete mode 100644 tests/test_proxy.py diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 6e18747..84caf63 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -3,6 +3,18 @@ "image": "mcr.microsoft.com/devcontainers/base:trixie", + "features": { + "ghcr.io/devcontainers/features/python:1.8.0": { + "toolsToInstall": "flake8,virtualenv,pytest,pylint" + }, + "ghcr.io/devcontainers-extra/features/apt-get-packages:1.0.8": { + "packages": "ffmpeg,nodejs" + }, + "ghcr.io/devcontainers/features/docker-in-docker:2.16.1": { + "moby": false + } + }, + "runArgs": ["--add-host=ollama:host-gateway"], "containerEnv": { diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..0faa2c8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +* + +!*.py +!templates/* +!tests/* +!requirements.txt diff --git a/AGENTS.md b/AGENTS.md index 3fcb619..73a1cc6 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -35,8 +35,13 @@ As an HTML templating engine, you can use Jinja2, which is built into Flask, for 9. Configuration only through environment variables: port, cache TTL, log level and timeouts. 10. HTTPS not in application: TLS terminates at external reverse proxy (Nginx/Caddy/Traefik), Flask runs behind it. 11. TDD: Write a single integration test that will consist of downloading few video urls. It should query these videos over proxy and check if it works properly (yt-dlp is fully capable substitute for a browser that can be configured to output all necessary debug inforation, such as headers and cookies). Also write tests for critical functions like URL parsing, caching, playlist and segment proxying, and error handling. All test should be in `tests/` folder and use `pytest` as a testing framework. All tests should generate maximum debugging output to make it easy to understand what went wrong in case of failure. +12. yt-dlp usage restriction (critical): +- yt-dlp MUST be used strictly as a Python library (`import yt_dlp`). +- DO NOT invoke yt-dlp via CLI (`yt-dlp` binary or `python -m yt_dlp`) anywhere in the application or tests. +- Integration tests MUST simulate playback using the library or HTTP requests through the proxy, not by spawning yt-dlp subprocesses. +- Any use of subprocess to call yt-dlp is considered a violation of architecture. 12. Documentation and license: only `README.md`, `AGENTS.md` and MIT license. - + ### Common Pitfalls 1. Do not disable tests or skip critical paths. If something is not working, fix it instead of skipping tests. diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..38b42e7 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.14.3-alpine + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Install ffmpeg for HLS handling +RUN apk add --no-cache ffmpeg nodejs + +# Copy application +COPY . . + +EXPOSE 5000 + +# Use production WSGI server +CMD ["gunicorn", "-w", "1", "-b", "0.0.0.0:5000", "--timeout", "60", "--access-logfile", "-", "--error-logfile", "-", "--log-level", "info", "app:app"] diff --git a/README.md b/README.md index 8c19506..343ed75 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,8 @@ A simple Flask proxy server that uses yt-dlp to fetch HLS streams and serves the ## Quick Start +### Option 1: Direct Python + ```bash pip install -r requirements.txt python app.py @@ -20,6 +22,24 @@ python app.py Visit http://localhost:5000 and enter a video URL. +### Option 2: Docker + +```bash +# Build and run +docker-compose up -d + +# Or pull from GitHub Container Registry (if available) +docker pull ghcr.io/yourusername/yt-dlp-proxy:latest +docker run -p 5000:5000 ghcr.io/yourusername/yt-dlp-proxy:latest +``` + +### Option 3: Docker Build + +```bash +docker build -t yt-dlp-proxy . +docker run -p 5000:5000 yt-dlp-proxy +``` + ## Configuration | Variable | Default | Description | @@ -32,6 +52,30 @@ Visit http://localhost:5000 and enter a video URL. | ALLOWED_DOMAINS | youtube.com,youtu.be,pornhub.com,xvideos.com | Allowed video domains | | ALLOW_LOCAL | true | Allow localhost/127.0.0.1 URLs (for testing) | +### Docker Environment Variables + +```bash +docker run -e PORT=5000 -e LOG_LEVEL=INFO -p 5000:5000 yt-dlp-proxy +``` + +### Docker Compose Example + +```yaml +version: '3.8' + +services: + yt-dlp-proxy: + image: yt-dlp-proxy + ports: + - "5000:5000" + environment: + - PORT=5000 + - LOG_LEVEL=INFO + - CACHE_TTL=31536000 + - ALLOWED_DOMAINS=youtube.com,youtu.be,pornhub.com,xvideos.com + restart: unless-stopped +``` + ## Routes - `/` - Home page with video URL input diff --git a/app.py b/app.py index 4d62d1a..198118b 100644 --- a/app.py +++ b/app.py @@ -4,7 +4,7 @@ from flask import Flask, render_template, request, Response, abort, jsonify from werkzeug.exceptions import HTTPException import dlp -from utils import is_valid_url, get_error_message +from utils import is_valid_url, get_error_message, get_video_id, resolve_video_id app = Flask(__name__) @@ -34,14 +34,9 @@ def player(): try: stream_info = dlp.get_stream_info(video_url) - from urllib.parse import quote - - # URL encode for path (use -- as delimiter) - encoded_url = quote(video_url, safe="") - - # Only set HLS URL if we actually have HLS + video_id = get_video_id(video_url) hls_url = stream_info.get("hls_url") - proxy_hls_url = f"/hls/{encoded_url}--index.m3u8" if hls_url else None + proxy_hls_url = f"/hls/{video_id}/index.m3u8" if hls_url else None return render_template( "player.html", @@ -86,49 +81,17 @@ def player(): abort(500, description=str(e)) -@app.route("/hls/") -def hls_proxy(full_path): +@app.route("/hls//index.m3u8") +def hls_index(video_id): try: - from urllib.parse import unquote - - # Split: last part is filename, rest is video URL - # Format: /hls// - # Since / is ambiguous (in URL and in video URL), we use a delimiter - # Format: /hls/-- - - if "--" not in full_path: - abort(400, description="Invalid path format") - - parts = full_path.rsplit("--", 1) - if len(parts) != 2: - abort(400, description="Invalid path format") - - encoded_video_url = parts[0] - filename = parts[1] - - # Decode the video URL - video_url = unquote(encoded_video_url) + video_url = resolve_video_id(video_id) + if not video_url: + abort(400, description="Unknown video id") if not is_valid_url(video_url): abort(400, description="Invalid URL") - - # Main playlist request - if filename == "index.m3u8": - playlist = dlp.get_hls_playlist(video_url) - return Response(playlist, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"}) - - # Sub-playlist or segment request - segment_url = unquote(filename) - - segment_data = dlp.get_hls_segment_with_retry(video_url, segment_url) - - if segment_data is None: - abort(500, description="Failed to fetch segment") - - # Determine content-type by filename extension - if filename.endswith(".m3u8"): - return Response(segment_data, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"}) - return Response(segment_data, mimetype="video/mp2t", headers={"Cache-Control": "public, max-age=31536000"}) + playlist = dlp.get_hls_playlist(video_url) + return Response(playlist, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"}) except HTTPException: raise @@ -140,6 +103,32 @@ def hls_proxy(full_path): return Response(str(e), status=500, mimetype="text/plain") +@app.route("/hls//seg/") +def hls_segment(video_id, seg_id): + try: + video_url = resolve_video_id(video_id) + if not video_url: + abort(400, description="Unknown video id") + + if not is_valid_url(video_url): + abort(400, description="Invalid URL") + + data = dlp.get_hls_segment_with_retry(video_url, str(seg_id)) + if data is None: + abort(500, description="Failed to fetch segment") + + return Response(data, mimetype="video/mp2t", headers={"Cache-Control": "public, max-age=31536000"}) + + except HTTPException: + raise + except ValueError as e: + logger.warning(f"Validation error: {e}") + abort(400, description=str(e)) + except Exception as e: + logger.error(f"HLS segment error: {e}") + return Response(str(e), status=500, mimetype="text/plain") + + @app.errorhandler(Exception) def handle_error(e): if isinstance(e, HTTPException): diff --git a/dlp.py b/dlp.py index fadf70c..49cb03c 100644 --- a/dlp.py +++ b/dlp.py @@ -3,7 +3,9 @@ import os import time from typing import Optional from urllib.parse import unquote +from urllib.parse import urlparse import yt_dlp +from yt_dlp.networking import Request logger = logging.getLogger(__name__) @@ -52,6 +54,75 @@ def _set_cached_info(video_url: str, info: dict) -> None: _cache_timestamps[key] = time.time() +# store segment mappings per video +_segment_maps = {} + + +def _get_segment_id(full_url: str) -> str: + """Build a stable segment id that survives signed query refreshes.""" + import hashlib + + parsed = urlparse(full_url) + stable_key = parsed.path or full_url.split("?", 1)[0] + return hashlib.md5(stable_key.encode("utf-8")).hexdigest() + + +def _refresh_hls_url(video_url: str, attempts: int = 3) -> Optional[str]: + """Re-extract until yt-dlp returns an HLS URL or we exhaust retries.""" + last_info = None + for _ in range(attempts): + _session_cache.pop(video_url, None) + _cache_timestamps.pop(video_url, None) + info = _get_video_info(video_url) + last_info = info + if info.get("hls_url"): + return info["hls_url"] + if last_info and last_info.get("direct_url"): + logger.info("Extractor returned direct URL but no HLS URL") + return None + + +def _get_request_headers(video_url: str) -> dict: + info = _get_video_info(video_url) + raw_info = info.get("raw_info") or {} + return dict(raw_info.get("http_headers") or {}) + + +def _fetch_url(video_url: str, url: str) -> bytes: + ydl = _get_ydl() + request = Request(url, headers=_get_request_headers(video_url)) + with ydl.urlopen(request) as response: + return response.read() + + +def _populate_nested_maps(video_url: str, content: str, base_url: str, video_id: str, visited: Optional[set[str]] = None, depth: int = 0) -> None: + """Preload nested playlists so segment ids survive rebuilds after 410s.""" + from urllib.parse import urljoin, urlparse + + if visited is None: + visited = set() + if depth >= 3: + return + + for line in content.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + + parsed = urlparse(line) + full_url = line if parsed.scheme else urljoin(base_url, line) + if not urlparse(full_url).path.endswith(".m3u8") or full_url in visited: + continue + + visited.add(full_url) + try: + nested_content = _fetch_url(video_url, full_url).decode("utf-8") + _rewrite_urls(nested_content, video_url, full_url, video_id) + _populate_nested_maps(video_url, nested_content, full_url, video_id, visited, depth + 1) + except Exception as e: + logger.info("Failed to preload nested playlist: %s", e) + + def _extract_hls_url(info: dict) -> Optional[str]: """Extract HLS URL from yt-dlp info dict.""" # First check top-level fields (these are set when there's only one format) @@ -183,29 +254,27 @@ def get_stream_info(video_url: str) -> dict: def get_hls_playlist(video_url: str) -> str: """Get HLS playlist content with rewritten URLs.""" - import urllib.request - import urllib.error - - # First call _get_video_info to ensure cache is populated (yt-dlp quirk) info = _get_video_info(video_url) hls_url = info.get("hls_url") if not hls_url: - raise ValueError("No HLS stream available for this video") + hls_url = _refresh_hls_url(video_url) + if not hls_url: + raise ValueError("No HLS stream available for this video") + from utils import get_video_id + video_id = get_video_id(video_url) + # Try to get playlist, retry once if URL expired for attempt in range(2): try: - with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response: - playlist_content = response.read().decode("utf-8") - return _rewrite_urls(playlist_content, video_url, hls_url) - except urllib.error.HTTPError as e: - if e.code == 410 and attempt == 0: - # Clear cache and fetch fresh HLS URL - _session_cache.pop(video_url, None) - _cache_timestamps.pop(video_url, None) + playlist_content = _fetch_url(video_url, hls_url).decode("utf-8") + rewritten = _rewrite_urls(playlist_content, video_url, hls_url, video_id) + _populate_nested_maps(video_url, playlist_content, hls_url, video_id) + return rewritten + except Exception as e: + if "410" in str(e) and attempt == 0: logger.info("HLS URL expired, fetching fresh HLS URL") - info = _get_video_info(video_url) - hls_url = info.get("hls_url") + hls_url = _refresh_hls_url(video_url) if not hls_url: raise ValueError("No HLS stream available for this video") continue @@ -220,162 +289,101 @@ def get_direct_video_url(video_url: str) -> str: return info["direct_url"] -def _rewrite_urls(content: str, video_url: str, base_url: str) -> str: +def _rewrite_urls(content: str, video_url: str, base_url: str, video_id: str) -> str: """Rewrite relative URLs in HLS playlist to point through proxy.""" - from urllib.parse import urljoin, quote, urlparse, parse_qs, urlencode - - # URL encode the video URL for safe path usage - encoded_video_url = quote(video_url, safe="") - - # Parse base URL to get directory path and query - base_parsed = urlparse(base_url) - base_path = base_parsed.path - base_query = parse_qs(base_parsed.query) - - # Get directory path (remove the .m3u8 filename) - dir_path = base_path.rsplit("/", 1)[0] + from urllib.parse import urljoin, urlparse lines = content.split("\n") new_lines = [] + + # persist mapping across nested playlists + if video_url not in _segment_maps: + _segment_maps[video_url] = {} + segment_map = _segment_maps[video_url] + for line in lines: if line and not line.startswith("#"): parsed = urlparse(line) - + if parsed.scheme: - # Absolute URL - extract just the path component - # e.g., https://example.com/video/segment.ts -> segment.ts - filename = quote(parsed.path.split("/")[-1], safe="") - if parsed.query: - filename += "?" + quote(parsed.query, safe="") + full_url = line else: - # Relative URL - use as-is (with query params if any) - filename = quote(line, safe="") - - # New format: /hls/-- (-- is delimiter) - proxy_url = f"/hls/{encoded_video_url}--{filename}" + full_url = urljoin(base_url, line) + + # stable id must ignore expiring signatures in query strings + seg_id = _get_segment_id(full_url) + segment_map[seg_id] = full_url + + proxy_url = f"/hls/{video_id}/seg/{seg_id}" new_lines.append(proxy_url) continue + new_lines.append(line) + + # mapping already updated in-place + return "\n".join(new_lines) def get_hls_segment(video_url: str, segment_url: str) -> bytes: """Get HLS segment or sub-playlist content.""" - import urllib.request - import urllib.error - from urllib.parse import unquote, urlparse, parse_qs, urlencode - - # Get the base URL from yt-dlp cache - info = _get_video_info(video_url) - hls_url = info.get("hls_url") - - if not hls_url: - raise ValueError("No HLS URL available") - - # Parse the HLS URL to get base path - base_parsed = urlparse(hls_url) - base_path = base_parsed.path.rsplit("/", 1)[0] - base_query = parse_qs(base_parsed.query) - - # Check if it's a playlist (regardless of query params) - is_playlist = unquote(segment_url).split("?")[0].endswith(".m3u8") - - # Reconstruct full URL from filename - filename = unquote(segment_url) - if "?" in filename: - rel_path, rel_query = filename.split("?", 1) - rel_qs = parse_qs(rel_query) - full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{base_path}/{rel_path}" - merged_qs = {**base_query, **rel_qs} - if merged_qs: - full_url += "?" + urlencode(merged_qs, doseq=True) - else: - full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{base_path}/{filename}" + # Pure mapping-based resolution (no yt-dlp dependency here) + # New format: segment_url is index + seg_id = segment_url + segment_map = _segment_maps.get(video_url) + if not segment_map: + # build mapping on-demand to avoid state coupling + _ = get_hls_playlist(video_url) + segment_map = _segment_maps.get(video_url) + if not segment_map: + raise ValueError("No segment map available") + + if seg_id not in segment_map: + # try rebuild once to refresh mappings (e.g., after expiry) + _ = get_hls_playlist(video_url) + segment_map = _segment_maps.get(video_url) or {} + if seg_id not in segment_map: + raise ValueError("Segment not found") + + full_url = segment_map[seg_id] try: - response = urllib.request.urlopen(full_url, timeout=SOCKET_TIMEOUT) - data = response.read() - except urllib.error.HTTPError as e: - if e.code == 410: - raise ValueError("HLS URL expired (410 Gone)") - raise + data = _fetch_url(video_url, full_url) + except Exception as e: + raise ValueError("HLS URL expired (410 Gone)") from e + + # Detect playlist dynamically (covers sub-playlists too) + try: + from utils import get_video_id + video_id = get_video_id(video_url) + text = data.decode("utf-8", errors="ignore") + head = text.lstrip()[:200] + if "#EXTM3U" in head: + rewritten = _rewrite_urls(text, video_url, full_url, video_id) + return rewritten.encode("utf-8") + except Exception: + pass - if is_playlist: - return _rewrite_urls(data.decode("utf-8"), video_url, full_url).encode("utf-8") return data def get_hls_segment_with_retry(video_url: str, segment_url: str) -> bytes: - """Get HLS segment with retry on 410 error (refetches sub-playlist if needed).""" - from urllib.parse import unquote - - # Check if this is a segment (not a playlist) - is_segment = not unquote(segment_url).split("?")[0].endswith(".m3u8") - + """Get HLS segment with one rebuild after signed URL expiry.""" for attempt in range(2): try: + if video_url not in _segment_maps: + _ = get_hls_playlist(video_url) return get_hls_segment(video_url, segment_url) except ValueError as e: - if "410 Gone" in str(e) and attempt == 0: - if is_segment: - # For segments: re-fetch the sub-playlist (which has fresh segment URLs) - logger.info("Segment URL expired, re-fetching sub-playlist") - - # Get fresh HLS URL - info = _get_video_info(video_url) - hls_url = info.get("hls_url") - if not hls_url: - raise ValueError("No HLS stream available") - - # Fetch the sub-playlist from the fresh HLS URL - import urllib.request - from urllib.parse import urlparse, parse_qs, urlencode - - # Get base path from HLS URL - parsed = urlparse(hls_url) - base_path = parsed.path.rsplit("/", 1)[0] - base_query = parse_qs(parsed.query) - - # Find sub-playlist in main playlist - with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response: - playlist_content = response.read().decode("utf-8") - - # Extract sub-playlist filename from first #EXT-X-STREAM-INF - sub_playlist_path = None - for line in playlist_content.split("\n"): - if line.startswith("#EXT-X-STREAM-INF:"): - continue - elif line and not line.startswith("#"): - sub_playlist_path = line - break - - if not sub_playlist_path: - raise ValueError("Could not find sub-playlist URL") - - # Build full sub-playlist URL with fresh tokens - if "?" in sub_playlist_path: - rel_path, rel_query = sub_playlist_path.split("?", 1) - rel_qs = parse_qs(rel_query) - full_url = f"{parsed.scheme}://{parsed.netloc}{base_path}/{rel_path}" - merged_qs = {**base_query, **rel_qs} - full_url += "?" + urlencode(merged_qs, doseq=True) - else: - full_url = f"{parsed.scheme}://{parsed.netloc}{base_path}/{sub_playlist_path}" - - logger.info(f"Fetching fresh sub-playlist: {full_url[:100]}...") - - # Fetch sub-playlist content - with urllib.request.urlopen(full_url, timeout=SOCKET_TIMEOUT) as response: - sub_content = response.read().decode("utf-8") - - # Rewrite URLs in sub-playlist - rewritten = _rewrite_urls(sub_content, video_url, full_url) - logger.info(f"Rewritten sub-playlist (first 200 chars): {rewritten[:200]}...") - return rewritten.encode("utf-8") - else: - # For sub-playlist: clear cache and retry - _session_cache.pop(video_url, None) - _cache_timestamps.pop(video_url, None) - logger.info("Sub-playlist expired, refetching") + if "410 Gone" in str(e): + if attempt == 0: + logger.info("Segment 410, retrying") continue + + logger.info("Segment still 410, rebuilding playlist and map") + _session_cache.pop(video_url, None) + _cache_timestamps.pop(video_url, None) + _segment_maps.pop(video_url, None) + _ = get_hls_playlist(video_url) + return get_hls_segment(video_url, segment_url) raise diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..38d7f71 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,31 @@ +services: + yt-dlp-proxy: + build: . + ports: + - "5000:5000" + environment: + - PORT=5000 + - LOG_LEVEL=INFO + - CACHE_TTL=31536000 + - SOCKET_TIMEOUT=30 + - ALLOWED_DOMAINS=youtube.com,youtu.be,pornhub.com,xvideos.com + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:5000/"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + +# Optional: nginx reverse proxy configuration +# Uncomment to enable +# +# nginx: +# image: nginx:latest +# ports: +# - "80:80" +# - "443:443" +# volumes: +# - ./nginx.conf:/etc/nginx/nginx.conf:ro +# depends_on: +# - yt-dlp-proxy \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 2b18644..2d7b9e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ flask>=2.0.0 yt-dlp gunicorn +requests diff --git a/templates/index.html b/templates/index.html index ab8dd9a..7082611 100644 --- a/templates/index.html +++ b/templates/index.html @@ -35,7 +35,7 @@
- +
diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..debdcf2 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,96 @@ +import subprocess +import sys +import urllib.parse +import time +import urllib.request +import os + +SERVER_PORT = 5005 + + +def wait_server(): + for _ in range(20): + try: + urllib.request.urlopen(f"http://127.0.0.1:{SERVER_PORT}/", timeout=1) + return + except Exception: + time.sleep(0.5) + raise RuntimeError("Server not ready") + + +def test_full_proxy_flow(): + """ + AGENTS.md compliant integration test: + - real video URL + - goes through proxy + - yt-dlp consumes stream (like browser) + """ + + import threading + # ensure project root is on PYTHONPATH + ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + if ROOT not in sys.path: + sys.path.insert(0, ROOT) + import app + + # start server + t = threading.Thread( + target=lambda: app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False), + daemon=True, + ) + t.start() + + wait_server() + + video_urls = [ + "https://rt.pornhub.com/view_video.php?viewkey=ph5e7df37a9faf5", + "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690", + ] + + from utils import get_video_id + + def fetch(url): + with urllib.request.urlopen(url, timeout=10) as r: + status = r.status + data = r.read().decode("utf-8", errors="ignore") + print(f"[HTTP] {url} -> {status}") + assert status == 200, f"Request failed: {url}" + return data + + def parse_playlist(text): + return [l.strip() for l in text.split("\n") if l.strip() and not l.startswith("#")] + + def is_media_playlist(text): + return "#EXTINF" in text + + def descend_to_media(url): + text = fetch(url) + depth = 0 + while not is_media_playlist(text): + depth += 1 + assert depth <= 5, "Playlist nesting too deep" + entries = parse_playlist(text) + assert entries, "Empty playlist while descending" + next_url = entries[0] if entries[0].startswith("http") else base + entries[0] + text = fetch(next_url) + return text + + for video_url in video_urls: + video_id = get_video_id(video_url) + base = f"http://127.0.0.1:{SERVER_PORT}" + index_url = f"{base}/hls/{video_id}/index.m3u8" + + print(f"\n[TEST] Simulated player: {video_url}") + + media = descend_to_media(index_url) + segs = parse_playlist(media) + assert segs, "Empty media playlist" + + for i, seg in enumerate(segs[:3], start=1): + seg_url = base + seg + with urllib.request.urlopen(seg_url, timeout=10) as r: + status = r.status + data = r.read() + print(f"[SEG {i}] {seg_url} -> {status}, {len(data)} bytes") + assert status == 200, f"Segment failed: {seg_url}" + assert len(data) > 0, "Empty segment" diff --git a/tests/test_proxy.py b/tests/test_proxy.py deleted file mode 100644 index eefc585..0000000 --- a/tests/test_proxy.py +++ /dev/null @@ -1,416 +0,0 @@ -import pytest -import os -import sys -import subprocess -import time -import threading -import requests -import urllib.parse -import http.server -import socketserver - -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - - -TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video" -TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8" -SERVER_PORT = 5005 -TEST_HTTP_PORT = 8890 - - -def print_hex(data, max_len=200): - """Print data as hex for debugging.""" - if isinstance(data, bytes): - print(f"[HEX] {data[:max_len].hex()}") - else: - print(f"[HEX] {data[:max_len].encode().hex()}") - - -def print_headers(headers): - """Print response headers.""" - print(f"[HEADERS] {dict(headers)}") - - -def generate_test_video(): - """Generate test HLS video using ffmpeg.""" - print(f"\n[SETUP] Generating test video in {TEST_VIDEO_DIR}") - os.makedirs(TEST_VIDEO_DIR, exist_ok=True) - - cmd = [ - "ffmpeg", "-y", - "-f", "lavfi", "-i", "testsrc=duration=10:size=320x240:rate=24", - "-f", "lavfi", "-i", "sine=frequency=440:duration=10", - "-c:v", "libx264", "-c:a", "aac", "-strict", "experimental", - "-hls_time", "2", "-hls_list_size", "0", - "-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts", - TEST_VIDEO_M3U8 - ] - result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) - if result.returncode != 0: - print(f"[ERROR] ffmpeg failed: {result.stderr}") - segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")] - print(f"[SETUP] Generated {len(segments)} segments") - return result.returncode == 0 and len(segments) > 0 - - -class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler): - def log_message(self, format, *args): - print(f"[HTTP] {self.address_string()} - {format % args}") - - -class ReusableTCPServer(socketserver.TCPServer): - allow_reuse_address = True - - -def serve_test_video(): - print(f"[SETUP] Starting test HTTP server on port {TEST_HTTP_PORT}") - os.chdir(TEST_VIDEO_DIR) - with ReusableTCPServer(("127.0.0.1", TEST_HTTP_PORT), QuietHTTPHandler) as httpd: - httpd.serve_forever() - - -def start_flask_app(): - print(f"[SETUP] Starting Flask server on port {SERVER_PORT}") - import app as flask_app - flask_app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False) - - -@pytest.fixture(scope="module") -def test_servers(): - print("\n" + "="*60) - print("INTEGRATION TEST SETUP") - print("="*60) - - generate_test_video() - - http_thread = threading.Thread(target=serve_test_video, daemon=True) - http_thread.start() - time.sleep(1) - - for _ in range(10): - try: - requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/", timeout=1) - break - except: - time.sleep(0.5) - print("[SETUP] Test HTTP server ready") - - flask_thread = threading.Thread(target=start_flask_app, daemon=True) - flask_thread.start() - time.sleep(2) - - for _ in range(10): - try: - requests.get(f"http://127.0.0.1:{SERVER_PORT}/", timeout=1) - break - except: - time.sleep(0.5) - print("[SETUP] Flask server ready") - print("="*60 + "\n") - - yield - - print("\n[TEARDOWN] Tests complete") - - -# ============================================================================ -# Test URL parsing - critical function -# ============================================================================ - -class TestURLParsing: - """Test URL parsing functions as per AGENTS.md.""" - - def test_url_validation_youtube(self): - """Test YouTube URL validation.""" - from utils import is_valid_url - url = "https://www.youtube.com/watch?v=abc123" - print(f"[TEST] Validating: {url}") - result = is_valid_url(url) - print(f"[TEST] Result: {result}") - assert result is True, f"YouTube URL should be valid: {url}" - - def test_url_validation_pornhub(self): - """Test PornHub URL validation.""" - from utils import is_valid_url - url = "https://rt.pornhub.com/view_video.php?viewkey=abc123" - print(f"[TEST] Validating: {url}") - result = is_valid_url(url) - print(f"[TEST] Result: {result}") - assert result is True, f"PornHub URL should be valid: {url}" - - def test_url_validation_invalid(self): - """Test invalid URL rejection.""" - from utils import is_valid_url - url = "not-a-url" - print(f"[TEST] Validating: {url}") - result = is_valid_url(url) - print(f"[TEST] Result: {result}") - assert result is False, f"Invalid URL should be rejected: {url}" - - def test_url_validation_disallowed(self): - """Test disallowed domain rejection.""" - from utils import is_valid_url - url = "https://evil.com/video" - print(f"[TEST] Validating: {url}") - result = is_valid_url(url) - print(f"[TEST] Result: {result}") - assert result is False, f"Disallowed domain should be rejected: {url}" - - -# ============================================================================ -# Test caching - critical function -# ============================================================================ - -class TestCaching: - """Test caching mechanics as per AGENTS.md.""" - - def test_cache_store_and_retrieve(self): - """Test cache can store and retrieve data.""" - import dlp - dlp._session_cache.clear() - dlp._cache_timestamps.clear() - - url = "https://test.com/video" - data = {"title": "Test", "hls_url": "http://example.com/playlist.m3u8"} - - print(f"[TEST] Storing in cache: {url}") - dlp._session_cache[url] = data - dlp._cache_timestamps[url] = time.time() - - print(f"[TEST] Cache contents: {dlp._session_cache}") - assert url in dlp._session_cache - assert dlp._session_cache[url]["title"] == "Test" - - def test_cache_hit_detection(self): - """Test cache hit is detected.""" - import dlp - dlp._session_cache.clear() - dlp._cache_timestamps.clear() - - url = "https://test.com/video" - dlp._session_cache[url] = {"title": "Test"} - dlp._cache_timestamps[url] = time.time() - - print(f"[TEST] Checking cache for: {url}") - if url in dlp._session_cache: - print(f"[TEST] Cache HIT!") - else: - print(f"[TEST] Cache MISS!") - - -# ============================================================================ -# Test playlist proxying - critical function -# ============================================================================ - -class TestPlaylistProxying: - """Test playlist proxying as per AGENTS.md.""" - - def test_main_playlist_returns_valid_hls(self, test_servers): - """Test main playlist returns valid HLS content.""" - video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8" - encoded = urllib.parse.quote(video_url, safe="") - proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8" - - print(f"[TEST] Requesting main playlist: {proxy_url}") - response = requests.get(proxy_url, timeout=10) - - print(f"[TEST] Status: {response.status_code}") - print_headers(response.headers) - print(f"[TEST] Content preview: {response.text[:200]}") - - assert response.status_code == 200, f"Expected 200, got {response.status_code}" - assert "#EXTM3U" in response.text, "Should contain #EXTM3U" - assert ".ts" in response.text, "Should contain segment references" - print("[TEST] Main playlist returns valid HLS: PASS") - - def test_playlist_contains_proxy_urls(self, test_servers): - """Test playlist URLs are rewritten to proxy.""" - video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8" - encoded = urllib.parse.quote(video_url, safe="") - proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8" - - print(f"[TEST] Requesting playlist: {proxy_url}") - response = requests.get(proxy_url, timeout=10) - - print(f"[TEST] Content: {response.text}") - assert "/hls/" in response.text, "Playlist should contain proxy URLs" - print("[TEST] Playlist contains proxy URLs: PASS") - - def test_playlist_content_type_correct(self, test_servers): - """Test playlist returns correct content-type.""" - video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8" - encoded = urllib.parse.quote(video_url, safe="") - proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8" - - print(f"[TEST] Requesting: {proxy_url}") - response = requests.get(proxy_url, timeout=10) - - print(f"[TEST] Content-Type: {response.headers.get('Content-Type')}") - assert "application/vnd.apple.mpegurl" in response.headers.get("Content-Type", "") - assert "video/mp2t" not in response.headers.get("Content-Type", "") - print("[TEST] Playlist content-type correct: PASS") - - -# ============================================================================ -# Test segment proxying - critical function -# ============================================================================ - -class TestSegmentProxying: - """Test segment proxying as per AGENTS.md.""" - - def test_segment_returns_video_data(self, test_servers): - """Test segment returns video data.""" - video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8" - encoded = urllib.parse.quote(video_url, safe="") - playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8" - - print(f"[TEST] Getting main playlist: {playlist_url}") - playlist_resp = requests.get(playlist_url, timeout=10) - - # Find segment filename - segment_filename = None - for line in playlist_resp.text.split("\n"): - if line.startswith("/hls/") and "--" in line and ".ts" in line: - parts = line.rsplit("--", 1) - if len(parts) >= 2: - segment_filename = parts[-1] - print(f"[TEST] Found segment: {segment_filename}") - break - - assert segment_filename is not None, "Should find segment in playlist" - - seg_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--{segment_filename}" - print(f"[TEST] Requesting segment: {seg_url}") - - seg_resp = requests.get(seg_url, timeout=10) - - print(f"[TEST] Segment status: {seg_resp.status_code}") - print_headers(seg_resp.headers) - print(f"[TEST] Segment size: {len(seg_resp.content)} bytes") - - assert seg_resp.status_code == 200 - assert "video/mp2t" in seg_resp.headers.get("Content-Type", "") - assert len(seg_resp.content) > 1000, "Segment should have substantial data" - assert b"#EXTM3U" not in seg_resp.content[:100], "Segment should NOT be a playlist" - - print("[TEST] Segment returns video data: PASS") - - -# ============================================================================ -# Test error handling - critical function -# ============================================================================ - -class TestErrorHandling: - """Test error handling as per AGENTS.md.""" - - def test_player_missing_url_returns_400(self): - """Test player route with missing URL returns 400.""" - from app import app - with app.test_client() as client: - print("[TEST] Testing /player with no URL") - response = client.get("/player") - print(f"[TEST] Status: {response.status_code}") - assert response.status_code == 400 - - def test_player_invalid_url_returns_400(self): - """Test player route with invalid URL returns 400.""" - from app import app - with app.test_client() as client: - print("[TEST] Testing /player with invalid URL") - response = client.get("/player?url=not-valid") - print(f"[TEST] Status: {response.status_code}") - assert response.status_code == 400 - - def test_hls_invalid_video_url_returns_400(self): - """Test HLS route with invalid video URL returns 400.""" - from app import app - with app.test_client() as client: - print("[TEST] Testing /hls with invalid video URL") - response = client.get("/hls/evil.com--index.m3u8") - print(f"[TEST] Status: {response.status_code}") - assert response.status_code == 400 - - -# ============================================================================ -# Integration tests - main application flow as per AGENTS.md -# ============================================================================ - -class TestIntegration: - """Integration tests for main application flow as per AGENTS.md.""" - - def test_pornhub_video_full_flow(self): - """Test PornHub video with full debug output.""" - import dlp - dlp._session_cache.clear() - dlp._cache_timestamps.clear() - - video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690" - - print(f"\n[TEST] PornHub video: {video_url}") - - # Get stream info - info = dlp.get_stream_info(video_url) - print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}") - print(f"[TEST] HLS URL: {info.get('hls_url', 'N/A')[:80] if info.get('hls_url') else 'N/A'}") - - # Get playlist - playlist = dlp.get_hls_playlist(video_url) - print(f"[TEST] Playlist content (first 300 chars): {playlist[:300]}") - print_hex(playlist[:100]) - - assert "#EXTM3U" in playlist - assert "/hls/" in playlist - print("[TEST] PornHub full flow: PASS") - - def test_youtube_video_fallback(self): - """Test YouTube uses direct URL fallback.""" - import dlp - dlp._session_cache.clear() - dlp._cache_timestamps.clear() - - video_url = "https://www.youtube.com/watch?v=PoV9fS4CnaY" - - print(f"\n[TEST] YouTube video: {video_url}") - - info = dlp.get_stream_info(video_url) - print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}") - print(f"[TEST] Direct URL: {info.get('direct_url', 'N/A')[:80] if info.get('direct_url') else 'N/A'}") - - assert "title" in info - print("[TEST] YouTube fallback: PASS") - - def test_yt_dlp_consumes_proxy_playlist(self): - """Test yt-dlp can consume proxy playlist like browser.""" - import dlp - dlp._session_cache.clear() - dlp._cache_timestamps.clear() - - video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690" - encoded_url = urllib.parse.quote(video_url, safe="") - playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded_url}--index.m3u8" - - print(f"\n[TEST] yt-dlp proxy URL: {playlist_url}") - - cmd = [ - "yt-dlp", - "--hls-use-mpegts", - "--no-download", - "--print", "url", - playlist_url - ] - - print(f"[TEST] Running: {' '.join(cmd)}") - result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) - - print(f"[TEST] yt-dlp return code: {result.returncode}") - if result.stdout: - print(f"[TEST] yt-dlp output: {result.stdout[:200]}") - if result.returncode != 0: - print(f"[TEST] yt-dlp stderr: {result.stderr[:500]}") - - assert result.returncode == 0, f"yt-dlp failed: {result.stderr}" - print("[TEST] yt-dlp consumes proxy playlist: PASS") - - -if __name__ == "__main__": - pytest.main([__file__, "-v", "-s"]) \ No newline at end of file diff --git a/utils.py b/utils.py index b42d1b2..73cdee4 100644 --- a/utils.py +++ b/utils.py @@ -69,3 +69,17 @@ def get_error_message(status_code: int) -> str: 503: "Service Unavailable", } return errors.get(status_code, "Unknown error") +import hashlib + +# simple in-memory mapping: video_id -> original URL +_video_map = {} + + +def get_video_id(url: str) -> str: + vid = hashlib.md5(url.encode()).hexdigest() + _video_map[vid] = url + return vid + + +def resolve_video_id(vid: str) -> str | None: + return _video_map.get(vid)