Fix HLS proxy and player functionality (first working version)

This commit is contained in:
Mikhail Yevchenko
2026-04-01 18:21:11 +00:00
parent 198f85b67d
commit 9bbbbc5a65
5 changed files with 681 additions and 303 deletions
+39 -18
View File
@@ -35,12 +35,19 @@ def player():
try:
stream_info = dlp.get_stream_info(video_url)
from urllib.parse import quote
# URL encode for path (use -- as delimiter)
encoded_url = quote(video_url, safe="")
proxy_hls_url = f"/hls?url={encoded_url}&path=index.m3u8"
# Only set HLS URL if we actually have HLS
hls_url = stream_info.get("hls_url")
proxy_hls_url = f"/hls/{encoded_url}--index.m3u8" if hls_url else None
return render_template(
"player.html",
video_url=video_url,
proxy_hls_url=proxy_hls_url,
direct_url=stream_info.get("direct_url"),
title=stream_info.get("title", "Video"),
thumbnail=stream_info.get("thumbnail")
)
@@ -49,35 +56,49 @@ def player():
abort(500, description=str(e))
@app.route("/hls")
def hls_proxy():
@app.route("/hls/<path:full_path>")
def hls_proxy(full_path):
try:
url_param = request.args.get("url", "")
if not url_param:
abort(400, description="Missing url parameter")
from urllib.parse import unquote
path = request.args.get("path", "")
video_url = unquote(url_param)
# Split: last part is filename, rest is video URL
# Format: /hls/<encoded_video_url>/<filename>
# Since / is ambiguous (in URL and in video URL), we use a delimiter
# Format: /hls/<encoded_video_url>--<filename>
if "--" not in full_path:
abort(400, description="Invalid path format")
parts = full_path.rsplit("--", 1)
if len(parts) != 2:
abort(400, description="Invalid path format")
encoded_video_url = parts[0]
filename = parts[1]
# Decode the video URL
video_url = unquote(encoded_video_url)
if not is_valid_url(video_url):
abort(400, description="Invalid URL")
# Main playlist request - get from yt-dlp and rewrite URLs
if path == "index.m3u8" or path == "":
# Main playlist request
if filename == "index.m3u8":
playlist = dlp.get_hls_playlist(video_url)
return Response(playlist, mimetype="application/vnd.apple.mpegurl")
return Response(playlist, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"})
# Sub-playlist or segment request - path is the absolute URL
segment_data = dlp.get_hls_segment(video_url, path)
# Sub-playlist or segment request
segment_url = unquote(filename)
segment_data = dlp.get_hls_segment_with_retry(video_url, segment_url)
if segment_data is None:
abort(500, description="Failed to fetch segment")
if path.endswith(".m3u8"):
return Response(segment_data, mimetype="application/vnd.apple.mpegurl")
return Response(segment_data, mimetype="video/mp2t")
# Determine content-type by filename extension
if filename.endswith(".m3u8"):
return Response(segment_data, mimetype="application/vnd.apple.mpegurl", headers={"Cache-Control": "public, max-age=31536000"})
return Response(segment_data, mimetype="video/mp2t", headers={"Cache-Control": "public, max-age=31536000"})
except HTTPException:
raise
@@ -86,7 +107,7 @@ def hls_proxy():
abort(400, description=str(e))
except Exception as e:
logger.error(f"HLS proxy error: {e}")
abort(500, description="Error fetching stream")
return Response(str(e), status=500, mimetype="text/plain")
@app.errorhandler(Exception)
+238 -25
View File
@@ -2,6 +2,7 @@ import logging
import os
import time
from typing import Optional
from urllib.parse import unquote
import yt_dlp
logger = logging.getLogger(__name__)
@@ -12,6 +13,20 @@ SOCKET_TIMEOUT = int(os.getenv("SOCKET_TIMEOUT", 30))
_session_cache = {}
_cache_timestamps = {}
_ydl_instance = None
def _get_ydl():
    """Return the module-wide yt-dlp client, building it on first use."""
    global _ydl_instance
    if _ydl_instance is not None:
        return _ydl_instance

    # First call: build the shared client with quiet output and the
    # configured socket timeout, then remember it for later callers.
    options = {
        "quiet": True,
        "no_warnings": True,
        "socket_timeout": SOCKET_TIMEOUT,
    }
    _ydl_instance = yt_dlp.YoutubeDL(options)
    return _ydl_instance
def _get_cache_key(video_url: str) -> str:
return video_url
@@ -39,12 +54,58 @@ def _set_cached_info(video_url: str, info: dict) -> None:
def _extract_hls_url(info: dict) -> Optional[str]:
"""Extract HLS URL from yt-dlp info dict."""
# First check top-level fields (these are set when there's only one format)
url = info.get("manifest_url") or info.get("url")
if url and ".m3u8" in url:
return url
# Check requested_formats (post-processed by yt-dlp)
if info.get("requested_formats"):
for f in info["requested_formats"]:
url = f.get("url") or f.get("manifest_url")
if url and ".m3u8" in url:
return url
# Check formats for m3u8_native protocol
if info.get("formats"):
for f in reversed(info["formats"]):
if f.get("protocol") == "m3u8_native":
url = f.get("manifest_url") or f.get("url")
if url and ".m3u8" in url:
return url
# Try to find any m3u8 URL in formats
if info.get("formats"):
for f in info["formats"]:
url = f.get("url", "")
if ".m3u8" in url:
return url
return None
def _extract_direct_url(info: dict) -> Optional[str]:
"""Extract direct video URL when HLS is not available."""
# Check url field first
url = info.get("url")
if url:
return url
# Check requested_formats
if info.get("requested_formats"):
for f in info["requested_formats"]:
url = f.get("url")
if url:
return url
# Check formats for best quality https format
if info.get("formats"):
for f in reversed(info["formats"]):
if f.get("protocol") in ("https", "http"):
url = f.get("url")
if url:
return url
return None
@@ -54,20 +115,22 @@ def _get_video_info(video_url: str) -> dict:
if cached:
return cached
ydl_opts = {
"quiet": True,
"no_warnings": True,
"socket_timeout": SOCKET_TIMEOUT,
}
import shutil
if not shutil.which("node"):
deno_path = os.path.expanduser("~/.deno/bin/deno")
if not os.path.exists(deno_path):
logger.warning("No JavaScript runtime (node/deno) found - YouTube may not work properly")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=False)
ydl = _get_ydl()
info = ydl.extract_info(video_url, download=False)
hls_url = _extract_hls_url(info)
direct_url = _extract_direct_url(info)
result = {
"title": info.get("title"),
"thumbnail": info.get("thumbnail"),
"hls_url": hls_url,
"direct_url": direct_url,
"raw_info": info,
}
_set_cached_info(video_url, result)
@@ -80,35 +143,83 @@ def get_stream_info(video_url: str) -> dict:
return {
"title": info["title"],
"hls_url": info["hls_url"],
"direct_url": info.get("direct_url"),
"thumbnail": info["thumbnail"],
}
def get_hls_playlist(video_url: str) -> str:
"""Get HLS playlist content with rewritten URLs."""
import urllib.request
import urllib.error
# First call _get_video_info to ensure cache is populated (yt-dlp quirk)
info = _get_video_info(video_url)
if not info["hls_url"]:
hls_url = info.get("hls_url")
if not hls_url:
raise ValueError("No HLS stream available for this video")
import urllib.request
with urllib.request.urlopen(info["hls_url"], timeout=SOCKET_TIMEOUT) as response:
playlist_content = response.read().decode("utf-8")
# Try to get playlist, retry once if URL expired
for attempt in range(2):
try:
with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response:
playlist_content = response.read().decode("utf-8")
return _rewrite_urls(playlist_content, video_url, hls_url)
except urllib.error.HTTPError as e:
if e.code == 410 and attempt == 0:
# Clear cache and fetch fresh HLS URL
_session_cache.pop(video_url, None)
_cache_timestamps.pop(video_url, None)
logger.info("HLS URL expired, fetching fresh HLS URL")
info = _get_video_info(video_url)
hls_url = info.get("hls_url")
if not hls_url:
raise ValueError("No HLS stream available for this video")
continue
raise
return _rewrite_urls(playlist_content, video_url, info["hls_url"])
def get_direct_video_url(video_url: str) -> str:
    """Return the direct media URL recorded for ``video_url``.

    Raises:
        ValueError: if the video info carries no ``direct_url``.
    """
    direct = _get_video_info(video_url).get("direct_url")
    if direct:
        return direct
    raise ValueError("No video URL available for this video")
