"""End-to-end tests for the HLS proxy application.

The module-scoped ``test_servers`` fixture renders a short synthetic HLS
stream with ffmpeg, serves it from a local static HTTP server, and starts
the Flask app (imported as ``app``) in a background thread. The tests then
fetch the playlist, a segment, and the HTML pages through the proxy.
"""
import functools
import http.server
import os
import socket
import socketserver
import subprocess
import sys
import threading
import time
import urllib.parse

import pytest
import requests

# Put the parent of this file's directory on sys.path so that the
# function-scope ``import app`` / ``import dlp`` below can resolve
# (presumably those modules live there).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

TEST_VIDEO_DIR = "/tmp/yt-dlp-test-video"
TEST_VIDEO_M3U8 = f"{TEST_VIDEO_DIR}/index.m3u8"
SERVER_PORT = 5002
TEST_HTTP_PORT = 8898


def generate_test_video():
    """Render a 5-second synthetic HLS stream into ``TEST_VIDEO_DIR``.

    Runs ffmpeg with a ``testsrc`` video source and a 440 Hz sine audio
    source, cut into 1-second ``.ts`` segments plus ``index.m3u8``.

    Raises:
        AssertionError: if ffmpeg exits non-zero, or the manifest or the
            segments are missing afterwards.
        FileNotFoundError: if the ``ffmpeg`` executable is not available.
        subprocess.TimeoutExpired: if ffmpeg runs longer than 60 seconds.
    """
    os.makedirs(TEST_VIDEO_DIR, exist_ok=True)
    cmd = [
        "ffmpeg", "-y",
        "-f", "lavfi", "-i", "testsrc=duration=5:size=320x240:rate=24",
        "-f", "lavfi", "-i", "sine=frequency=440:duration=5",
        "-c:v", "libx264",
        "-c:a", "aac",
        "-strict", "experimental",
        "-hls_time", "1",
        "-hls_list_size", "0",
        "-hls_segment_filename", f"{TEST_VIDEO_DIR}/segment%03d.ts",
        TEST_VIDEO_M3U8,
    ]
    result = subprocess.run(cmd, capture_output=True, timeout=60)

    # Check the exit status explicitly: files left in /tmp by an earlier run
    # would otherwise satisfy the existence checks after a failed encode.
    stderr_tail = result.stderr.decode(errors="replace")[-2000:]
    assert result.returncode == 0, (
        f"ffmpeg exited with code {result.returncode}:\n{stderr_tail}"
    )
    assert os.path.exists(TEST_VIDEO_M3U8), "HLS manifest not generated"
    segments = [f for f in os.listdir(TEST_VIDEO_DIR) if f.endswith(".ts")]
    assert segments, "No segments generated"


class QuietHTTPHandler(http.server.SimpleHTTPRequestHandler):
    """Static file handler that suppresses per-request logging."""

    def log_message(self, format, *args):
        # The parameter name mirrors the base-class signature being overridden.
        pass


class ReusableTCPServer(socketserver.TCPServer):
    """TCP server with ``allow_reuse_address`` enabled."""

    allow_reuse_address = True


def serve_test_video():
    """Serve ``TEST_VIDEO_DIR`` over HTTP on ``TEST_HTTP_PORT`` until killed.

    Blocks in ``serve_forever()``, so it is meant to be run in a thread.
    """
    # Bind the handler to the directory instead of calling os.chdir(), which
    # would change the working directory of the whole process.
    handler = functools.partial(QuietHTTPHandler, directory=TEST_VIDEO_DIR)
    with ReusableTCPServer(("127.0.0.1", TEST_HTTP_PORT), handler) as httpd:
        httpd.serve_forever()


def start_flask_app():
    """Import the project's ``app`` module and run its Flask app (blocking)."""
    import app as flask_app

    flask_app.app.run(
        host="127.0.0.1", port=SERVER_PORT, debug=False, use_reloader=False
    )


def _wait_for_port(port, attempts=20, delay=0.5):
    """Return True once 127.0.0.1:``port`` accepts a TCP connection.

    Makes up to ``attempts`` connection attempts, sleeping ``delay`` seconds
    after each failure, and returns False if none of them succeeds.
    """
    for _ in range(attempts):
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=1):
                return True
        except OSError:
            time.sleep(delay)
    return False


def _proxy_url(endpoint, video_url):
    """Build the proxy URL for ``endpoint`` with ``video_url`` fully quoted."""
    quoted = urllib.parse.quote(video_url, safe="")
    return f"http://127.0.0.1:{SERVER_PORT}/{endpoint}?url={quoted}"


