Add docker support
This commit is contained in:
@@ -0,0 +1,96 @@
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.parse
|
||||
import time
|
||||
import urllib.request
|
||||
import os
|
||||
|
||||
SERVER_PORT = 5005
|
||||
|
||||
|
||||
def wait_server():
|
||||
for _ in range(20):
|
||||
try:
|
||||
urllib.request.urlopen(f"http://127.0.0.1:{SERVER_PORT}/", timeout=1)
|
||||
return
|
||||
except Exception:
|
||||
time.sleep(0.5)
|
||||
raise RuntimeError("Server not ready")
|
||||
|
||||
|
||||
def test_full_proxy_flow():
|
||||
"""
|
||||
AGENTS.md compliant integration test:
|
||||
- real video URL
|
||||
- goes through proxy
|
||||
- yt-dlp consumes stream (like browser)
|
||||
"""
|
||||
|
||||
import threading
|
||||
# ensure project root is on PYTHONPATH
|
||||
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
if ROOT not in sys.path:
|
||||
sys.path.insert(0, ROOT)
|
||||
import app
|
||||
|
||||
# start server
|
||||
t = threading.Thread(
|
||||
target=lambda: app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False),
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
|
||||
wait_server()
|
||||
|
||||
video_urls = [
|
||||
"https://rt.pornhub.com/view_video.php?viewkey=ph5e7df37a9faf5",
|
||||
"https://rt.pornhub.com/view_video.php?viewkey=69c13273df690",
|
||||
]
|
||||
|
||||
from utils import get_video_id
|
||||
|
||||
def fetch(url):
|
||||
with urllib.request.urlopen(url, timeout=10) as r:
|
||||
status = r.status
|
||||
data = r.read().decode("utf-8", errors="ignore")
|
||||
print(f"[HTTP] {url} -> {status}")
|
||||
assert status == 200, f"Request failed: {url}"
|
||||
return data
|
||||
|
||||
def parse_playlist(text):
|
||||
return [l.strip() for l in text.split("\n") if l.strip() and not l.startswith("#")]
|
||||
|
||||
def is_media_playlist(text):
|
||||
return "#EXTINF" in text
|
||||
|
||||
def descend_to_media(url):
|
||||
text = fetch(url)
|
||||
depth = 0
|
||||
while not is_media_playlist(text):
|
||||
depth += 1
|
||||
assert depth <= 5, "Playlist nesting too deep"
|
||||
entries = parse_playlist(text)
|
||||
assert entries, "Empty playlist while descending"
|
||||
next_url = entries[0] if entries[0].startswith("http") else base + entries[0]
|
||||
text = fetch(next_url)
|
||||
return text
|
||||
|
||||
for video_url in video_urls:
|
||||
video_id = get_video_id(video_url)
|
||||
base = f"http://127.0.0.1:{SERVER_PORT}"
|
||||
index_url = f"{base}/hls/{video_id}/index.m3u8"
|
||||
|
||||
print(f"\n[TEST] Simulated player: {video_url}")
|
||||
|
||||
media = descend_to_media(index_url)
|
||||
segs = parse_playlist(media)
|
||||
assert segs, "Empty media playlist"
|
||||
|
||||
for i, seg in enumerate(segs[:3], start=1):
|
||||
seg_url = base + seg
|
||||
with urllib.request.urlopen(seg_url, timeout=10) as r:
|
||||
status = r.status
|
||||
data = r.read()
|
||||
print(f"[SEG {i}] {seg_url} -> {status}, {len(data)} bytes")
|
||||
assert status == 200, f"Segment failed: {seg_url}"
|
||||
assert len(data) > 0, "Empty segment"
|
||||
@@ -1,416 +0,0 @@
|
||||
import pytest
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import time
|
||||
import threading
|
||||
import requests
|
||||
import urllib.parse
|
||||
import http.server
|
||||
import socketserver
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
|
||||
TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video"
|
||||
TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8"
|
||||
SERVER_PORT = 5005
|
||||
TEST_HTTP_PORT = 8890
|
||||
|
||||
|
||||
def print_hex(data, max_len=200):
|
||||
"""Print data as hex for debugging."""
|
||||
if isinstance(data, bytes):
|
||||
print(f"[HEX] {data[:max_len].hex()}")
|
||||
else:
|
||||
print(f"[HEX] {data[:max_len].encode().hex()}")
|
||||
|
||||
|
||||
def print_headers(headers):
|
||||
"""Print response headers."""
|
||||
print(f"[HEADERS] {dict(headers)}")
|
||||
|
||||
|
||||
def generate_test_video():
|
||||
"""Generate test HLS video using ffmpeg."""
|
||||
print(f"\n[SETUP] Generating test video in {TEST_VIDEO_DIR}")
|
||||
os.makedirs(TEST_VIDEO_DIR, exist_ok=True)
|
||||
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-f", "lavfi", "-i", "testsrc=duration=10:size=320x240:rate=24",
|
||||
"-f", "lavfi", "-i", "sine=frequency=440:duration=10",
|
||||
"-c:v", "libx264", "-c:a", "aac", "-strict", "experimental",
|
||||
"-hls_time", "2", "-hls_list_size", "0",
|
||||
"-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts",
|
||||
TEST_VIDEO_M3U8
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
|
||||
if result.returncode != 0:
|
||||
print(f"[ERROR] ffmpeg failed: {result.stderr}")
|
||||
segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")]
|
||||
print(f"[SETUP] Generated {len(segments)} segments")
|
||||
return result.returncode == 0 and len(segments) > 0
|
||||
|
||||
|
||||
class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def log_message(self, format, *args):
|
||||
print(f"[HTTP] {self.address_string()} - {format % args}")
|
||||
|
||||
|
||||
class ReusableTCPServer(socketserver.TCPServer):
|
||||
allow_reuse_address = True
|
||||
|
||||
|
||||
def serve_test_video():
|
||||
print(f"[SETUP] Starting test HTTP server on port {TEST_HTTP_PORT}")
|
||||
os.chdir(TEST_VIDEO_DIR)
|
||||
with ReusableTCPServer(("127.0.0.1", TEST_HTTP_PORT), QuietHTTPHandler) as httpd:
|
||||
httpd.serve_forever()
|
||||
|
||||
|
||||
def start_flask_app():
|
||||
print(f"[SETUP] Starting Flask server on port {SERVER_PORT}")
|
||||
import app as flask_app
|
||||
flask_app.app.run(host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_servers():
|
||||
print("\n" + "="*60)
|
||||
print("INTEGRATION TEST SETUP")
|
||||
print("="*60)
|
||||
|
||||
generate_test_video()
|
||||
|
||||
http_thread = threading.Thread(target=serve_test_video, daemon=True)
|
||||
http_thread.start()
|
||||
time.sleep(1)
|
||||
|
||||
for _ in range(10):
|
||||
try:
|
||||
requests.get(f"http://127.0.0.1:{TEST_HTTP_PORT}/", timeout=1)
|
||||
break
|
||||
except:
|
||||
time.sleep(0.5)
|
||||
print("[SETUP] Test HTTP server ready")
|
||||
|
||||
flask_thread = threading.Thread(target=start_flask_app, daemon=True)
|
||||
flask_thread.start()
|
||||
time.sleep(2)
|
||||
|
||||
for _ in range(10):
|
||||
try:
|
||||
requests.get(f"http://127.0.0.1:{SERVER_PORT}/", timeout=1)
|
||||
break
|
||||
except:
|
||||
time.sleep(0.5)
|
||||
print("[SETUP] Flask server ready")
|
||||
print("="*60 + "\n")
|
||||
|
||||
yield
|
||||
|
||||
print("\n[TEARDOWN] Tests complete")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test URL parsing - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestURLParsing:
|
||||
"""Test URL parsing functions as per AGENTS.md."""
|
||||
|
||||
def test_url_validation_youtube(self):
|
||||
"""Test YouTube URL validation."""
|
||||
from utils import is_valid_url
|
||||
url = "https://www.youtube.com/watch?v=abc123"
|
||||
print(f"[TEST] Validating: {url}")
|
||||
result = is_valid_url(url)
|
||||
print(f"[TEST] Result: {result}")
|
||||
assert result is True, f"YouTube URL should be valid: {url}"
|
||||
|
||||
def test_url_validation_pornhub(self):
|
||||
"""Test PornHub URL validation."""
|
||||
from utils import is_valid_url
|
||||
url = "https://rt.pornhub.com/view_video.php?viewkey=abc123"
|
||||
print(f"[TEST] Validating: {url}")
|
||||
result = is_valid_url(url)
|
||||
print(f"[TEST] Result: {result}")
|
||||
assert result is True, f"PornHub URL should be valid: {url}"
|
||||
|
||||
def test_url_validation_invalid(self):
|
||||
"""Test invalid URL rejection."""
|
||||
from utils import is_valid_url
|
||||
url = "not-a-url"
|
||||
print(f"[TEST] Validating: {url}")
|
||||
result = is_valid_url(url)
|
||||
print(f"[TEST] Result: {result}")
|
||||
assert result is False, f"Invalid URL should be rejected: {url}"
|
||||
|
||||
def test_url_validation_disallowed(self):
|
||||
"""Test disallowed domain rejection."""
|
||||
from utils import is_valid_url
|
||||
url = "https://evil.com/video"
|
||||
print(f"[TEST] Validating: {url}")
|
||||
result = is_valid_url(url)
|
||||
print(f"[TEST] Result: {result}")
|
||||
assert result is False, f"Disallowed domain should be rejected: {url}"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test caching - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestCaching:
|
||||
"""Test caching mechanics as per AGENTS.md."""
|
||||
|
||||
def test_cache_store_and_retrieve(self):
|
||||
"""Test cache can store and retrieve data."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
url = "https://test.com/video"
|
||||
data = {"title": "Test", "hls_url": "http://example.com/playlist.m3u8"}
|
||||
|
||||
print(f"[TEST] Storing in cache: {url}")
|
||||
dlp._session_cache[url] = data
|
||||
dlp._cache_timestamps[url] = time.time()
|
||||
|
||||
print(f"[TEST] Cache contents: {dlp._session_cache}")
|
||||
assert url in dlp._session_cache
|
||||
assert dlp._session_cache[url]["title"] == "Test"
|
||||
|
||||
def test_cache_hit_detection(self):
|
||||
"""Test cache hit is detected."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
url = "https://test.com/video"
|
||||
dlp._session_cache[url] = {"title": "Test"}
|
||||
dlp._cache_timestamps[url] = time.time()
|
||||
|
||||
print(f"[TEST] Checking cache for: {url}")
|
||||
if url in dlp._session_cache:
|
||||
print(f"[TEST] Cache HIT!")
|
||||
else:
|
||||
print(f"[TEST] Cache MISS!")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test playlist proxying - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestPlaylistProxying:
|
||||
"""Test playlist proxying as per AGENTS.md."""
|
||||
|
||||
def test_main_playlist_returns_valid_hls(self, test_servers):
|
||||
"""Test main playlist returns valid HLS content."""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
encoded = urllib.parse.quote(video_url, safe="")
|
||||
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
|
||||
|
||||
print(f"[TEST] Requesting main playlist: {proxy_url}")
|
||||
response = requests.get(proxy_url, timeout=10)
|
||||
|
||||
print(f"[TEST] Status: {response.status_code}")
|
||||
print_headers(response.headers)
|
||||
print(f"[TEST] Content preview: {response.text[:200]}")
|
||||
|
||||
assert response.status_code == 200, f"Expected 200, got {response.status_code}"
|
||||
assert "#EXTM3U" in response.text, "Should contain #EXTM3U"
|
||||
assert ".ts" in response.text, "Should contain segment references"
|
||||
print("[TEST] Main playlist returns valid HLS: PASS")
|
||||
|
||||
def test_playlist_contains_proxy_urls(self, test_servers):
|
||||
"""Test playlist URLs are rewritten to proxy."""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
encoded = urllib.parse.quote(video_url, safe="")
|
||||
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
|
||||
|
||||
print(f"[TEST] Requesting playlist: {proxy_url}")
|
||||
response = requests.get(proxy_url, timeout=10)
|
||||
|
||||
print(f"[TEST] Content: {response.text}")
|
||||
assert "/hls/" in response.text, "Playlist should contain proxy URLs"
|
||||
print("[TEST] Playlist contains proxy URLs: PASS")
|
||||
|
||||
def test_playlist_content_type_correct(self, test_servers):
|
||||
"""Test playlist returns correct content-type."""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
encoded = urllib.parse.quote(video_url, safe="")
|
||||
proxy_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
|
||||
|
||||
print(f"[TEST] Requesting: {proxy_url}")
|
||||
response = requests.get(proxy_url, timeout=10)
|
||||
|
||||
print(f"[TEST] Content-Type: {response.headers.get('Content-Type')}")
|
||||
assert "application/vnd.apple.mpegurl" in response.headers.get("Content-Type", "")
|
||||
assert "video/mp2t" not in response.headers.get("Content-Type", "")
|
||||
print("[TEST] Playlist content-type correct: PASS")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test segment proxying - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestSegmentProxying:
|
||||
"""Test segment proxying as per AGENTS.md."""
|
||||
|
||||
def test_segment_returns_video_data(self, test_servers):
|
||||
"""Test segment returns video data."""
|
||||
video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
|
||||
encoded = urllib.parse.quote(video_url, safe="")
|
||||
playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--index.m3u8"
|
||||
|
||||
print(f"[TEST] Getting main playlist: {playlist_url}")
|
||||
playlist_resp = requests.get(playlist_url, timeout=10)
|
||||
|
||||
# Find segment filename
|
||||
segment_filename = None
|
||||
for line in playlist_resp.text.split("\n"):
|
||||
if line.startswith("/hls/") and "--" in line and ".ts" in line:
|
||||
parts = line.rsplit("--", 1)
|
||||
if len(parts) >= 2:
|
||||
segment_filename = parts[-1]
|
||||
print(f"[TEST] Found segment: {segment_filename}")
|
||||
break
|
||||
|
||||
assert segment_filename is not None, "Should find segment in playlist"
|
||||
|
||||
seg_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded}--{segment_filename}"
|
||||
print(f"[TEST] Requesting segment: {seg_url}")
|
||||
|
||||
seg_resp = requests.get(seg_url, timeout=10)
|
||||
|
||||
print(f"[TEST] Segment status: {seg_resp.status_code}")
|
||||
print_headers(seg_resp.headers)
|
||||
print(f"[TEST] Segment size: {len(seg_resp.content)} bytes")
|
||||
|
||||
assert seg_resp.status_code == 200
|
||||
assert "video/mp2t" in seg_resp.headers.get("Content-Type", "")
|
||||
assert len(seg_resp.content) > 1000, "Segment should have substantial data"
|
||||
assert b"#EXTM3U" not in seg_resp.content[:100], "Segment should NOT be a playlist"
|
||||
|
||||
print("[TEST] Segment returns video data: PASS")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test error handling - critical function
|
||||
# ============================================================================
|
||||
|
||||
class TestErrorHandling:
|
||||
"""Test error handling as per AGENTS.md."""
|
||||
|
||||
def test_player_missing_url_returns_400(self):
|
||||
"""Test player route with missing URL returns 400."""
|
||||
from app import app
|
||||
with app.test_client() as client:
|
||||
print("[TEST] Testing /player with no URL")
|
||||
response = client.get("/player")
|
||||
print(f"[TEST] Status: {response.status_code}")
|
||||
assert response.status_code == 400
|
||||
|
||||
def test_player_invalid_url_returns_400(self):
|
||||
"""Test player route with invalid URL returns 400."""
|
||||
from app import app
|
||||
with app.test_client() as client:
|
||||
print("[TEST] Testing /player with invalid URL")
|
||||
response = client.get("/player?url=not-valid")
|
||||
print(f"[TEST] Status: {response.status_code}")
|
||||
assert response.status_code == 400
|
||||
|
||||
def test_hls_invalid_video_url_returns_400(self):
|
||||
"""Test HLS route with invalid video URL returns 400."""
|
||||
from app import app
|
||||
with app.test_client() as client:
|
||||
print("[TEST] Testing /hls with invalid video URL")
|
||||
response = client.get("/hls/evil.com--index.m3u8")
|
||||
print(f"[TEST] Status: {response.status_code}")
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Integration tests - main application flow as per AGENTS.md
|
||||
# ============================================================================
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests for main application flow as per AGENTS.md."""
|
||||
|
||||
def test_pornhub_video_full_flow(self):
|
||||
"""Test PornHub video with full debug output."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690"
|
||||
|
||||
print(f"\n[TEST] PornHub video: {video_url}")
|
||||
|
||||
# Get stream info
|
||||
info = dlp.get_stream_info(video_url)
|
||||
print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}")
|
||||
print(f"[TEST] HLS URL: {info.get('hls_url', 'N/A')[:80] if info.get('hls_url') else 'N/A'}")
|
||||
|
||||
# Get playlist
|
||||
playlist = dlp.get_hls_playlist(video_url)
|
||||
print(f"[TEST] Playlist content (first 300 chars): {playlist[:300]}")
|
||||
print_hex(playlist[:100])
|
||||
|
||||
assert "#EXTM3U" in playlist
|
||||
assert "/hls/" in playlist
|
||||
print("[TEST] PornHub full flow: PASS")
|
||||
|
||||
def test_youtube_video_fallback(self):
|
||||
"""Test YouTube uses direct URL fallback."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
video_url = "https://www.youtube.com/watch?v=PoV9fS4CnaY"
|
||||
|
||||
print(f"\n[TEST] YouTube video: {video_url}")
|
||||
|
||||
info = dlp.get_stream_info(video_url)
|
||||
print(f"[TEST] Title: {info.get('title', 'N/A')[:50]}")
|
||||
print(f"[TEST] Direct URL: {info.get('direct_url', 'N/A')[:80] if info.get('direct_url') else 'N/A'}")
|
||||
|
||||
assert "title" in info
|
||||
print("[TEST] YouTube fallback: PASS")
|
||||
|
||||
def test_yt_dlp_consumes_proxy_playlist(self):
|
||||
"""Test yt-dlp can consume proxy playlist like browser."""
|
||||
import dlp
|
||||
dlp._session_cache.clear()
|
||||
dlp._cache_timestamps.clear()
|
||||
|
||||
video_url = "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690"
|
||||
encoded_url = urllib.parse.quote(video_url, safe="")
|
||||
playlist_url = f"http://127.0.0.1:{SERVER_PORT}/hls/{encoded_url}--index.m3u8"
|
||||
|
||||
print(f"\n[TEST] yt-dlp proxy URL: {playlist_url}")
|
||||
|
||||
cmd = [
|
||||
"yt-dlp",
|
||||
"--hls-use-mpegts",
|
||||
"--no-download",
|
||||
"--print", "url",
|
||||
playlist_url
|
||||
]
|
||||
|
||||
print(f"[TEST] Running: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
|
||||
|
||||
print(f"[TEST] yt-dlp return code: {result.returncode}")
|
||||
if result.stdout:
|
||||
print(f"[TEST] yt-dlp output: {result.stdout[:200]}")
|
||||
if result.returncode != 0:
|
||||
print(f"[TEST] yt-dlp stderr: {result.stderr[:500]}")
|
||||
|
||||
assert result.returncode == 0, f"yt-dlp failed: {result.stderr}"
|
||||
print("[TEST] yt-dlp consumes proxy playlist: PASS")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v", "-s"])
|
||||
Reference in New Issue
Block a user