def _rewrite_urls(content: str, video_url: str, base_url: str) -> str:
"""Rewrite relative URLs in HLS playlist to point through proxy."""
from urllib.parse import urljoin, quote
from urllib.parse import urljoin, quote, urlparse, parse_qs, urlencode
# URL encode the video URL for safe path usage
encoded_video_url = quote(video_url, safe="")
# Parse base URL to get directory path and query
base_parsed = urlparse(base_url)
base_path = base_parsed.path
base_query = parse_qs(base_parsed.query)
# Get directory path (remove the .m3u8 filename)
dir_path = base_path.rsplit("/", 1)[0]
lines = content.split("\n")
new_lines = []
for line in lines:
if line and not line.startswith("#") and line.startswith("http"):
abs_url = line
elif line and not line.startswith("#"):
abs_url = urljoin(base_url, line)
proxy_url = f"/hls?url={quote(video_url, safe='')}&path={quote(abs_url, safe='')}"
if line and not line.startswith("#"):
parsed = urlparse(line)
if parsed.scheme:
# Absolute URL - extract just the path component
# e.g., https://example.com/video/segment.ts -> segment.ts
filename = quote(parsed.path.split("/")[-1], safe="")
if parsed.query:
filename += "?" + quote(parsed.query, safe="")
else:
# Relative URL - use as-is (with query params if any)
filename = quote(line, safe="")
# New format: /hls/<encoded_video_url>--<filename> (-- is delimiter)
proxy_url = f"/hls/{encoded_video_url}--{filename}"
new_lines.append(proxy_url)
continue
new_lines.append(line)
@@ -117,19 +228,121 @@ def _rewrite_urls(content: str, video_url: str, base_url: str) -> str:
def get_hls_segment(video_url: str, segment_url: str) -> bytes:
"""Get HLS segment or sub-playlist content."""
from urllib.parse import unquote
decoded_url = unquote(segment_url)
import urllib.request
import urllib.error
from urllib.parse import unquote, urlparse, parse_qs, urlencode
# Get the base URL from yt-dlp cache
info = _get_video_info(video_url)
hls_url = info.get("hls_url")
if not hls_url:
raise ValueError("No HLS URL available")
# Parse the HLS URL to get base path
base_parsed = urlparse(hls_url)
base_path = base_parsed.path.rsplit("/", 1)[0]
base_query = parse_qs(base_parsed.query)
# Check if it's a playlist (regardless of query params)
is_playlist = unquote(segment_url).split("?")[0].endswith(".m3u8")
# Reconstruct full URL from filename
filename = unquote(segment_url)
if "?" in filename:
rel_path, rel_query = filename.split("?", 1)
rel_qs = parse_qs(rel_query)
full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{base_path}/{rel_path}"
merged_qs = {**base_query, **rel_qs}
if merged_qs:
full_url += "?" + urlencode(merged_qs, doseq=True)
else:
full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{base_path}/{filename}"
try:
response = urllib.request.urlopen(decoded_url, timeout=SOCKET_TIMEOUT)
response = urllib.request.urlopen(full_url, timeout=SOCKET_TIMEOUT)
data = response.read()
except urllib.error.HTTPError as e:
if e.code == 410:
raise ValueError("HLS URL expired (410 Gone)")
raise
if decoded_url.endswith(".m3u8"):
return _rewrite_urls(data.decode("utf-8"), video_url, decoded_url).encode("utf-8")
if is_playlist:
return _rewrite_urls(data.decode("utf-8"), video_url, full_url).encode("utf-8")
return data
def _fetch_fresh_sub_playlist(video_url: str) -> bytes:
    """Re-resolve the stream and return its first sub-playlist, rewritten for the proxy."""
    import urllib.request
    from urllib.parse import urlparse, parse_qs, urlencode

    # _get_video_info answers from the cache when it can, so the expired
    # entry has to be dropped first or the same stale HLS URL comes back.
    _session_cache.pop(video_url, None)
    _cache_timestamps.pop(video_url, None)

    info = _get_video_info(video_url)
    hls_url = info.get("hls_url")
    if not hls_url:
        raise ValueError("No HLS stream available")

    # Directory and query of the main playlist; relative entries are resolved
    # against these so they inherit the fresh tokens.
    parsed = urlparse(hls_url)
    base_path = parsed.path.rsplit("/", 1)[0]
    base_query = parse_qs(parsed.query)

    with urllib.request.urlopen(hls_url, timeout=SOCKET_TIMEOUT) as response:
        playlist_content = response.read().decode("utf-8")

    # First non-tag line of the main playlist.  Lines are stripped so a
    # CRLF-terminated playlist does not yield "\r" or a trailing "\r".
    sub_playlist_path = next(
        (
            stripped
            for stripped in (line.strip() for line in playlist_content.split("\n"))
            if stripped and not stripped.startswith("#")
        ),
        None,
    )
    if not sub_playlist_path:
        raise ValueError("Could not find sub-playlist URL")

    # Build the full sub-playlist URL.
    if urlparse(sub_playlist_path).scheme:
        # Already absolute: gluing it onto the base directory would produce
        # an invalid URL, so use it as given.
        full_url = sub_playlist_path
    elif "?" in sub_playlist_path:
        rel_path, rel_query = sub_playlist_path.split("?", 1)
        # Entry-specific parameters override the main playlist's.
        merged_qs = {**base_query, **parse_qs(rel_query)}
        full_url = f"{parsed.scheme}://{parsed.netloc}{base_path}/{rel_path}"
        full_url += "?" + urlencode(merged_qs, doseq=True)
    else:
        full_url = f"{parsed.scheme}://{parsed.netloc}{base_path}/{sub_playlist_path}"

    logger.info("Fetching fresh sub-playlist: %s...", full_url[:100])

    with urllib.request.urlopen(full_url, timeout=SOCKET_TIMEOUT) as response:
        sub_content = response.read().decode("utf-8")

    rewritten = _rewrite_urls(sub_content, video_url, full_url)
    logger.info("Rewritten sub-playlist (first 200 chars): %s...", rewritten[:200])
    return rewritten.encode("utf-8")