def _first_segment_path(playlist_text):
    """Return the ``path`` value of the first ``/hls?...`` playlist line.

    Returns None when no line starting with ``/hls?`` carries a ``path``
    query parameter.
    """
    # splitlines() + strip() so CRLF playlists do not leak "\r" into the value.
    for raw_line in playlist_text.splitlines():
        line = raw_line.strip()
        if not line.startswith("/hls?"):
            continue
        params = urllib.parse.parse_qs(urllib.parse.urlparse(line).query)
        if "path" in params:
            return params["path"][0]
    return None


@pytest.fixture(scope="module")
def test_servers():
    """Generate the test stream and start both servers for this module.

    Fails the fixture with an explicit message if either server does not
    start accepting connections. Both servers run in daemon threads, so no
    explicit shutdown is performed after the ``yield``.
    """
    print("\nGenerating test video...")
    generate_test_video()

    print(f"Starting HTTP server for test video on port {TEST_HTTP_PORT}...")
    threading.Thread(target=serve_test_video, daemon=True).start()
    if not _wait_for_port(TEST_HTTP_PORT):
        pytest.fail(
            f"HTTP server did not start listening on port {TEST_HTTP_PORT}"
        )
    print("HTTP server ready")

    print(f"Starting Flask proxy server on port {SERVER_PORT}...")
    threading.Thread(target=start_flask_app, daemon=True).start()
    if not _wait_for_port(SERVER_PORT):
        pytest.fail(
            f"Flask proxy server did not start listening on port {SERVER_PORT}"
        )
    print("Flask server ready")

    yield

    print("\nCleaning up...")


def test_direct_hls_access(test_servers):
    """Test that we can access the test HLS video directly"""
    response = requests.get(
        f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8", timeout=5
    )
    assert response.status_code == 200
    assert "#EXTM3U" in response.text
    print("Direct HLS access: OK")


def test_hls_playlist_proxy(test_servers):
    """Test proxying HLS playlist"""
    video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
    response = requests.get(_proxy_url("hls", video_url), timeout=10)
    assert response.status_code == 200
    assert "#EXTM3U" in response.text
    assert ".ts" in response.text
    print("HLS playlist proxy: OK")


def test_hls_segment_proxy(test_servers):
    """Test proxying HLS segment"""
    video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
    playlist_url = _proxy_url("hls", video_url)

    # First get the rewritten playlist to extract the segment URL.
    playlist_response = requests.get(playlist_url, timeout=10)
    assert playlist_response.status_code == 200

    # The segment path is carried in the path= parameter of a playlist line.
    segment_path = _first_segment_path(playlist_response.text)
    assert segment_path is not None, (
        "No '/hls?' line with a path= parameter found in the proxied playlist"
    )

    # Now request the segment using the path from the playlist.
    quoted_path = urllib.parse.quote(segment_path, safe="")
    response = requests.get(f"{playlist_url}&path={quoted_path}", timeout=10)
    assert response.status_code == 200
    assert len(response.content) > 0
    print("HLS segment proxy: OK")


def test_player_page(test_servers):
    """Test player page renders"""
    video_url = f"http://127.0.0.1:{TEST_HTTP_PORT}/index.m3u8"
    response = requests.get(_proxy_url("player", video_url), timeout=10)
    assert response.status_code == 200
    assert "video" in response.text.lower()
    print("Player page: OK")


def test_index_page(test_servers):
    """Test index page renders"""
    response = requests.get(f"http://127.0.0.1:{SERVER_PORT}/", timeout=10)
    assert response.status_code == 200
    assert "video" in response.text.lower()
    print("Index page: OK")


@pytest.mark.skip(reason="External URL test - run manually to verify pornhub support")
def test_pornhub_hls_extraction():
    """Test that pornhub HLS URLs are extracted correctly"""
    import dlp

    # Clear the module's caches before extracting.
    dlp._session_cache.clear()
    dlp._cache_timestamps.clear()

    # Test with actual pornhub URL
    url = "https://rt.pornhub.com/view_video.php?viewkey=69bc20ee15710"
    hls_url = dlp.get_stream_info(url)["hls_url"]
    assert hls_url and "m3u8" in hls_url
    print(f"PornHub HLS URL: {hls_url[:100]}...")


if __name__ == "__main__":
    # Propagate pytest's result as the process exit code.
    sys.exit(pytest.main([__file__, "-v", "-s"]))