97 lines
2.8 KiB
Python
97 lines
2.8 KiB
Python
import subprocess
|
|
import sys
|
|
import urllib.parse
|
|
import time
|
|
import urllib.request
|
|
import os
|
|
|
|
# TCP port on 127.0.0.1 that the test binds the app server to and that
# wait_server() polls while waiting for it to come up.
SERVER_PORT = 5005
|
|
|
|
|
|
def wait_server(port=None, *, attempts=20, delay=0.5):
    """Block until an HTTP server answers on 127.0.0.1.

    The root URL is polled up to ``attempts`` times, sleeping ``delay``
    seconds after every failed attempt.  Any HTTP response counts as
    "ready", including error statuses such as 404 or 501: an error status
    still proves that the server is listening and speaking HTTP.

    Args:
        port: TCP port to probe.  ``None`` (the default) uses the
            module-level ``SERVER_PORT``.
        attempts: Maximum number of connection attempts.
        delay: Seconds to sleep after each failed attempt.

    Raises:
        RuntimeError: If no attempt produced an HTTP response.  The last
            connection error, if any, is chained as ``__cause__``.
    """
    # Function-scope imports keep this block self-contained without touching
    # the file-level import list.
    import http.client
    import urllib.error

    target = f"http://127.0.0.1:{SERVER_PORT if port is None else port}/"
    last_error = None
    for _ in range(attempts):
        try:
            # The context manager closes the probe response so no socket leaks.
            with urllib.request.urlopen(target, timeout=1):
                return
        except urllib.error.HTTPError as err:
            # The server replied, just not with a 2xx status: it is up.
            err.close()
            return
        except (OSError, http.client.HTTPException) as err:
            # Connection refused / reset / timed out, or a half-started server
            # sent garbage: not ready yet, so retry after a pause.
            last_error = err
            time.sleep(delay)
    raise RuntimeError("Server not ready") from last_error
|
|
|
|
|
|
def _fetch_text(url, timeout=10):
    """Download ``url`` and return its body decoded as UTF-8 text.

    Undecodable bytes are dropped.  The request is logged to stdout and the
    status is asserted to be 200.
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        status = response.status
        body = response.read().decode("utf-8", errors="ignore")
    print(f"[HTTP] {url} -> {status}")
    assert status == 200, f"Request failed: {url}"
    return body


def _parse_playlist(text):
    """Return the URI lines of an M3U8 playlist.

    Blank lines and lines starting with ``#`` (tags and comments) are
    skipped.  Lines are stripped before the ``#`` check so indented tag
    lines and CRLF line endings are handled.
    """
    stripped = (line.strip() for line in text.splitlines())
    return [line for line in stripped if line and not line.startswith("#")]


def _is_media_playlist(text):
    """Return True if the playlist text contains an ``#EXTINF`` tag."""
    return "#EXTINF" in text


def _descend_to_media(url, max_depth=5):
    """Follow the first entry of nested playlists until a media playlist.

    Each entry is resolved against the URL of the playlist that contained
    it, so absolute, root-relative and relative URIs all work.

    Returns:
        A ``(media_url, media_text)`` tuple for the media playlist found.

    Raises:
        AssertionError: If a playlist without entries is met, or if no media
            playlist is reached within ``max_depth`` hops.
    """
    text = _fetch_text(url)
    for _ in range(max_depth):
        if _is_media_playlist(text):
            return url, text
        entries = _parse_playlist(text)
        assert entries, "Empty playlist while descending"
        url = urllib.parse.urljoin(url, entries[0])
        text = _fetch_text(url)
    assert _is_media_playlist(text), "Playlist nesting too deep"
    return url, text


def test_full_proxy_flow():
    """Integration test: play real videos through the local HLS proxy.

    Following the project's AGENTS.md guidance, the test uses real video
    URLs and sends all traffic through the proxy.  The app is started in a
    background thread on ``SERVER_PORT``.  For every video a minimal HLS
    player is simulated with urllib:

    - fetch ``/hls/<video_id>/index.m3u8`` from the proxy,
    - descend through nested playlists to a media playlist,
    - download the first three segments and check they are non-empty.
    """
    import threading

    # Ensure the project root is importable so ``app`` and ``utils`` resolve.
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    if root not in sys.path:
        sys.path.insert(0, root)
    import app
    from utils import get_video_id  # imported up front so a missing module fails before the server starts

    # Start the server.  The thread is a daemon so it never blocks interpreter
    # exit; the reloader is disabled because the app runs off the main thread.
    server_thread = threading.Thread(
        target=app.app.run,
        kwargs={
            "host": "127.0.0.1",
            "port": SERVER_PORT,
            "debug": False,
            "use_reloader": False,
        },
        daemon=True,
    )
    server_thread.start()

    wait_server()

    video_urls = [
        "https://rt.pornhub.com/view_video.php?viewkey=ph5e7df37a9faf5",
        "https://rt.pornhub.com/view_video.php?viewkey=69c13273df690",
    ]

    base = f"http://127.0.0.1:{SERVER_PORT}"

    for video_url in video_urls:
        video_id = get_video_id(video_url)
        index_url = f"{base}/hls/{video_id}/index.m3u8"

        print(f"\n[TEST] Simulated player: {video_url}")

        media_url, media_text = _descend_to_media(index_url)
        segments = _parse_playlist(media_text)
        assert segments, "Empty media playlist"

        for index, entry in enumerate(segments[:3], start=1):
            # Resolve against the media playlist's own URL instead of blindly
            # prefixing the server base.
            seg_url = urllib.parse.urljoin(media_url, entry)
            # Segments must still be served by the proxy, not fetched upstream.
            assert seg_url.startswith(f"{base}/"), f"Segment bypasses proxy: {seg_url}"
            with urllib.request.urlopen(seg_url, timeout=10) as response:
                status = response.status
                payload = response.read()
            print(f"[SEG {index}] {seg_url} -> {status}, {len(payload)} bytes")
            assert status == 200, f"Segment failed: {seg_url}"
            assert len(payload) > 0, "Empty segment"
|