def get_hls_segment_with_retry(video_url: str, segment_url: str) -> bytes:
    """Get an HLS segment or sub-playlist, recovering once from an expired URL.

    ``get_hls_segment`` reports an expired upstream URL as a ``ValueError``
    whose message contains ``"410 Gone"``.  On the first such failure:

    * for a sub-playlist request (path ends in ``.m3u8``) the cached video
      info is dropped and the request is retried;
    * for a segment request the stream is re-resolved and the first
      sub-playlist of the fresh main playlist is returned instead.

    NOTE(review): in the segment case the caller receives playlist text where
    it asked for segment bytes. That is the existing behaviour and is kept
    here; confirm the player copes with it.

    Args:
        video_url: Page URL of the video (also the cache key).
        segment_url: Possibly percent-encoded segment or playlist name.

    Returns:
        Raw bytes of the segment, or of a rewritten playlist.

    Raises:
        ValueError: if no HLS stream is available, no sub-playlist can be
            found, or the URL is still expired on the second attempt.
    """
    from urllib.parse import unquote

    # Anything whose path (query ignored) is not an .m3u8 is a media segment.
    is_segment = not unquote(segment_url).split("?")[0].endswith(".m3u8")

    for attempt in range(2):
        try:
            return get_hls_segment(video_url, segment_url)
        except ValueError as e:
            # Only an expiry on the first attempt is recoverable.
            if "410 Gone" not in str(e) or attempt != 0:
                raise
            if is_segment:
                logger.info("Segment URL expired, re-fetching sub-playlist")
                return _fetch_fresh_sub_playlist(video_url)
            # Sub-playlist: forget the stale info so the retry re-extracts it.
            _session_cache.pop(video_url, None)
            _cache_timestamps.pop(video_url, None)
            logger.info("Sub-playlist expired, refetching")
+22 -9
View File
@@ -33,21 +33,34 @@
<a href="/" class="back-link">← Back</a>
<h1>{{ title }}</h1>
<div class="video-container">
<video controls>
Your browser does not support HLS.
<video controls id="video">
Your browser does not support video playback.
</video>
</div>
<script src="https://cdn.jsdelivr.net/npm/hls.js@latest"></script>
<script>
const video = document.querySelector('video');
const video = document.getElementById('video');
const hlsUrl = {{ proxy_hls_url | tojson }};
const directUrl = {{ direct_url | tojson }};
if (Hls.isSupported()) {
const hls = new Hls();
hls.loadSource(hlsUrl);
hls.attachMedia(video);
} else if (video.canPlayType('application/vnd.apple.mpegurl')) {
video.src = hlsUrl;
if (hlsUrl && hlsUrl !== 'null') {
if (Hls.isSupported()) {
const hls = new Hls();
hls.loadSource(hlsUrl);
hls.attachMedia(video);
} else if (video.canPlayType('application/vnd.apple.mpegurl')) {
video.src = hlsUrl;
} else {
loadDirectUrl();
}
} else if (directUrl && directUrl !== 'null') {
loadDirectUrl();
}
function loadDirectUrl() {
if (directUrl && directUrl !== 'null') {
video.src = directUrl;
}
}
</script>
</body>
-169
View File
@@ -1,169 +0,0 @@
import os
import subprocess
import time
import threading
import requests
import pytest
import sys
import urllib.parse
import http.server
import socketserver
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video"
TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8"
SERVER_PORT = 5002
TEST_HTTP_PORT = 8898
def generate_test_video():
os.makedirs(TEST_VIDEO_DIR, exist_ok=True)
cmd = [
"ffmpeg", "-y", "-f", "lavfi", "-i", "testsrc=duration=5:size=320x240:rate=24",
"-f", "lavfi", "-i", "sine=frequency=440:duration=5",
"-c:v", "libx264", "-c:a", "aac", "-strict", "experimental",
"-hls_time", "1", "-hls_list_size", "0",
"-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts",
TEST_VIDEO_M3U8
]
subprocess.run(cmd, capture_output=True, timeout=60)
assert os.path.exists(TEST_VIDEO_M3U8), "HLS manifest not generated"
segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")]
assert len(segments) > 0, "No segments generated"
class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler):
def log_message(self, format, *args):
pass
class ReusableTCPServer(socketserver.TCPServer):
allow_reuse_address = True
def serve_test_video():
os.chdir(TEST_VIDEO_DIR)
with ReusableTCPServer(("127.0.0.1", TEST_HTTP_PORT), QuietHTTPHandler) as httpd:
httpd.serve_forever()
def start_flask_app():
import app as flask_app
flask_app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False)
@pytest.fixture(scope="module")
def test_servers():
print("\nGenerating test video...")
generate_test_video()
print(f"Starting HTTP server for test video on port {TEST_HTTP_PORT}...")
http_thread = threading.Thread(target=serve_test_video, daemon=True)
http_thread.start()
time.sleep(1)
for _ in range(10):
try:
requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/", timeout=1)
break
except:
time.sleep(0.5)
print("HTTP server ready")
print(f"Starting Flask proxy server on port {SERVER_PORT}...")
flask_thread = threading.Thread(target=start_flask_app, daemon=True)
flask_thread.start()
time.sleep(2)
print("Flask server ready")
yield
print("\nCleaning up...")
def test_direct_hls_access(test_servers):
"""Test that we can access the test HLS video directly"""
response = requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8", timeout=5)
assert response.status_code == 200
assert "#EXTM3U" in response.text
print("Direct HLS access: OK")
def test_hls_playlist_proxy(test_servers):
"""Test proxying HLS playlist"""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls?url={urllib.parse.quote(video_url, safe='')}"
response = requests.get(proxy_url, timeout=10)
assert response.status_code == 200
assert "#EXTM3U" in response.text
assert ".ts" in response.text
print("HLS playlist proxy: OK")
def test_hls_segment_proxy(test_servers):
"""Test proxying HLS segment"""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
# First get the rewritten playlist to extract the segment URL
playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls?url={urllib.parse.quote(video_url, safe='')}"
playlist_response = requests.get(playlist_url, timeout=10)
assert playlist_response.status_code == 200
# Extract the segment path from the playlist (it's after the path= parameter)
for line in playlist_response.text.split("\n"):
if line.startswith("/hls?"):
from urllib.parse import urlparse, parse_qs
parsed = urlparse(line)
params = parse_qs(parsed.query)
if "path" in params:
segment_path = params["path"][0]
break
# Now request the segment using the path from the playlist
segment_url = f"http://127.0.0.1:{SERVER_PORT}/hls?url={urllib.parse.quote(video_url, safe='')}&path={urllib.parse.quote(segment_path, safe='')}"
response = requests.get(segment_url, timeout=10)
assert response.status_code == 200
assert len(response.content) > 0
print("HLS segment proxy: OK")
def test_player_page(test_servers):
"""Test player page renders"""
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
player_url = f"http://127.0.0.1:{SERVER_PORT}/player?url={urllib.parse.quote(video_url, safe='')}"
response = requests.get(player_url, timeout=10)
assert response.status_code == 200
assert "video" in response.text.lower()
print("Player page: OK")
def test_index_page(test_servers):
"""Test index page renders"""
response = requests.get(f"http://127.0.0.1:{SERVER_PORT}/", timeout=10)
assert response.status_code == 200
assert "video" in response.text.lower()
print("Index page: OK")
@pytest.mark.skip(reason="External URL test - run manually to verify pornhub support")
def test_pornhub_hls_extraction():
"""Test that pornhub HLS URLs are extracted correctly"""
import dlp
dlp._session_cache.clear()
dlp._cache_timestamps.clear()
# Test with actual pornhub URL
url = "https://rt.pornhub.com/view_video.php?viewkey=69bc20ee15710"
hls_url = dlp.get_stream_info(url)["hls_url"]
assert hls_url and "m3u8" in hls_url
print(f"PornHub HLS URL: {hls_url[:100]}...")
if __name__ == "__main__":
pytest.main([__file__, "-v", "-s"])
+374 -74
View File
@@ -1,116 +1,416 @@
import pytest
import sys
import os
import sys
import subprocess
import time
import threading
import requests
import urllib.parse
import http.server
import socketserver
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils import is_valid_url, extract_video_id, sanitize_path, get_error_message
import dlp
TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video"
TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8"
SERVER_PORT = 5005
TEST_HTTP_PORT = 8890
class TestURLValidation:
def test_valid_youtube_url(self):
assert is_valid_url("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
assert is_valid_url("https://youtu.be/dQw4w9WgXcQ")
def test_valid_youtu_be(self):
assert is_valid_url("https://youtu.be/abc123")
def test_valid_pornhub_url(self):
assert is_valid_url("https://www.pornhub.com/view_video.php?viewkey=abc123")
def test_invalid_url(self):
assert not is_valid_url("")
assert not is_valid_url("not-a-url")
def test_disallowed_domain(self):
os.environ["VALIDATION_ENABLED"] = "true"
assert not is_valid_url("https://evil.com/video")
def print_hex(data, max_len=200):
    """Print the first ``max_len`` items of ``data`` as a hex string.

    ``bytes`` are dumped directly; anything else is sliced first and then
    encoded with ``.encode()`` before being dumped.
    """
    head = data[:max_len]
    raw = head if isinstance(data, bytes) else head.encode()
    print(f"[HEX] {raw.hex()}")
class TestVideoIDExtraction:
def test_extract_youtube_id(self):
assert extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ") == "dQw4w9WgXcQ"
assert extract_video_id("https://youtu.be/dQw4w9WgXcQ") == "dQw4w9WgXcQ"
def test_extract_pornhub_id(self):
result = extract_video_id("https://www.pornhub.com/view_video.php?viewkey=ph123456")
assert result == "ph123456"
def test_extract_invalid(self):
assert extract_video_id("https://example.com/video") == ""
def print_headers(headers):
    """Print ``headers`` converted to a plain dict, tagged ``[HEADERS]``."""
    as_dict = dict(headers)
    print(f"[HEADERS] {as_dict}")
class TestPathSanitization:
def test_sanitize_normal_path(self):
assert sanitize_path("path/to/file") == "path/to/file"
def generate_test_video():
"""Generate test HLS video using ffmpeg."""
print(f"\n[SETUP] Generating test video in {TEST_VIDEO_DIR}")
os.makedirs(TEST_VIDEO_DIR, exist_ok=True)
def test_sanitize_prevents_traversal(self):
assert sanitize_path("../etc/passwd") == "etc/passwd"
assert sanitize_path("path/../etc/passwd") == "path/etc/passwd"
cmd = [
"ffmpeg", "-y",
"-f", "lavfi", "-i", "testsrc=duration=10:size=320x240:rate=24",
"-f", "lavfi", "-i", "sine=frequency=440:duration=10",
"-c:v", "libx264", "-c:a", "aac", "-strict", "experimental",
"-hls_time", "2", "-hls_list_size", "0",
"-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts",
TEST_VIDEO_M3U8
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
print(f"[ERROR] ffmpeg failed: {result.stderr}")
segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")]
print(f"[SETUP] Generated {len(segments)} segments")
return result.returncode == 0 and len(segments) > 0
class TestCacheMechanics:
def test_cache_basic(self):
class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler):
    """Static-file handler whose log lines go to stdout with an ``[HTTP]`` tag."""

    def log_message(self, format, *args):
        """Print one log line: client address, then the %-formatted message."""
        client = self.address_string()
        rendered = format % args
        print(f"[HTTP] {client} - {rendered}")
class ReusableTCPServer(socketserver.TCPServer):
    """``TCPServer`` variant with address reuse switched on."""

    # Class-level switch read by socketserver when the listening socket is
    # bound; presumably set so repeated test runs can rebind the same port.
    allow_reuse_address = True
def serve_test_video():
    """Serve ``TEST_VIDEO_DIR`` over HTTP on the test port; blocks forever.

    Changes the process working directory to ``TEST_VIDEO_DIR`` before the
    server starts.
    """
    print(f"[SETUP] Starting test HTTP server on port {TEST_HTTP_PORT}")
    os.chdir(TEST_VIDEO_DIR)
    bind_address = ("127.0.0.1", TEST_HTTP_PORT)
    with ReusableTCPServer(bind_address, QuietHTTPHandler) as server:
        server.serve_forever()
def start_flask_app():
    """Run the Flask application on the local test port; blocks while serving."""
    print(f"[SETUP] Starting Flask server on port {SERVER_PORT}")
    # Imported here rather than at module import time, as in the original.
    import app as flask_app

    run_options = {
        "host": "127.0.0.1",
        "port": SERVER_PORT,
        "debug": False,
        "use_reloader": False,
    }
    flask_app.app.run(**run_options)
def _wait_for_http(url, attempts=10, delay=0.5):
    """Poll ``url`` until it answers or ``attempts`` run out (best effort).

    Only request failures are treated as "not up yet"; KeyboardInterrupt and
    other non-request errors propagate instead of being swallowed.

    Returns:
        True once a request completes, False if every attempt failed.
    """
    for _ in range(attempts):
        try:
            requests.get(url, timeout=1)
            return True
        except requests.RequestException:
            time.sleep(delay)
    return False


@pytest.fixture(scope="module")
def test_servers():
    """Module-scoped fixture: start the static HLS server and the Flask proxy.

    Generates the test video, then launches both servers in daemon threads
    and waits (best effort) for each to answer before yielding to the tests.
    Nothing is torn down explicitly.
    """
    print("\n" + "="*60)
    print("INTEGRATION TEST SETUP")
    print("="*60)

    generate_test_video()

    http_thread = threading.Thread(target=serve_test_video, daemon=True)
    http_thread.start()
    time.sleep(1)
    _wait_for_http(f"http://127.0.0.1:{TEST_HTTP_PORT}/")
    print("[SETUP] Test HTTP server ready")

    flask_thread = threading.Thread(target=start_flask_app, daemon=True)
    flask_thread.start()
    time.sleep(2)
    _wait_for_http(f"http://127.0.0.1:{SERVER_PORT}/")
    print("[SETUP] Flask server ready")
    print("="*60 + "\n")

    yield

    print("\n[TEARDOWN] Tests complete")
# ============================================================================
# Test URL parsing - critical function
# ============================================================================
class TestURLParsing:
    """Checks of ``utils.is_valid_url`` against accepted and rejected URLs."""

    @staticmethod
    def _validate(url):
        """Run ``is_valid_url`` on ``url``, echoing the input and the verdict."""
        from utils import is_valid_url

        print(f"[TEST] Validating: {url}")
        verdict = is_valid_url(url)
        print(f"[TEST] Result: {verdict}")
        return verdict

    def test_url_validation_youtube(self):
        """A YouTube watch URL is accepted."""
        url = "https://www.youtube.com/watch?v=abc123"
        assert self._validate(url) is True, f"YouTube URL should be valid: {url}"

    def test_url_validation_pornhub(self):
        """A PornHub video URL is accepted."""
        url = "https://rt.pornhub.com/view_video.php?viewkey=abc123"
        assert self._validate(url) is True, f"PornHub URL should be valid: {url}"

    def test_url_validation_invalid(self):
        """A string that is not a URL is rejected."""
        url = "not-a-url"
        assert self._validate(url) is False, f"Invalid URL should be rejected: {url}"

    def test_url_validation_disallowed(self):
        """A URL on a domain outside the allow-list is rejected."""
        url = "https://evil.com/video"
        assert self._validate(url) is False, f"Disallowed domain should be rejected: {url}"
# ============================================================================
# Test caching - critical function
# ============================================================================
class TestCaching:
"""Test caching mechanics as per AGENTS.md."""
def test_cache_store_and_retrieve(self):
"""Test cache can store and retrieve data."""
import dlp
dlp._session_cache.clear()
dlp._cache_timestamps.clear()
test_data = {"title": "Test Video", "thumbnail": "http://test.com/thumb.jpg", "hls_url": "http://test.com/stream.m3u8"}
dlp._set_cached_info("http://test.com/video", test_data)
url = "https://test.com/video"
data = {"title": "Test", "hls_url": "http://example.com/playlist.m3u8"}
cached = dlp._get_cached_info("http://test.com/video")
assert cached is not None
assert cached["title"] == "Test Video"
assert cached["thumbnail"] == "http://test.com/thumb.jpg"
assert cached["hls_url"] == "http://test.com/stream.m3u8"
print(f"[TEST] Storing in cache: {url}")
dlp._session_cache[url] = data
dlp._cache_timestamps[url] = time.time()
def test_cache_expiry(self):
dlp.CACHE_TTL = 1
print(f"[TEST] Cache contents: {dlp._session_cache}")
assert url in dlp._session_cache
assert dlp._session_cache[url]["title"] == "Test"
def test_cache_hit_detection(self):
"""Test cache hit is detected."""
import dlp
dlp._session_cache.clear()
dlp._cache_timestamps.clear()
dlp._set_cached_info("http://test.com/video", {"data": "test"})
import time
time.sleep(1.1)
url = "https://test.com/video"
dlp._session_cache[url] = {"title": "Test"}
dlp._cache_timestamps[url] = time.time()
assert dlp._is_cache_expired("http://test.com/video") is True
dlp.CACHE_TTL = 31536000
print(f"[TEST] Checking cache for: {url}")
if url in dlp._session_cache:
print(f"[TEST] Cache HIT!")
else:
print(f"[TEST] Cache MISS!")
class TestErrorMessages:
def test_get_error_message(self):
assert "Bad Request" in get_error_message(400)
assert "Forbidden" in get_error_message(403)
assert "Not Found" in get_error_message(404)
assert "Internal Server Error" in get_error_message(500)
# ============================================================================
# Test playlist proxying - critical function
# ============================================================================
class TestPlaylistProxying:
    """Main-playlist requests through the ``/hls/<url>--index.m3u8`` route."""

    @staticmethod
    def _main_playlist_proxy_url():
        """Proxy URL for the main playlist of the locally served test video."""
        source = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
        quoted = urllib.parse.quote(source, safe="")
        return f"http://127.0.0.1:{SERVER_PORT}/hls/{quoted}--index.m3u8"

    def test_main_playlist_returns_valid_hls(self, test_servers):
        """The proxied playlist is a 200 response holding HLS text with segments."""
        proxy_url = self._main_playlist_proxy_url()
        print(f"[TEST] Requesting main playlist: {proxy_url}")

        response = requests.get(proxy_url, timeout=10)
        print(f"[TEST] Status: {response.status_code}")
        print_headers(response.headers)
        print(f"[TEST] Content preview: {response.text[:200]}")

        assert response.status_code == 200, f"Expected 200, got {response.status_code}"
        assert "#EXTM3U" in response.text, "Should contain #EXTM3U"
        assert ".ts" in response.text, "Should contain segment references"
        print("[TEST] Main playlist returns valid HLS: PASS")

    def test_playlist_contains_proxy_urls(self, test_servers):
        """Entries in the proxied playlist point back at the ``/hls/`` route."""
        proxy_url = self._main_playlist_proxy_url()
        print(f"[TEST] Requesting playlist: {proxy_url}")

        response = requests.get(proxy_url, timeout=10)
        print(f"[TEST] Content: {response.text}")

        assert "/hls/" in response.text, "Playlist should contain proxy URLs"
        print("[TEST] Playlist contains proxy URLs: PASS")

    def test_playlist_content_type_correct(self, test_servers):
        """The playlist is served with the HLS MIME type, not the segment one."""
        proxy_url = self._main_playlist_proxy_url()
        print(f"[TEST] Requesting: {proxy_url}")

        response = requests.get(proxy_url, timeout=10)
        print(f"[TEST] Content-Type: {response.headers.get('Content-Type')}")

        assert "application/vnd.apple.mpegurl" in response.headers.get("Content-Type", "")
        assert "video/mp2t" not in response.headers.get("Content-Type", "")
        print("[TEST] Playlist content-type correct: PASS")
class TestFlaskApp:
def test_index_route(self):
from app import app
with app.test_client() as client:
response = client.get("/")
assert response.status_code == 200
def test_player_route_missing_url(self):
# ============================================================================
# Test segment proxying - critical function
# ============================================================================
class TestSegmentProxying:
    """Segment requests through the ``/hls/<url>--<segment>`` route."""

    def test_segment_returns_video_data(self, test_servers):
        """A segment named in the proxied playlist downloads as MPEG-TS data."""
        source = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
        encoded = urllib.parse.quote(source, safe="")
        proxy_root = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}"

        playlist_url = f"{proxy_root}--index.m3u8"
        print(f"[TEST] Getting main playlist: {playlist_url}")
        playlist_resp = requests.get(playlist_url, timeout=10)

        # First proxied .ts entry; the segment name is whatever follows the
        # last "--" delimiter on that line.
        segment_filename = next(
            (
                entry.rsplit("--", 1)[-1]
                for entry in playlist_resp.text.split("\n")
                if entry.startswith("/hls/") and "--" in entry and ".ts" in entry
            ),
            None,
        )
        if segment_filename is not None:
            print(f"[TEST] Found segment: {segment_filename}")
        assert segment_filename is not None, "Should find segment in playlist"

        seg_url = f"{proxy_root}--{segment_filename}"
        print(f"[TEST] Requesting segment: {seg_url}")
        seg_resp = requests.get(seg_url, timeout=10)
        print(f"[TEST] Segment status: {seg_resp.status_code}")
        print_headers(seg_resp.headers)
        print(f"[TEST] Segment size: {len(seg_resp.content)} bytes")

        assert seg_resp.status_code == 200
        assert "video/mp2t" in seg_resp.headers.get("Content-Type", "")
        assert len(seg_resp.content) > 1000, "Segment should have substantial data"
        assert b"#EXTM3U" not in seg_resp.content[:100], "Segment should NOT be a playlist"
        print("[TEST] Segment returns video data: PASS")
# ============================================================================
# Test error handling - critical function
# ============================================================================
class TestErrorHandling:
"""Test error handling as per AGENTS.md."""
def test_player_missing_url_returns_400(self):
"""Test player route with missing URL returns 400."""
from app import app
with app.test_client() as client:
print("[TEST] Testing /player with no URL")
response = client.get("/player")
print(f"[TEST] Status: {response.status_code}")
assert response.status_code == 400
def test_player_route_invalid_url(self):
def test_player_invalid_url_returns_400(self):
"""Test player route with invalid URL returns 400."""
from app import app
with app.test_client() as client:
response = client.get("/player?url=https://evil.com/video")
print("[TEST] Testing /player with invalid URL")
response = client.get("/player?url=not-valid")
print(f"[TEST] Status: {response.status_code}")
assert response.status_code == 400
def test_hls_proxy_invalid_path(self):
def test_hls_invalid_video_url_returns_400(self):
"""Test HLS route with invalid video URL returns 400."""
from app import app
with app.test_client() as client:
response = client.get("/hls")
print("[TEST] Testing /hls with invalid video URL")
response = client.get("/hls/evil.com--index.m3u8")
print(f"[TEST] Status: {response.status_code}")
assert response.status_code == 400
# ============================================================================
# Integration tests - main application flow as per AGENTS.md
# ============================================================================
class TestIntegration:
    """End-to-end checks against real video pages (these need network access)."""

    @staticmethod
    def _fresh_dlp():
        """Import ``dlp`` and empty its caches so each test re-extracts."""
        import dlp

        dlp._session_cache.clear()
        dlp._cache_timestamps.clear()
        return dlp

    def test_pornhub_video_full_flow(self):
        """Stream info and a rewritten playlist can be fetched for a PornHub video."""
        dlp = self._fresh_dlp()

        video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690"
        print(f"\n[TEST] PornHub video: {video_url}")

        info = dlp.get_stream_info(video_url)
        hls_url = info.get("hls_url")
        hls_preview = hls_url[:80] if hls_url else "N/A"
        print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}")
        print(f"[TEST] HLS URL: {hls_preview}")

        playlist = dlp.get_hls_playlist(video_url)
        print(f"[TEST] Playlist content (first 300 chars): {playlist[:300]}")
        print_hex(playlist[:100])

        assert "#EXTM3U" in playlist
        assert "/hls/" in playlist
        print("[TEST] PornHub full flow: PASS")

    def test_youtube_video_fallback(self):
        """Stream info for a YouTube video is returned and includes a title key."""
        dlp = self._fresh_dlp()

        video_url = "https://www.youtube.com/watch?v=PoV9fS4CnaY"
        print(f"\n[TEST] YouTube video: {video_url}")

        info = dlp.get_stream_info(video_url)
        direct_url = info.get("direct_url")
        direct_preview = direct_url[:80] if direct_url else "N/A"
        print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}")
        print(f"[TEST] Direct URL: {direct_preview}")

        assert "title" in info
        print("[TEST] YouTube fallback: PASS")

    def test_yt_dlp_consumes_proxy_playlist(self):
        """The ``yt-dlp`` CLI can read the proxied playlist URL.

        NOTE(review): this test does not request the ``test_servers`` fixture,
        so it presumably relies on an earlier test having started the Flask
        server on ``SERVER_PORT`` - confirm.
        """
        self._fresh_dlp()

        video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690"
        encoded_url = urllib.parse.quote(video_url, safe="")
        playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded_url}--index.m3u8"
        print(f"\n[TEST] yt-dlp proxy URL: {playlist_url}")

        cmd = ["yt-dlp", "--hls-use-mpegts", "--no-download", "--print", "url"]
        cmd.append(playlist_url)
        print(f"[TEST] Running: {' '.join(cmd)}")

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        print(f"[TEST] yt-dlp return code: {result.returncode}")
        if result.stdout:
            print(f"[TEST] yt-dlp output: {result.stdout[:200]}")
        if result.returncode != 0:
            print(f"[TEST] yt-dlp stderr: {result.stderr[:500]}")

        assert result.returncode == 0, f"yt-dlp failed: {result.stderr}"
        print("[TEST] yt-dlp consumes proxy playlist: PASS")
if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v", "-s"])